From a37866179f5ccfae02e9b9729ada8f25495d9428 Mon Sep 17 00:00:00 2001 From: Laszlo Fogas Date: Sun, 16 Jun 2019 15:26:45 +0200 Subject: [PATCH] Tasks should not run on error, unless specified --- cncd/queue/fifo.go | 27 +++++++++++++-- cncd/queue/fifo_test.go | 56 ++++++++++++++++++++++++++++++ cncd/queue/queue.go | 75 +++++++++++++++++------------------------ 3 files changed, 111 insertions(+), 47 deletions(-) diff --git a/cncd/queue/fifo.go b/cncd/queue/fifo.go index 275baeae0..3f21111e2 100644 --- a/cncd/queue/fifo.go +++ b/cncd/queue/fifo.go @@ -97,10 +97,11 @@ func (q *fifo) Done(c context.Context, id string) error { // Error signals that the item is done executing with error. func (q *fifo) Error(c context.Context, id string, err error) error { q.Lock() - state, ok := q.running[id] + taskEntry, ok := q.running[id] if ok { - state.error = err - close(state.done) + q.updateDepStatusInQueue(id, err == nil) + taskEntry.error = err + close(taskEntry.done) delete(q.running, id) } q.Unlock() @@ -247,3 +248,23 @@ func (q *fifo) depsInQueue(task *Task) bool { } return false } + +func (q *fifo) updateDepStatusInQueue(taskID string, success bool) { + var next *list.Element + for e := q.pending.Front(); e != nil; e = next { + next = e.Next() + pending, ok := e.Value.(*Task) + for _, dep := range pending.Dependencies { + if ok && taskID == dep { + pending.DepStatus[dep] = success + } + } + } + for _, running := range q.running { + for _, dep := range running.item.Dependencies { + if taskID == dep { + running.item.DepStatus[dep] = success + } + } + } +} diff --git a/cncd/queue/fifo_test.go b/cncd/queue/fifo_test.go index eb4f9b174..6f3e414a5 100644 --- a/cncd/queue/fifo_test.go +++ b/cncd/queue/fifo_test.go @@ -2,6 +2,7 @@ package queue import ( "context" + "fmt" "sync" "testing" "time" @@ -126,6 +127,7 @@ func TestFifoDependencies(t *testing.T) { task2 := &Task{ ID: "2", Dependencies: []string{"1"}, + DepStatus: make(map[string]bool), } q := New().(*fifo) @@ -146,3 +148,57 @@ func TestFifoDependencies(t *testing.T) { return } } + +func TestFifoErrors(t *testing.T) { + task1 := &Task{ + ID: "1", + } + + task2 := &Task{ + ID: "2", + Dependencies: []string{"1"}, + DepStatus: make(map[string]bool), + } + + task3 := &Task{ + ID: "3", + Dependencies: []string{"1"}, + DepStatus: make(map[string]bool), + RunOn: []string{"success", "failure"}, + } + + q := New().(*fifo) + q.Push(noContext, task2) + q.Push(noContext, task3) + q.Push(noContext, task1) + + got, _ := q.Poll(noContext, func(*Task) bool { return true }) + if got != task1 { + t.Errorf("expect task1 returned from queue as task2 depends on it") + return + } + + q.Error(noContext, got.ID, fmt.Errorf("exitcode 1, there was an error")) + + got, _ = q.Poll(noContext, func(*Task) bool { return true }) + if got != task2 { + t.Errorf("expect task2 returned from queue") + return + } + + if got.ShouldRun() { + t.Errorf("expect task2 should not run, since task1 failed") + return + } + + got, _ = q.Poll(noContext, func(*Task) bool { return true }) + if got != task3 { + t.Errorf("expect task3 returned from queue") + return + } + + if !got.ShouldRun() { + t.Errorf("expect task3 should run, task1 failed, but task3 runs on failure too") + return + } +} diff --git a/cncd/queue/queue.go b/cncd/queue/queue.go index 2fa4626bd..cb0ae7530 100644 --- a/cncd/queue/queue.go +++ b/cncd/queue/queue.go @@ -24,8 +24,38 @@ type Task struct { // Labels represents the key-value pairs the entry is lebeled with. Labels map[string]string `json:"labels,omitempty"` - // Task IDs this task depend on + // Task IDs this task depend Dependencies []string + + // If dep finished sucessfully + DepStatus map[string]bool + + // RunOn failure or success + RunOn []string +} + +// ShouldRun tells if a task should be run or skipped, based on dependencies +func (t *Task) ShouldRun() bool { + if runsOnFailure(t.RunOn) { + return true + } + + for _, success := range t.DepStatus { + if !success { + return false + } + } + + return true +} + +func runsOnFailure(runsOn []string) bool { + for _, status := range runsOn { + if status == "failure" { + return true + } + } + return false } // InfoT provides runtime information. @@ -74,46 +104,3 @@ type Queue interface { // Info returns internal queue information. Info(c context.Context) InfoT } - -// // global instance of the queue. -// var global = New() -// -// // Set sets the global queue. -// func Set(queue Queue) { -// global = queue -// } -// -// // Push pushes an task to the tail of the global queue. -// func Push(c context.Context, task *Task) error { -// return global.Push(c, task) -// } -// -// // Poll retrieves and removes a task head of the global queue. -// func Poll(c context.Context, f Filter) (*Task, error) { -// return global.Poll(c, f) -// } -// -// // Extend extends the deadline for a task. -// func Extend(c context.Context, id string) error { -// return global.Extend(c, id) -// } -// -// // Done signals the task is complete. -// func Done(c context.Context, id string) error { -// return global.Done(c, id) -// } -// -// // Error signals the task is complete with errors. -// func Error(c context.Context, id string, err error) { -// global.Error(c, id, err) -// } -// -// // Wait waits until the task is complete. -// func Wait(c context.Context, id string) error { -// return global.Wait(c, id) -// } -// -// // Info returns internal queue information. -// func Info(c context.Context) InfoT { -// return global.Info(c) -// }