mirror of
https://github.com/woodpecker-ci/woodpecker.git
synced 2025-01-09 17:15:31 +00:00
Run queue.process() in background (#4115)
This commit is contained in:
parent
4576aef483
commit
10d8a13272
2 changed files with 49 additions and 33 deletions
|
@ -43,6 +43,7 @@ type worker struct {
|
|||
type fifo struct {
|
||||
sync.Mutex
|
||||
|
||||
ctx context.Context
|
||||
workers map[*worker]struct{}
|
||||
running map[string]*entry
|
||||
pending *list.List
|
||||
|
@ -51,18 +52,23 @@ type fifo struct {
|
|||
paused bool
|
||||
}
|
||||
|
||||
// processTimeInterval is the time till the queue rearranges things,
|
||||
// as the agent pull in 10 milliseconds we should also give them work asap.
|
||||
const processTimeInterval = 100 * time.Millisecond
|
||||
|
||||
// New returns a new fifo queue.
|
||||
//
|
||||
//nolint:mnd
|
||||
func New(_ context.Context) Queue {
|
||||
return &fifo{
|
||||
func New(ctx context.Context) Queue {
|
||||
q := &fifo{
|
||||
ctx: ctx,
|
||||
workers: map[*worker]struct{}{},
|
||||
running: map[string]*entry{},
|
||||
pending: list.New(),
|
||||
waitingOnDeps: list.New(),
|
||||
extension: time.Minute * 10,
|
||||
extension: time.Minute * 10, //nolint:mnd
|
||||
paused: false,
|
||||
}
|
||||
go q.process()
|
||||
return q
|
||||
}
|
||||
|
||||
// Push pushes a task to the tail of this queue.
|
||||
|
@ -70,7 +76,6 @@ func (q *fifo) Push(_ context.Context, task *model.Task) error {
|
|||
q.Lock()
|
||||
q.pending.PushBack(task)
|
||||
q.Unlock()
|
||||
go q.process()
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -81,7 +86,6 @@ func (q *fifo) PushAtOnce(_ context.Context, tasks []*model.Task) error {
|
|||
q.pending.PushBack(task)
|
||||
}
|
||||
q.Unlock()
|
||||
go q.process()
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -98,7 +102,6 @@ func (q *fifo) Poll(c context.Context, agentID int64, f FilterFn) (*model.Task,
|
|||
}
|
||||
q.workers[w] = struct{}{}
|
||||
q.Unlock()
|
||||
go q.process()
|
||||
|
||||
for {
|
||||
select {
|
||||
|
@ -237,7 +240,6 @@ func (q *fifo) Resume() {
|
|||
q.Lock()
|
||||
q.paused = false
|
||||
q.Unlock()
|
||||
go q.process()
|
||||
}
|
||||
|
||||
// KickAgentWorkers kicks all workers for a given agent.
|
||||
|
@ -254,28 +256,36 @@ func (q *fifo) KickAgentWorkers(agentID int64) {
|
|||
}
|
||||
|
||||
// helper function that loops through the queue and attempts to
|
||||
// match the item to a single subscriber.
|
||||
// match the item to a single subscriber until context got cancel.
|
||||
func (q *fifo) process() {
|
||||
q.Lock()
|
||||
defer q.Unlock()
|
||||
|
||||
if q.paused {
|
||||
return
|
||||
}
|
||||
|
||||
q.resubmitExpiredPipelines()
|
||||
q.filterWaiting()
|
||||
for pending, worker := q.assignToWorker(); pending != nil && worker != nil; pending, worker = q.assignToWorker() {
|
||||
task, _ := pending.Value.(*model.Task)
|
||||
task.AgentID = worker.agentID
|
||||
delete(q.workers, worker)
|
||||
q.pending.Remove(pending)
|
||||
q.running[task.ID] = &entry{
|
||||
item: task,
|
||||
done: make(chan bool),
|
||||
deadline: time.Now().Add(q.extension),
|
||||
for {
|
||||
select {
|
||||
case <-time.After(processTimeInterval):
|
||||
case <-q.ctx.Done():
|
||||
return
|
||||
}
|
||||
worker.channel <- task
|
||||
|
||||
q.Lock()
|
||||
if q.paused {
|
||||
q.Unlock()
|
||||
continue
|
||||
}
|
||||
|
||||
q.resubmitExpiredPipelines()
|
||||
q.filterWaiting()
|
||||
for pending, worker := q.assignToWorker(); pending != nil && worker != nil; pending, worker = q.assignToWorker() {
|
||||
task, _ := pending.Value.(*model.Task)
|
||||
task.AgentID = worker.agentID
|
||||
delete(q.workers, worker)
|
||||
q.pending.Remove(pending)
|
||||
q.running[task.ID] = &entry{
|
||||
item: task,
|
||||
done: make(chan bool),
|
||||
deadline: time.Now().Add(q.extension),
|
||||
}
|
||||
worker.channel <- task
|
||||
}
|
||||
q.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -52,17 +52,23 @@ func TestFifo(t *testing.T) {
|
|||
|
||||
func TestFifoExpire(t *testing.T) {
|
||||
want := &model.Task{ID: "1"}
|
||||
ctx, cancel := context.WithCancelCause(context.Background())
|
||||
|
||||
q, _ := New(context.Background()).(*fifo)
|
||||
q, _ := New(ctx).(*fifo)
|
||||
q.extension = 0
|
||||
assert.NoError(t, q.Push(noContext, want))
|
||||
info := q.Info(noContext)
|
||||
assert.NoError(t, q.Push(ctx, want))
|
||||
info := q.Info(ctx)
|
||||
assert.Len(t, info.Pending, 1, "expect task in pending queue")
|
||||
|
||||
got, err := q.Poll(noContext, 1, func(*model.Task) bool { return true })
|
||||
got, err := q.Poll(ctx, 1, func(*model.Task) bool { return true })
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, want, got)
|
||||
|
||||
// cancel the context to let the process func end
|
||||
go func() {
|
||||
time.Sleep(time.Millisecond)
|
||||
cancel(nil)
|
||||
}()
|
||||
q.process()
|
||||
assert.Len(t, info.Pending, 1, "expect task re-added to pending queue")
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue