Assign multiple pending tasks in one go

This commit is contained in:
Laszlo Fogas 2019-06-24 22:35:50 +02:00
parent e239c2fe74
commit 844c371f00

View file

@ -59,9 +59,7 @@ func (q *fifo) PushAtOnce(c context.Context, tasks []*Task) error {
q.pending.PushBack(task)
}
q.Unlock()
for range tasks {
go q.process()
}
go q.process()
return nil
}
@ -107,7 +105,6 @@ func (q *fifo) Error(c context.Context, id string, err error) error {
q.removeFromPending(id)
}
q.Unlock()
go q.process()
return nil
}
@ -194,8 +191,21 @@ func (q *fifo) process() {
q.resubmitExpiredBuilds()
for pending, worker := q.assignToWorker(); pending != nil && worker != nil; pending, worker = q.assignToWorker() {
task := pending.Value.(*Task)
delete(q.workers, worker)
q.pending.Remove(pending)
q.running[task.ID] = &entry{
item: task,
done: make(chan bool),
deadline: time.Now().Add(q.extension),
}
worker.channel <- task
}
}
func (q *fifo) assignToWorker() (*list.Element, *worker) {
var next *list.Element
loop:
for e := q.pending.Front(); e != nil; e = next {
next = e.Next()
task := e.Value.(*Task)
@ -204,23 +214,16 @@ loop:
logrus.Debugf("queue: skipping due to unmet dependencies %v", task.ID)
continue
}
for w := range q.workers {
if w.filter(task) {
delete(q.workers, w)
q.pending.Remove(e)
q.running[task.ID] = &entry{
item: task,
done: make(chan bool),
deadline: time.Now().Add(q.extension),
}
logrus.Debugf("queue: assigned task: %v with deps %v", task.ID, task.Dependencies)
w.channel <- task
break loop
return e, w
}
}
}
return nil, nil
}
func (q *fifo) resubmitExpiredBuilds() {