diff --git a/server/queue/fifo.go b/server/queue/fifo.go index d6fe664c9..dfe35cc34 100644 --- a/server/queue/fifo.go +++ b/server/queue/fifo.go @@ -43,6 +43,7 @@ type worker struct { type fifo struct { sync.Mutex + ctx context.Context workers map[*worker]struct{} running map[string]*entry pending *list.List @@ -51,18 +52,23 @@ type fifo struct { paused bool } +// processTimeInterval is the time till the queue rearranges things, +// as the agent pull in 10 milliseconds we should also give them work asap. +const processTimeInterval = 100 * time.Millisecond + // New returns a new fifo queue. -// -//nolint:mnd -func New(_ context.Context) Queue { - return &fifo{ +func New(ctx context.Context) Queue { + q := &fifo{ + ctx: ctx, workers: map[*worker]struct{}{}, running: map[string]*entry{}, pending: list.New(), waitingOnDeps: list.New(), - extension: time.Minute * 10, + extension: time.Minute * 10, //nolint:mnd paused: false, } + go q.process() + return q } // Push pushes a task to the tail of this queue. @@ -70,7 +76,6 @@ func (q *fifo) Push(_ context.Context, task *model.Task) error { q.Lock() q.pending.PushBack(task) q.Unlock() - go q.process() return nil } @@ -81,7 +86,6 @@ func (q *fifo) PushAtOnce(_ context.Context, tasks []*model.Task) error { q.pending.PushBack(task) } q.Unlock() - go q.process() return nil } @@ -98,7 +102,6 @@ func (q *fifo) Poll(c context.Context, agentID int64, f FilterFn) (*model.Task, } q.workers[w] = struct{}{} q.Unlock() - go q.process() for { select { @@ -237,7 +240,6 @@ func (q *fifo) Resume() { q.Lock() q.paused = false q.Unlock() - go q.process() } // KickAgentWorkers kicks all workers for a given agent. @@ -254,28 +256,36 @@ func (q *fifo) KickAgentWorkers(agentID int64) { } // helper function that loops through the queue and attempts to -// match the item to a single subscriber. +// match the item to a single subscriber until context got cancel. func (q *fifo) process() { - q.Lock() - defer q.Unlock() - - if q.paused { - return - } - - q.resubmitExpiredPipelines() - q.filterWaiting() - for pending, worker := q.assignToWorker(); pending != nil && worker != nil; pending, worker = q.assignToWorker() { - task, _ := pending.Value.(*model.Task) - task.AgentID = worker.agentID - delete(q.workers, worker) - q.pending.Remove(pending) - q.running[task.ID] = &entry{ - item: task, - done: make(chan bool), - deadline: time.Now().Add(q.extension), + for { + select { + case <-time.After(processTimeInterval): + case <-q.ctx.Done(): + return } - worker.channel <- task + + q.Lock() + if q.paused { + q.Unlock() + continue + } + + q.resubmitExpiredPipelines() + q.filterWaiting() + for pending, worker := q.assignToWorker(); pending != nil && worker != nil; pending, worker = q.assignToWorker() { + task, _ := pending.Value.(*model.Task) + task.AgentID = worker.agentID + delete(q.workers, worker) + q.pending.Remove(pending) + q.running[task.ID] = &entry{ + item: task, + done: make(chan bool), + deadline: time.Now().Add(q.extension), + } + worker.channel <- task + } + q.Unlock() } } diff --git a/server/queue/fifo_test.go b/server/queue/fifo_test.go index d1d3a2ebc..0de92e3f2 100644 --- a/server/queue/fifo_test.go +++ b/server/queue/fifo_test.go @@ -52,17 +52,23 @@ func TestFifo(t *testing.T) { func TestFifoExpire(t *testing.T) { want := &model.Task{ID: "1"} + ctx, cancel := context.WithCancelCause(context.Background()) - q, _ := New(context.Background()).(*fifo) + q, _ := New(ctx).(*fifo) q.extension = 0 - assert.NoError(t, q.Push(noContext, want)) - info := q.Info(noContext) + assert.NoError(t, q.Push(ctx, want)) + info := q.Info(ctx) assert.Len(t, info.Pending, 1, "expect task in pending queue") - got, err := q.Poll(noContext, 1, func(*model.Task) bool { return true }) + got, err := q.Poll(ctx, 1, func(*model.Task) bool { return true }) assert.NoError(t, err) assert.Equal(t, want, got) + // cancel the context to let the process func end + go func() { + time.Sleep(time.Millisecond) + cancel(nil) + }() q.process() assert.Len(t, info.Pending, 1, "expect task re-added to pending queue") }