diff --git a/cncd/queue/fifo.go b/cncd/queue/fifo.go index ced613d64..275baeae0 100644 --- a/cncd/queue/fifo.go +++ b/cncd/queue/fifo.go @@ -7,6 +7,8 @@ import ( "runtime" "sync" "time" + + "github.com/Sirupsen/logrus" ) type entry struct { @@ -50,6 +52,19 @@ func (q *fifo) Push(c context.Context, task *Task) error { return nil } +// Push pushes an item to the tail of this queue. +func (q *fifo) PushAtOnce(c context.Context, tasks []*Task) error { + q.Lock() + for _, task := range tasks { + q.pending.PushBack(task) + } + q.Unlock() + for range tasks { + go q.process() + } + return nil +} + // Poll retrieves and removes the head of this queue. func (q *fifo) Poll(c context.Context, f Filter) (*Task, error) { q.Lock() @@ -187,21 +202,48 @@ func (q *fifo) process() { loop: for e := q.pending.Front(); e != nil; e = next { next = e.Next() - item := e.Value.(*Task) + task := e.Value.(*Task) + logrus.Debugf("queue: trying to assign task: %v with deps %v", task.ID, task.Dependencies) + if q.depsInQueue(task) { + continue + } for w := range q.workers { - if w.filter(item) { + if w.filter(task) { delete(q.workers, w) q.pending.Remove(e) - q.running[item.ID] = &entry{ - item: item, + q.running[task.ID] = &entry{ + item: task, done: make(chan bool), deadline: time.Now().Add(q.extension), } - w.channel <- item + logrus.Debugf("queue: assigned task: %v with deps %v", task.ID, task.Dependencies) + w.channel <- task break loop } } } } + +func (q *fifo) depsInQueue(task *Task) bool { + var next *list.Element + for e := q.pending.Front(); e != nil; e = next { + next = e.Next() + possibleDep, ok := e.Value.(*Task) + logrus.Debugf("queue: in queue right now: %v", possibleDep.ID) + for _, dep := range task.Dependencies { + if ok && possibleDep.ID == dep { + return true + } + } + } + for possibleDepID := range q.running { + for _, dep := range task.Dependencies { + if possibleDepID == dep { + return true + } + } + } + return false +} diff --git a/cncd/queue/fifo_test.go b/cncd/queue/fifo_test.go index 5123671bd..8d73a0f56 100644 --- a/cncd/queue/fifo_test.go +++ b/cncd/queue/fifo_test.go @@ -117,3 +117,30 @@ func TestFifoEvict(t *testing.T) { t.Errorf("expect not found error when evicting item not in queue, got %s", err) } } + +func TestFifoDependencies(t *testing.T) { + task1 := &Task{ + ID: "1", + } + + task2 := &Task{ + ID: "2", + Dependencies: []string{"1"}, + } + + q := New().(*fifo) + q.Push(noContext, task2) + q.Push(noContext, task1) + + got, _ := q.Poll(noContext, func(*Task) bool { return true }) + if got != task1 { + t.Errorf("expect task1 returned from queue as task2 depends on it") + return + } + + got, _ = q.Poll(noContext, func(*Task) bool { return true }) + if got != task2 { + t.Errorf("expect task2 returned from queue") + return + } +} diff --git a/cncd/queue/queue.go b/cncd/queue/queue.go index fa25a798a..2fa4626bd 100644 --- a/cncd/queue/queue.go +++ b/cncd/queue/queue.go @@ -23,6 +23,9 @@ type Task struct { // Labels represents the key-value pairs the entry is lebeled with. Labels map[string]string `json:"labels,omitempty"` + + // Task IDs this task depend on + Dependencies []string } // InfoT provides runtime information. @@ -44,9 +47,12 @@ type Filter func(*Task) bool // Queue defines a task queue for scheduling tasks among // a pool of workers. type Queue interface { - // Push pushes an task to the tail of this queue. + // Push pushes a task to the tail of this queue. Push(c context.Context, task *Task) error + // Push pushes a task to the tail of this queue. + PushAtOnce(c context.Context, tasks []*Task) error + // Poll retrieves and removes a task head of this queue. Poll(c context.Context, f Filter) (*Task, error) diff --git a/model/queue.go b/model/queue.go index 040728ae3..433fdcc00 100644 --- a/model/queue.go +++ b/model/queue.go @@ -54,7 +54,7 @@ type persistentQueue struct { store TaskStore } -// Push pushes an task to the tail of this queue. +// Push pushes a task to the tail of this queue. func (q *persistentQueue) Push(c context.Context, task *queue.Task) error { q.store.TaskInsert(&Task{ ID: task.ID, @@ -68,6 +68,24 @@ func (q *persistentQueue) Push(c context.Context, task *queue.Task) error { return err } +// Push pushes multiple tasks to the tail of this queue. +func (q *persistentQueue) PushAtOnce(c context.Context, tasks []*queue.Task) error { + for _, task := range tasks { + q.store.TaskInsert(&Task{ + ID: task.ID, + Data: task.Data, + Labels: task.Labels, + }) + } + err := q.Queue.PushAtOnce(c, tasks) + if err != nil { + for _, task := range tasks { + q.store.TaskDelete(task.ID) + } + } + return err +} + // Poll retrieves and removes a task head of this queue. func (q *persistentQueue) Poll(c context.Context, f queue.Filter) (*queue.Task, error) { task, err := q.Queue.Poll(c, f) diff --git a/server/build.go b/server/build.go index f45f41740..943539308 100644 --- a/server/build.go +++ b/server/build.go @@ -315,9 +315,9 @@ func PostApproval(c *gin.Context) { } }() - var yamls []string + var yamls []*remote.FileMeta for _, y := range configs { - yamls = append(yamls, string(y.Data)) + yamls = append(yamls, &remote.FileMeta{Data: []byte(y.Data), Name: y.Name}) } b := procBuilder{ @@ -478,6 +478,13 @@ func PostBuild(c *gin.Context) { return } + err = persistBuildConfigs(configs, build.ID) + if err != nil { + logrus.Errorf("failure to persist build config for %s. %s", repo.FullName, err) + c.AbortWithError(500, err) + return + } + // Read query string parameters into buildParams, exclude reserved params var buildParams = map[string]string{} for key, val := range c.Request.URL.Query() { @@ -508,9 +515,9 @@ func PostBuild(c *gin.Context) { } } - var yamls []string + var yamls []*remote.FileMeta for _, y := range configs { - yamls = append(yamls, string(y.Data)) + yamls = append(yamls, &remote.FileMeta{Data: []byte(y.Data), Name: y.Name}) } b := procBuilder{ @@ -589,6 +596,20 @@ func DeleteBuildLogs(c *gin.Context) { c.String(204, "") } +func persistBuildConfigs(configs []*model.Config, buildID int64) error { + for _, conf := range configs { + buildConfig := &model.BuildConfig{ + ConfigID: conf.ID, + BuildID: buildID, + } + err := Config.Storage.Config.BuildConfigCreate(buildConfig) + if err != nil { + return err + } + } + return nil +} + var deleteStr = `[ { "proc": %q, diff --git a/server/hook.go b/server/hook.go index 5b64ce844..833408e94 100644 --- a/server/hook.go +++ b/server/hook.go @@ -180,8 +180,7 @@ func PostHook(c *gin.Context) { // persist the build config for historical correctness, restarts, etc for _, remoteYamlConfig := range remoteYamlConfigs { - conf, err := findOrPersistPipelineConfig(build, remoteYamlConfig.Data) - fmt.Println(conf) + _, err := findOrPersistPipelineConfig(build, remoteYamlConfig) if err != nil { logrus.Errorf("failure to find or persist build config for %s. %s", repo.FullName, err) c.AbortWithError(500, err) @@ -230,11 +229,6 @@ func PostHook(c *gin.Context) { } }() - var yamls []string - for _, y := range remoteYamlConfigs { - yamls = append(yamls, string(y.Data)) - } - b := procBuilder{ Repo: repo, Curr: build, @@ -244,7 +238,7 @@ func PostHook(c *gin.Context) { Regs: regs, Envs: envs, Link: httputil.GetURL(c.Request), - Yamls: yamls, + Yamls: remoteYamlConfigs, } buildItems, err := b.Build() if err != nil { @@ -265,14 +259,15 @@ func PostHook(c *gin.Context) { queueBuild(build, repo, buildItems) } -func findOrPersistPipelineConfig(build *model.Build, remoteYamlConfig []byte) (*model.Config, error) { - sha := shasum(remoteYamlConfig) +func findOrPersistPipelineConfig(build *model.Build, remoteYamlConfig *remote.FileMeta) (*model.Config, error) { + sha := shasum(remoteYamlConfig.Data) conf, err := Config.Storage.Config.ConfigFindIdentical(build.RepoID, sha) if err != nil { conf = &model.Config{ RepoID: build.RepoID, - Data: string(remoteYamlConfig), + Data: string(remoteYamlConfig.Data), Hash: sha, + Name: sanitizePath(remoteYamlConfig.Name), } err = Config.Storage.Config.ConfigCreate(conf) if err != nil { @@ -296,6 +291,7 @@ func findOrPersistPipelineConfig(build *model.Build, remoteYamlConfig []byte) (* return conf, nil } +// publishes message to UI clients func publishToTopic(c *gin.Context, build *model.Build, repo *model.Repo) { message := pubsub.Message{ Labels: map[string]string{ @@ -314,6 +310,7 @@ func publishToTopic(c *gin.Context, build *model.Build, repo *model.Repo) { } func queueBuild(build *model.Build, repo *model.Repo, buildItems []*buildItem) { + var tasks []*queue.Task for _, item := range buildItems { task := new(queue.Task) task.ID = fmt.Sprint(item.Proc.ID) @@ -323,6 +320,7 @@ func queueBuild(build *model.Build, repo *model.Repo, buildItems []*buildItem) { } task.Labels["platform"] = item.Platform task.Labels["repo"] = repo.FullName + task.Dependencies = taskIds(item.DependsOn, buildItems) task.Data, _ = json.Marshal(rpc.Pipeline{ ID: fmt.Sprint(item.Proc.ID), @@ -331,8 +329,21 @@ func queueBuild(build *model.Build, repo *model.Repo, buildItems []*buildItem) { }) Config.Services.Logs.Open(context.Background(), task.ID) - Config.Services.Queue.Push(context.Background(), task) + tasks = append(tasks, task) } + Config.Services.Queue.PushAtOnce(context.Background(), tasks) +} + +func taskIds(dependsOn []string, buildItems []*buildItem) []string { + taskIds := []string{} + for _, dep := range dependsOn { + for _, buildItem := range buildItems { + if buildItem.Proc.Name == dep { + taskIds = append(taskIds, fmt.Sprint(buildItem.Proc.ID)) + } + } + } + return taskIds } func shasum(raw []byte) string { diff --git a/server/procBuilder.go b/server/procBuilder.go index 7cd70e13a..230656dd6 100644 --- a/server/procBuilder.go +++ b/server/procBuilder.go @@ -28,6 +28,7 @@ import ( "github.com/laszlocph/drone-oss-08/cncd/pipeline/pipeline/frontend/yaml/linter" "github.com/laszlocph/drone-oss-08/cncd/pipeline/pipeline/frontend/yaml/matrix" "github.com/laszlocph/drone-oss-08/model" + "github.com/laszlocph/drone-oss-08/remote" ) // Takes the hook data and the yaml and returns in internal data model @@ -39,15 +40,16 @@ type procBuilder struct { Secs []*model.Secret Regs []*model.Registry Link string - Yamls []string + Yamls []*remote.FileMeta Envs map[string]string } type buildItem struct { - Proc *model.Proc - Platform string - Labels map[string]string - Config *backend.Config + Proc *model.Proc + Platform string + Labels map[string]string + DependsOn []string + Config *backend.Config } func (b *procBuilder) Build() ([]*buildItem, error) { @@ -55,7 +57,7 @@ func (b *procBuilder) Build() ([]*buildItem, error) { for j, y := range b.Yamls { // matrix axes - axes, err := matrix.ParseString(y) + axes, err := matrix.ParseString(string(y.Data)) if err != nil { return nil, err } @@ -70,6 +72,7 @@ func (b *procBuilder) Build() ([]*buildItem, error) { PGID: j + i + 1, State: model.StatusPending, Environ: axis, + Name: sanitizePath(y.Name), } b.Curr.Procs = append(b.Curr.Procs, proc) @@ -77,13 +80,13 @@ func (b *procBuilder) Build() ([]*buildItem, error) { environ := b.environmentVariables(metadata, axis) // substitute vars - y, err := b.envsubst_(y, environ) + substituted, err := b.envsubst_(string(y.Data), environ) if err != nil { return nil, err } // parse yaml pipeline - parsed, err := yaml.ParseString(y) + parsed, err := yaml.ParseString(substituted) if err != nil { return nil, err } @@ -101,10 +104,11 @@ func (b *procBuilder) Build() ([]*buildItem, error) { ir := b.toInternalRepresentation(parsed, environ, metadata, proc.ID) item := &buildItem{ - Proc: proc, - Config: ir, - Labels: parsed.Labels, - Platform: metadata.Sys.Arch, + Proc: proc, + Config: ir, + Labels: parsed.Labels, + DependsOn: parsed.DependsOn, + Platform: metadata.Sys.Arch, } if item.Labels == nil { item.Labels = map[string]string{} @@ -289,3 +293,10 @@ func metadataFromStruct(repo *model.Repo, build, last *model.Build, proc *model. }, } } + +func sanitizePath(path string) string { + path = strings.TrimSuffix(path, ".yml") + path = strings.TrimPrefix(path, ".drone/") + path = strings.TrimPrefix(path, ".") + return path +} diff --git a/server/procBuilder_test.go b/server/procBuilder_test.go index aaf002821..177a960c5 100644 --- a/server/procBuilder_test.go +++ b/server/procBuilder_test.go @@ -19,6 +19,7 @@ import ( "testing" "github.com/laszlocph/drone-oss-08/model" + "github.com/laszlocph/drone-oss-08/remote" ) func TestMultilineEnvsubst(t *testing.T) { @@ -33,17 +34,20 @@ bbb`, Secs: []*model.Secret{}, Regs: []*model.Registry{}, Link: "", - Yamls: []string{`pipeline: + Yamls: []*remote.FileMeta{ + &remote.FileMeta{Data: []byte(` +pipeline: xxx: image: scratch yyy: ${DRONE_COMMIT_MESSAGE} -`, `pipeline: +`)}, + &remote.FileMeta{Data: []byte(` +pipeline: build: image: scratch yyy: ${DRONE_COMMIT_MESSAGE} -`, - }, - } +`)}, + }} if buildItems, err := b.Build(); err != nil { t.Fatal(err) @@ -61,15 +65,19 @@ func TestMultiPipeline(t *testing.T) { Secs: []*model.Secret{}, Regs: []*model.Registry{}, Link: "", - Yamls: []string{`pipeline: - lint: + Yamls: []*remote.FileMeta{ + &remote.FileMeta{Data: []byte(` +pipeline: + xxx: image: scratch yyy: ${DRONE_COMMIT_MESSAGE} -`, `pipeline: - test: +`)}, + &remote.FileMeta{Data: []byte(` +pipeline: + build: image: scratch yyy: ${DRONE_COMMIT_MESSAGE} -`, +`)}, }, } @@ -81,3 +89,38 @@ func TestMultiPipeline(t *testing.T) { t.Fatal("Should have generated 2 buildItems") } } + +func TestDependsOn(t *testing.T) { + b := procBuilder{ + Repo: &model.Repo{}, + Curr: &model.Build{}, + Last: &model.Build{}, + Netrc: &model.Netrc{}, + Secs: []*model.Secret{}, + Regs: []*model.Registry{}, + Link: "", + Yamls: []*remote.FileMeta{ + &remote.FileMeta{Data: []byte(` +pipeline: + deploy: + image: scratch + +depends_on: + - lint + - test + - build +`)}, + }, + } + + buildItems, err := b.Build() + if err != nil { + t.Fatal(err) + } + if len(buildItems[0].DependsOn) != 3 { + t.Fatal("Should have 3 dependencies") + } + if buildItems[0].DependsOn[1] != "test" { + t.Fatal("Should depend on test") + } +}