diff --git a/cncd/queue/fifo.go b/cncd/queue/fifo.go index f05c50a25..c0428de9a 100644 --- a/cncd/queue/fifo.go +++ b/cncd/queue/fifo.go @@ -59,9 +59,7 @@ func (q *fifo) PushAtOnce(c context.Context, tasks []*Task) error { q.pending.PushBack(task) } q.Unlock() - for range tasks { - go q.process() - } + go q.process() return nil } @@ -107,7 +105,6 @@ func (q *fifo) Error(c context.Context, id string, err error) error { q.removeFromPending(id) } q.Unlock() - go q.process() return nil } @@ -194,8 +191,21 @@ func (q *fifo) process() { q.resubmitExpiredBuilds() + for pending, worker := q.assignToWorker(); pending != nil && worker != nil; pending, worker = q.assignToWorker() { + task := pending.Value.(*Task) + delete(q.workers, worker) + q.pending.Remove(pending) + q.running[task.ID] = &entry{ + item: task, + done: make(chan bool), + deadline: time.Now().Add(q.extension), + } + worker.channel <- task + } +} + +func (q *fifo) assignToWorker() (*list.Element, *worker) { var next *list.Element -loop: for e := q.pending.Front(); e != nil; e = next { next = e.Next() task := e.Value.(*Task) @@ -204,23 +214,16 @@ loop: logrus.Debugf("queue: skipping due to unmet dependencies %v", task.ID) continue } + for w := range q.workers { if w.filter(task) { - delete(q.workers, w) - q.pending.Remove(e) - - q.running[task.ID] = &entry{ - item: task, - done: make(chan bool), - deadline: time.Now().Add(q.extension), - } - logrus.Debugf("queue: assigned task: %v with deps %v", task.ID, task.Dependencies) - w.channel <- task - break loop + return e, w } } } + + return nil, nil } func (q *fifo) resubmitExpiredBuilds() {