From bb941c8b83d1f00ee80fc81ead8b19b84ebc5592 Mon Sep 17 00:00:00 2001 From: Laszlo Fogas Date: Tue, 9 Jul 2019 16:23:56 +0200 Subject: [PATCH] Differentiating between waiting on dependencies and workers --- cncd/queue/fifo.go | 73 ++++++++++++++++++++++++++++------- cncd/queue/fifo_test.go | 54 +++++++++++++++++++++----- cncd/queue/queue.go | 16 ++++---- drone-go/drone/client_test.go | 1 + drone-go/drone/types.go | 9 +++-- 5 files changed, 117 insertions(+), 36 deletions(-) diff --git a/cncd/queue/fifo.go b/cncd/queue/fifo.go index bc8936ba6..d76ee23b8 100644 --- a/cncd/queue/fifo.go +++ b/cncd/queue/fifo.go @@ -27,21 +27,23 @@ type worker struct { type fifo struct { sync.Mutex - workers map[*worker]struct{} - running map[string]*entry - pending *list.List - extension time.Duration - paused bool + workers map[*worker]struct{} + running map[string]*entry + pending *list.List + waitingOnDeps *list.List + extension time.Duration + paused bool } // New returns a new fifo queue. func New() Queue { return &fifo{ - workers: map[*worker]struct{}{}, - running: map[string]*entry{}, - pending: list.New(), - extension: time.Minute * 10, - paused: false, + workers: map[*worker]struct{}{}, + running: map[string]*entry{}, + pending: list.New(), + waitingOnDeps: list.New(), + extension: time.Minute * 10, + paused: false, } } @@ -161,11 +163,15 @@ func (q *fifo) Info(c context.Context) InfoT { stats := InfoT{} stats.Stats.Workers = len(q.workers) stats.Stats.Pending = q.pending.Len() + stats.Stats.WaitingOnDeps = q.waitingOnDeps.Len() stats.Stats.Running = len(q.running) for e := q.pending.Front(); e != nil; e = e.Next() { stats.Pending = append(stats.Pending, e.Value.(*Task)) } + for e := q.waitingOnDeps.Front(); e != nil; e = e.Next() { + stats.WaitingOnDeps = append(stats.WaitingOnDeps, e.Value.(*Task)) + } for _, entry := range q.running { stats.Running = append(stats.Running, entry.item) } @@ -210,7 +216,7 @@ func (q *fifo) process() { defer q.Unlock() q.resubmitExpiredBuilds() - + q.filterWaiting() for pending, worker := q.assignToWorker(); pending != nil && worker != nil; pending, worker = q.assignToWorker() { task := pending.Value.(*Task) delete(q.workers, worker) @@ -224,16 +230,41 @@ func (q *fifo) process() { } } +func (q *fifo) filterWaiting() { + // resubmits all waiting tasks to pending, deps may have cleared + var nextWaiting *list.Element + for e := q.waitingOnDeps.Front(); e != nil; e = nextWaiting { + nextWaiting = e.Next() + task := e.Value.(*Task) + q.pending.PushBack(task) + } + + // rebuild waitingDeps + q.waitingOnDeps = list.New() + filtered := []*list.Element{} + var nextPending *list.Element + for e := q.pending.Front(); e != nil; e = nextPending { + nextPending = e.Next() + task := e.Value.(*Task) + if q.depsInQueue(task) { + logrus.Debugf("queue: waiting due to unmet dependencies %v", task.ID) + q.waitingOnDeps.PushBack(task) + filtered = append(filtered, e) + } + } + + // filter waiting tasks + for _, f := range filtered { + q.pending.Remove(f) + } +} + func (q *fifo) assignToWorker() (*list.Element, *worker) { var next *list.Element for e := q.pending.Front(); e != nil; e = next { next = e.Next() task := e.Value.(*Task) logrus.Debugf("queue: trying to assign task: %v with deps %v", task.ID, task.Dependencies) - if q.depsInQueue(task) { - logrus.Debugf("queue: skipping due to unmet dependencies %v", task.ID) - continue - } for w := range q.workers { if w.filter(task) { @@ -290,6 +321,7 @@ func (q *fifo) updateDepStatusInQueue(taskID string, success bool) { } } } + for _, running := range q.running { for _, dep := range running.item.Dependencies { if taskID == dep { @@ -297,6 +329,17 @@ func (q *fifo) updateDepStatusInQueue(taskID string, success bool) { } } } + + var n *list.Element + for e := q.waitingOnDeps.Front(); e != nil; e = n { + next = e.Next() + waiting, ok := e.Value.(*Task) + for _, dep := range waiting.Dependencies { + if ok && taskID == dep { + waiting.DepStatus[dep] = success + } + } + } } func (q *fifo) removeFromPending(taskID string) { diff --git a/cncd/queue/fifo_test.go b/cncd/queue/fifo_test.go index 2252bbb7f..4e308c1ff 100644 --- a/cncd/queue/fifo_test.go +++ b/cncd/queue/fifo_test.go @@ -131,8 +131,7 @@ func TestFifoDependencies(t *testing.T) { } q := New().(*fifo) - q.Push(noContext, task2) - q.Push(noContext, task1) + q.PushAtOnce(noContext, []*Task{task2, task1}) got, _ := q.Poll(noContext, func(*Task) bool { return true }) if got != task1 { @@ -168,9 +167,7 @@ func TestFifoErrors(t *testing.T) { } q := New().(*fifo) - q.Push(noContext, task2) - q.Push(noContext, task3) - q.Push(noContext, task1) + q.PushAtOnce(noContext, []*Task{task2, task3, task1}) got, _ := q.Poll(noContext, func(*Task) bool { return true }) if got != task1 { @@ -222,9 +219,7 @@ func TestFifoCancel(t *testing.T) { } q := New().(*fifo) - q.Push(noContext, task2) - q.Push(noContext, task3) - q.Push(noContext, task1) + q.PushAtOnce(noContext, []*Task{task2, task3, task1}) _, _ = q.Poll(noContext, func(*Task) bool { return true }) q.Error(noContext, task1.ID, fmt.Errorf("cancelled")) @@ -251,7 +246,6 @@ func TestFifoPause(t *testing.T) { wg.Done() }() - q.Pause() t0 := time.Now() q.Push(noContext, task1) @@ -261,7 +255,7 @@ func TestFifoPause(t *testing.T) { wg.Wait() t1 := time.Now() - if t1.Sub(t0) < 20 * time.Millisecond { + if t1.Sub(t0) < 20*time.Millisecond { t.Errorf("Should have waited til resume") } @@ -284,6 +278,46 @@ func TestFifoPauseResume(t *testing.T) { _, _ = q.Poll(noContext, func(*Task) bool { return true }) } +func TestWaitingVsPending(t *testing.T) { + task1 := &Task{ + ID: "1", + } + + task2 := &Task{ + ID: "2", + Dependencies: []string{"1"}, + DepStatus: make(map[string]bool), + } + + task3 := &Task{ + ID: "3", + Dependencies: []string{"1"}, + DepStatus: make(map[string]bool), + RunOn: []string{"success", "failure"}, + } + + q := New().(*fifo) + q.PushAtOnce(noContext, []*Task{task2, task3, task1}) + + got, _ := q.Poll(noContext, func(*Task) bool { return true }) + + info := q.Info(noContext) + if info.Stats.WaitingOnDeps != 2 { + t.Errorf("2 should wait on deps") + } + + q.Error(noContext, got.ID, fmt.Errorf("exitcode 1, there was an error")) + got, _ = q.Poll(noContext, func(*Task) bool { return true }) + + info = q.Info(noContext) + if info.Stats.WaitingOnDeps != 0 { + t.Errorf("0 should wait on deps") + } + if info.Stats.Pending != 1 { + t.Errorf("1 should wait for worker") + } +} + func TestShouldRun(t *testing.T) { task := &Task{ ID: "2", diff --git a/cncd/queue/queue.go b/cncd/queue/queue.go index fc1641c36..3154d3d93 100644 --- a/cncd/queue/queue.go +++ b/cncd/queue/queue.go @@ -85,13 +85,15 @@ func runsOnSuccess(runsOn []string) bool { // InfoT provides runtime information. type InfoT struct { - Pending []*Task `json:"pending"` - Running []*Task `json:"running"` - Stats struct { - Workers int `json:"worker_count"` - Pending int `json:"pending_count"` - Running int `json:"running_count"` - Complete int `json:"completed_count"` + Pending []*Task `json:"pending"` + WaitingOnDeps []*Task `json:"waiting_on_deps"` + Running []*Task `json:"running"` + Stats struct { + Workers int `json:"worker_count"` + Pending int `json:"pending_count"` + WaitingOnDeps int `json:"waiting_on_deps_count"` + Running int `json:"running_count"` + Complete int `json:"completed_count"` } `json:"stats"` Paused bool } diff --git a/drone-go/drone/client_test.go b/drone-go/drone/client_test.go index 0e3b6d640..4d355e11b 100644 --- a/drone-go/drone/client_test.go +++ b/drone-go/drone/client_test.go @@ -27,6 +27,7 @@ func Test_QueueInfo(t *testing.T) { "stats": { "worker_count": 3, "pending_count": 0, + "waiting_on_deps_count": 0, "running_count": 1, "completed_count": 0 }, diff --git a/drone-go/drone/types.go b/drone-go/drone/types.go index 7919a8748..305a52968 100644 --- a/drone-go/drone/types.go +++ b/drone-go/drone/types.go @@ -150,10 +150,11 @@ type ( // Info provides queue stats. Info struct { Stats struct { - Workers int `json:"worker_count"` - Pending int `json:"pending_count"` - Running int `json:"running_count"` - Complete int `json:"completed_count"` + Workers int `json:"worker_count"` + Pending int `json:"pending_count"` + WaitingOnDeps int `json:"waiting_on_deps_count"` + Running int `json:"running_count"` + Complete int `json:"completed_count"` } `json:"stats"` Paused bool `json:"paused,omitempty"` }