From ddd55ee39aa437f2df46a9e306f298ce5a6d6731 Mon Sep 17 00:00:00 2001 From: 6543 <6543@obermui.de> Date: Tue, 8 Oct 2024 23:06:58 +0200 Subject: [PATCH] Assign workflows to agents with the best label matches (#4201) Co-authored-by: Anbraten <6918444+anbraten@users.noreply.github.com> --- server/grpc/filter.go | 24 ++-- server/grpc/filter_test.go | 120 +++++++++++------ server/queue/fifo.go | 122 +++++++++-------- server/queue/fifo_test.go | 265 ++++++++++++++++++++++++++----------- server/queue/queue.go | 3 +- 5 files changed, 354 insertions(+), 180 deletions(-) diff --git a/server/grpc/filter.go b/server/grpc/filter.go index 9cf2d87fa..521aeb912 100644 --- a/server/grpc/filter.go +++ b/server/grpc/filter.go @@ -21,28 +21,32 @@ import ( ) func createFilterFunc(agentFilter rpc.Filter) queue.FilterFn { - return func(task *model.Task) bool { + return func(task *model.Task) (bool, int) { + score := 0 for taskLabel, taskLabelValue := range task.Labels { // if a task label is empty it will be ignored if taskLabelValue == "" { continue } + // all task labels are required to be present for an agent to match agentLabelValue, ok := agentFilter.Labels[taskLabel] - if !ok { - return false + return false, 0 } + switch { // if agent label has a wildcard - if agentLabelValue == "*" { - continue - } - - if taskLabelValue != agentLabelValue { - return false + case agentLabelValue == "*": + score++ + // if agent label has an exact match + case agentLabelValue == taskLabelValue: + score += 10 + // agent doesn't match + default: + return false, 0 } } - return true + return true, score } } diff --git a/server/grpc/filter_test.go b/server/grpc/filter_test.go index e4c7eeae6..e074f797f 100644 --- a/server/grpc/filter_test.go +++ b/server/grpc/filter_test.go @@ -24,68 +24,110 @@ import ( ) func TestCreateFilterFunc(t *testing.T) { - t.Parallel() - tests := []struct { name string - agentLabels map[string]string - task model.Task - exp bool + agentFilter rpc.Filter + task *model.Task + wantMatched bool + wantScore int }{ { - name: "agent with missing labels", - agentLabels: map[string]string{"repo": "test/woodpecker"}, - task: model.Task{ - Labels: map[string]string{"platform": "linux/amd64", "repo": "test/woodpecker"}, + name: "Two exact matches", + agentFilter: rpc.Filter{ + Labels: map[string]string{"org-id": "123", "platform": "linux"}, }, - exp: false, + task: &model.Task{ + Labels: map[string]string{"org-id": "123", "platform": "linux"}, + }, + wantMatched: true, + wantScore: 20, }, { - name: "agent with wrong labels", - agentLabels: map[string]string{"platform": "linux/arm64"}, - task: model.Task{ - Labels: map[string]string{"platform": "linux/amd64"}, + name: "Wildcard and exact match", + agentFilter: rpc.Filter{ + Labels: map[string]string{"org-id": "*", "platform": "linux"}, }, - exp: false, + task: &model.Task{ + Labels: map[string]string{"org-id": "123", "platform": "linux"}, + }, + wantMatched: true, + wantScore: 11, }, { - name: "agent with correct labels", - agentLabels: map[string]string{"platform": "linux/amd64", "location": "europe"}, - task: model.Task{ - Labels: map[string]string{"platform": "linux/amd64", "location": "europe"}, + name: "Partial match", + agentFilter: rpc.Filter{ + Labels: map[string]string{"org-id": "123", "platform": "linux"}, }, - exp: true, + task: &model.Task{ + Labels: map[string]string{"org-id": "123", "platform": "windows"}, + }, + wantMatched: false, + wantScore: 0, }, { - name: "agent with additional labels", - agentLabels: map[string]string{"platform": "linux/amd64", "location": "europe"}, - task: model.Task{ - Labels: map[string]string{"platform": "linux/amd64"}, + name: "No match", + agentFilter: rpc.Filter{ + Labels: map[string]string{"org-id": "456", "platform": "linux"}, }, - exp: true, + task: &model.Task{ + Labels: map[string]string{"org-id": "123", "platform": "windows"}, + }, + wantMatched: false, + wantScore: 0, }, { - name: "agent with wildcard label", - agentLabels: map[string]string{"platform": "linux/amd64", "location": "*"}, - task: model.Task{ - Labels: map[string]string{"platform": "linux/amd64", "location": "america"}, + name: "Missing label", + agentFilter: rpc.Filter{ + Labels: map[string]string{"platform": "linux"}, }, - exp: true, + task: &model.Task{ + Labels: map[string]string{"needed": "some"}, + }, + wantMatched: false, + wantScore: 0, }, { - name: "agent with platform label and task without", - agentLabels: map[string]string{"platform": "linux/amd64"}, - task: model.Task{ - Labels: map[string]string{"platform": ""}, + name: "Empty task labels", + agentFilter: rpc.Filter{ + Labels: map[string]string{"org-id": "123", "platform": "linux"}, }, - exp: true, + task: &model.Task{ + Labels: map[string]string{}, + }, + wantMatched: true, + wantScore: 0, + }, + { + name: "Agent with additional label", + agentFilter: rpc.Filter{ + Labels: map[string]string{"org-id": "123", "platform": "linux", "extra": "value"}, + }, + task: &model.Task{ + Labels: map[string]string{"org-id": "123", "platform": "linux", "empty": ""}, + }, + wantMatched: true, + wantScore: 20, + }, + { + name: "Two wildcard matches", + agentFilter: rpc.Filter{ + Labels: map[string]string{"org-id": "*", "platform": "*"}, + }, + task: &model.Task{ + Labels: map[string]string{"org-id": "123", "platform": "linux"}, + }, + wantMatched: true, + wantScore: 2, }, } - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - fn := createFilterFunc(rpc.Filter{Labels: test.agentLabels}) - assert.EqualValues(t, test.exp, fn(&test.task)) + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + filterFunc := createFilterFunc(tt.agentFilter) + gotMatched, gotScore := filterFunc(tt.task) + + assert.Equal(t, tt.wantMatched, gotMatched, "Matched result") + assert.Equal(t, tt.wantScore, gotScore, "Score") }) } } diff --git a/server/queue/fifo.go b/server/queue/fifo.go index 8cf6f4773..f924055ad 100644 --- a/server/queue/fifo.go +++ b/server/queue/fifo.go @@ -57,6 +57,8 @@ type fifo struct { // as the agent pull in 10 milliseconds we should also give them work asap. const processTimeInterval = 100 * time.Millisecond +var ErrWorkerKicked = fmt.Errorf("worker was kicked") + // New returns a new fifo queue. func New(ctx context.Context) Queue { q := &fifo{ @@ -91,27 +93,27 @@ func (q *fifo) PushAtOnce(_ context.Context, tasks []*model.Task) error { } // Poll retrieves and removes a task head of this queue. -func (q *fifo) Poll(c context.Context, agentID int64, f FilterFn) (*model.Task, error) { +func (q *fifo) Poll(c context.Context, agentID int64, filter FilterFn) (*model.Task, error) { q.Lock() ctx, stop := context.WithCancelCause(c) - w := &worker{ + _worker := &worker{ agentID: agentID, channel: make(chan *model.Task, 1), - filter: f, + filter: filter, stop: stop, } - q.workers[w] = struct{}{} + q.workers[_worker] = struct{}{} q.Unlock() for { select { case <-ctx.Done(): q.Lock() - delete(q.workers, w) + delete(q.workers, _worker) q.Unlock() return nil, ctx.Err() - case t := <-w.channel: + case t := <-_worker.channel: return t, nil } } @@ -152,22 +154,22 @@ func (q *fifo) finished(ids []string, exitStatus model.StatusValue, err error) e } // Evict removes a pending task from the queue. -func (q *fifo) Evict(c context.Context, id string) error { - return q.EvictAtOnce(c, []string{id}) +func (q *fifo) Evict(ctx context.Context, taskID string) error { + return q.EvictAtOnce(ctx, []string{taskID}) } // EvictAtOnce removes multiple pending tasks from the queue. -func (q *fifo) EvictAtOnce(_ context.Context, ids []string) error { +func (q *fifo) EvictAtOnce(_ context.Context, taskIDs []string) error { q.Lock() defer q.Unlock() - for _, id := range ids { + for _, id := range taskIDs { var next *list.Element - for e := q.pending.Front(); e != nil; e = next { - next = e.Next() - task, ok := e.Value.(*model.Task) + for element := q.pending.Front(); element != nil; element = next { + next = element.Next() + task, ok := element.Value.(*model.Task) if ok && task.ID == id { - q.pending.Remove(e) + q.pending.Remove(element) return nil } } @@ -176,13 +178,13 @@ func (q *fifo) EvictAtOnce(_ context.Context, ids []string) error { } // Wait waits until the item is done executing. -func (q *fifo) Wait(c context.Context, id string) error { +func (q *fifo) Wait(ctx context.Context, taskID string) error { q.Lock() - state := q.running[id] + state := q.running[taskID] q.Unlock() if state != nil { select { - case <-c.Done(): + case <-ctx.Done(): case <-state.done: return state.error } @@ -191,11 +193,11 @@ func (q *fifo) Wait(c context.Context, id string) error { } // Extend extends the task execution deadline. -func (q *fifo) Extend(_ context.Context, agentID int64, id string) error { +func (q *fifo) Extend(_ context.Context, agentID int64, taskID string) error { q.Lock() defer q.Unlock() - state, ok := q.running[id] + state, ok := q.running[taskID] if ok { if state.item.AgentID != agentID { return ErrAgentMissMatch @@ -216,12 +218,12 @@ func (q *fifo) Info(_ context.Context) InfoT { stats.Stats.WaitingOnDeps = q.waitingOnDeps.Len() stats.Stats.Running = len(q.running) - for e := q.pending.Front(); e != nil; e = e.Next() { - task, _ := e.Value.(*model.Task) + for element := q.pending.Front(); element != nil; element = element.Next() { + task, _ := element.Value.(*model.Task) stats.Pending = append(stats.Pending, task) } - for e := q.waitingOnDeps.Front(); e != nil; e = e.Next() { - task, _ := e.Value.(*model.Task) + for element := q.waitingOnDeps.Front(); element != nil; element = element.Next() { + task, _ := element.Value.(*model.Task) stats.WaitingOnDeps = append(stats.WaitingOnDeps, task) } for _, entry := range q.running { @@ -252,10 +254,10 @@ func (q *fifo) KickAgentWorkers(agentID int64) { q.Lock() defer q.Unlock() - for w := range q.workers { - if w.agentID == agentID { - w.stop(fmt.Errorf("worker was kicked")) - delete(q.workers, w) + for worker := range q.workers { + if worker.agentID == agentID { + worker.stop(ErrWorkerKicked) + delete(q.workers, worker) } } } @@ -307,13 +309,13 @@ func (q *fifo) filterWaiting() { q.waitingOnDeps = list.New() var filtered []*list.Element var nextPending *list.Element - for e := q.pending.Front(); e != nil; e = nextPending { - nextPending = e.Next() - task, _ := e.Value.(*model.Task) + for element := q.pending.Front(); element != nil; element = nextPending { + nextPending = element.Next() + task, _ := element.Value.(*model.Task) if q.depsInQueue(task) { log.Debug().Msgf("queue: waiting due to unmet dependencies %v", task.ID) q.waitingOnDeps.PushBack(task) - filtered = append(filtered, e) + filtered = append(filtered, element) } } @@ -325,37 +327,45 @@ func (q *fifo) filterWaiting() { func (q *fifo) assignToWorker() (*list.Element, *worker) { var next *list.Element - for e := q.pending.Front(); e != nil; e = next { - next = e.Next() - task, _ := e.Value.(*model.Task) + var bestWorker *worker + var bestScore int + + for element := q.pending.Front(); element != nil; element = next { + next = element.Next() + task, _ := element.Value.(*model.Task) log.Debug().Msgf("queue: trying to assign task: %v with deps %v", task.ID, task.Dependencies) - for w := range q.workers { - if w.filter(task) { - log.Debug().Msgf("queue: assigned task: %v with deps %v", task.ID, task.Dependencies) - return e, w + for worker := range q.workers { + matched, score := worker.filter(task) + if matched && score > bestScore { + bestWorker = worker + bestScore = score } } + if bestWorker != nil { + log.Debug().Msgf("queue: assigned task: %v with deps %v to worker with score %d", task.ID, task.Dependencies, bestScore) + return element, bestWorker + } } return nil, nil } func (q *fifo) resubmitExpiredPipelines() { - for id, state := range q.running { - if time.Now().After(state.deadline) { - q.pending.PushFront(state.item) - delete(q.running, id) - close(state.done) + for taskID, taskState := range q.running { + if time.Now().After(taskState.deadline) { + q.pending.PushFront(taskState.item) + delete(q.running, taskID) + close(taskState.done) } } } func (q *fifo) depsInQueue(task *model.Task) bool { var next *list.Element - for e := q.pending.Front(); e != nil; e = next { - next = e.Next() - possibleDep, ok := e.Value.(*model.Task) + for element := q.pending.Front(); element != nil; element = next { + next = element.Next() + possibleDep, ok := element.Value.(*model.Task) log.Debug().Msgf("queue: pending right now: %v", possibleDep.ID) for _, dep := range task.Dependencies { if ok && possibleDep.ID == dep { @@ -376,9 +386,9 @@ func (q *fifo) depsInQueue(task *model.Task) bool { func (q *fifo) updateDepStatusInQueue(taskID string, status model.StatusValue) { var next *list.Element - for e := q.pending.Front(); e != nil; e = next { - next = e.Next() - pending, ok := e.Value.(*model.Task) + for element := q.pending.Front(); element != nil; element = next { + next = element.Next() + pending, ok := element.Value.(*model.Task) for _, dep := range pending.Dependencies { if ok && taskID == dep { pending.DepStatus[dep] = status @@ -394,9 +404,9 @@ func (q *fifo) updateDepStatusInQueue(taskID string, status model.StatusValue) { } } - for e := q.waitingOnDeps.Front(); e != nil; e = next { - next = e.Next() - waiting, ok := e.Value.(*model.Task) + for element := q.waitingOnDeps.Front(); element != nil; element = next { + next = element.Next() + waiting, ok := element.Value.(*model.Task) for _, dep := range waiting.Dependencies { if ok && taskID == dep { waiting.DepStatus[dep] = status @@ -408,12 +418,12 @@ func (q *fifo) updateDepStatusInQueue(taskID string, status model.StatusValue) { func (q *fifo) removeFromPending(taskID string) { log.Debug().Msgf("queue: trying to remove %s", taskID) var next *list.Element - for e := q.pending.Front(); e != nil; e = next { - next = e.Next() - task, _ := e.Value.(*model.Task) + for element := q.pending.Front(); element != nil; element = next { + next = element.Next() + task, _ := element.Value.(*model.Task) if task.ID == taskID { log.Debug().Msgf("queue: %s is removed from pending", taskID) - q.pending.Remove(e) + q.pending.Remove(element) return } } diff --git a/server/queue/fifo_test.go b/server/queue/fifo_test.go index 0de92e3f2..01a790607 100644 --- a/server/queue/fifo_test.go +++ b/server/queue/fifo_test.go @@ -26,26 +26,27 @@ import ( "go.woodpecker-ci.org/woodpecker/v2/server/model" ) -var noContext = context.Background() +var filterFnTrue = func(*model.Task) (bool, int) { return true, 1 } func TestFifo(t *testing.T) { want := &model.Task{ID: "1"} + ctx := context.Background() - q := New(context.Background()) - assert.NoError(t, q.Push(noContext, want)) - info := q.Info(noContext) + q := New(ctx) + assert.NoError(t, q.Push(ctx, want)) + info := q.Info(ctx) assert.Len(t, info.Pending, 1, "expect task in pending queue") - got, err := q.Poll(noContext, 1, func(*model.Task) bool { return true }) + got, err := q.Poll(ctx, 1, filterFnTrue) assert.NoError(t, err) assert.Equal(t, want, got) - info = q.Info(noContext) + info = q.Info(ctx) assert.Len(t, info.Pending, 0, "expect task removed from pending queue") assert.Len(t, info.Running, 1, "expect task in running queue") - assert.NoError(t, q.Done(noContext, got.ID, model.StatusSuccess)) - info = q.Info(noContext) + assert.NoError(t, q.Done(ctx, got.ID, model.StatusSuccess)) + info = q.Info(ctx) assert.Len(t, info.Pending, 0, "expect task removed from pending queue") assert.Len(t, info.Running, 0, "expect task removed from running queue") } @@ -60,7 +61,7 @@ func TestFifoExpire(t *testing.T) { info := q.Info(ctx) assert.Len(t, info.Pending, 1, "expect task in pending queue") - got, err := q.Poll(ctx, 1, func(*model.Task) bool { return true }) + got, err := q.Poll(ctx, 1, filterFnTrue) assert.NoError(t, err) assert.Equal(t, want, got) @@ -75,42 +76,45 @@ func TestFifoExpire(t *testing.T) { func TestFifoWait(t *testing.T) { want := &model.Task{ID: "1"} + ctx := context.Background() - q, _ := New(context.Background()).(*fifo) - assert.NoError(t, q.Push(noContext, want)) + q, _ := New(ctx).(*fifo) + assert.NoError(t, q.Push(ctx, want)) - got, err := q.Poll(noContext, 1, func(*model.Task) bool { return true }) + got, err := q.Poll(ctx, 1, filterFnTrue) assert.NoError(t, err) assert.Equal(t, want, got) var wg sync.WaitGroup wg.Add(1) go func() { - assert.NoError(t, q.Wait(noContext, got.ID)) + assert.NoError(t, q.Wait(ctx, got.ID)) wg.Done() }() <-time.After(time.Millisecond) - assert.NoError(t, q.Done(noContext, got.ID, model.StatusSuccess)) + assert.NoError(t, q.Done(ctx, got.ID, model.StatusSuccess)) wg.Wait() } func TestFifoEvict(t *testing.T) { t1 := &model.Task{ID: "1"} + ctx := context.Background() - q := New(context.Background()) - assert.NoError(t, q.Push(noContext, t1)) - info := q.Info(noContext) + q := New(ctx) + assert.NoError(t, q.Push(ctx, t1)) + info := q.Info(ctx) assert.Len(t, info.Pending, 1, "expect task in pending queue") - err := q.Evict(noContext, t1.ID) + err := q.Evict(ctx, t1.ID) assert.NoError(t, err) - info = q.Info(noContext) + info = q.Info(ctx) assert.Len(t, info.Pending, 0) - err = q.Evict(noContext, t1.ID) + err = q.Evict(ctx, t1.ID) assert.ErrorIs(t, err, ErrNotFound) } func TestFifoDependencies(t *testing.T) { + ctx := context.Background() task1 := &model.Task{ ID: "1", } @@ -121,21 +125,22 @@ func TestFifoDependencies(t *testing.T) { DepStatus: make(map[string]model.StatusValue), } - q, _ := New(context.Background()).(*fifo) - assert.NoError(t, q.PushAtOnce(noContext, []*model.Task{task2, task1})) + q, _ := New(ctx).(*fifo) + assert.NoError(t, q.PushAtOnce(ctx, []*model.Task{task2, task1})) - got, err := q.Poll(noContext, 1, func(*model.Task) bool { return true }) + got, err := q.Poll(ctx, 1, filterFnTrue) assert.NoError(t, err) assert.Equal(t, task1, got) - assert.NoError(t, q.Done(noContext, got.ID, model.StatusSuccess)) + assert.NoError(t, q.Done(ctx, got.ID, model.StatusSuccess)) - got, err = q.Poll(noContext, 1, func(*model.Task) bool { return true }) + got, err = q.Poll(ctx, 1, filterFnTrue) assert.NoError(t, err) assert.Equal(t, task2, got) } func TestFifoErrors(t *testing.T) { + ctx := context.Background() task1 := &model.Task{ ID: "1", } @@ -153,27 +158,28 @@ func TestFifoErrors(t *testing.T) { RunOn: []string{"success", "failure"}, } - q, _ := New(context.Background()).(*fifo) - assert.NoError(t, q.PushAtOnce(noContext, []*model.Task{task2, task3, task1})) + q, _ := New(ctx).(*fifo) + assert.NoError(t, q.PushAtOnce(ctx, []*model.Task{task2, task3, task1})) - got, err := q.Poll(noContext, 1, func(*model.Task) bool { return true }) + got, err := q.Poll(ctx, 1, filterFnTrue) assert.NoError(t, err) assert.Equal(t, task1, got) - assert.NoError(t, q.Error(noContext, got.ID, fmt.Errorf("exit code 1, there was an error"))) + assert.NoError(t, q.Error(ctx, got.ID, fmt.Errorf("exit code 1, there was an error"))) - got, err = q.Poll(noContext, 1, func(*model.Task) bool { return true }) + got, err = q.Poll(ctx, 1, filterFnTrue) assert.NoError(t, err) assert.Equal(t, task2, got) assert.False(t, got.ShouldRun()) - got, err = q.Poll(noContext, 1, func(*model.Task) bool { return true }) + got, err = q.Poll(ctx, 1, filterFnTrue) assert.NoError(t, err) assert.Equal(t, task3, got) assert.True(t, got.ShouldRun()) } func TestFifoErrors2(t *testing.T) { + ctx := context.Background() task1 := &model.Task{ ID: "1", } @@ -188,29 +194,30 @@ func TestFifoErrors2(t *testing.T) { DepStatus: make(map[string]model.StatusValue), } - q, _ := New(context.Background()).(*fifo) - assert.NoError(t, q.PushAtOnce(noContext, []*model.Task{task2, task3, task1})) + q, _ := New(ctx).(*fifo) + assert.NoError(t, q.PushAtOnce(ctx, []*model.Task{task2, task3, task1})) for i := 0; i < 2; i++ { - got, err := q.Poll(noContext, 1, func(*model.Task) bool { return true }) + got, err := q.Poll(ctx, 1, filterFnTrue) assert.NoError(t, err) assert.False(t, got != task1 && got != task2, "expect task1 or task2 returned from queue as task3 depends on them") if got != task1 { - assert.NoError(t, q.Done(noContext, got.ID, model.StatusSuccess)) + assert.NoError(t, q.Done(ctx, got.ID, model.StatusSuccess)) } if got != task2 { - assert.NoError(t, q.Error(noContext, got.ID, fmt.Errorf("exit code 1, there was an error"))) + assert.NoError(t, q.Error(ctx, got.ID, fmt.Errorf("exit code 1, there was an error"))) } } - got, err := q.Poll(noContext, 1, func(*model.Task) bool { return true }) + got, err := q.Poll(ctx, 1, filterFnTrue) assert.NoError(t, err) assert.Equal(t, task3, got) assert.False(t, got.ShouldRun()) } func TestFifoErrorsMultiThread(t *testing.T) { + ctx := context.Background() task1 := &model.Task{ ID: "1", } @@ -227,8 +234,8 @@ func TestFifoErrorsMultiThread(t *testing.T) { DepStatus: make(map[string]model.StatusValue), } - q, _ := New(context.Background()).(*fifo) - assert.NoError(t, q.PushAtOnce(noContext, []*model.Task{task2, task3, task1})) + q, _ := New(ctx).(*fifo) + assert.NoError(t, q.PushAtOnce(ctx, []*model.Task{task2, task3, task1})) obtainedWorkCh := make(chan *model.Task) @@ -236,7 +243,7 @@ func TestFifoErrorsMultiThread(t *testing.T) { go func(i int) { for { fmt.Printf("Worker %d started\n", i) - got, err := q.Poll(noContext, 1, func(*model.Task) bool { return true }) + got, err := q.Poll(ctx, 1, filterFnTrue) assert.NoError(t, err) obtainedWorkCh <- got } @@ -255,22 +262,22 @@ func TestFifoErrorsMultiThread(t *testing.T) { case !task1Processed: assert.Equal(t, task1, got) task1Processed = true - assert.NoError(t, q.Error(noContext, got.ID, fmt.Errorf("exit code 1, there was an error"))) + assert.NoError(t, q.Error(ctx, got.ID, fmt.Errorf("exit code 1, there was an error"))) go func() { for { fmt.Printf("Worker spawned\n") - got, _ := q.Poll(noContext, 1, func(*model.Task) bool { return true }) + got, _ := q.Poll(ctx, 1, filterFnTrue) obtainedWorkCh <- got } }() case !task2Processed: assert.Equal(t, task2, got) task2Processed = true - assert.NoError(t, q.Done(noContext, got.ID, model.StatusSuccess)) + assert.NoError(t, q.Done(ctx, got.ID, model.StatusSuccess)) go func() { for { fmt.Printf("Worker spawned\n") - got, _ := q.Poll(noContext, 1, func(*model.Task) bool { return true }) + got, _ := q.Poll(ctx, 1, filterFnTrue) obtainedWorkCh <- got } }() @@ -281,7 +288,7 @@ func TestFifoErrorsMultiThread(t *testing.T) { } case <-time.After(5 * time.Second): - info := q.Info(noContext) + info := q.Info(ctx) fmt.Println(info.String()) t.Errorf("test timed out") return @@ -290,6 +297,7 @@ func TestFifoErrorsMultiThread(t *testing.T) { } func TestFifoTransitiveErrors(t *testing.T) { + ctx := context.Background() task1 := &model.Task{ ID: "1", } @@ -306,27 +314,28 @@ func TestFifoTransitiveErrors(t *testing.T) { DepStatus: make(map[string]model.StatusValue), } - q, _ := New(context.Background()).(*fifo) - assert.NoError(t, q.PushAtOnce(noContext, []*model.Task{task2, task3, task1})) + q, _ := New(ctx).(*fifo) + assert.NoError(t, q.PushAtOnce(ctx, []*model.Task{task2, task3, task1})) - got, err := q.Poll(noContext, 1, func(*model.Task) bool { return true }) + got, err := q.Poll(ctx, 1, filterFnTrue) assert.NoError(t, err) assert.Equal(t, task1, got) - assert.NoError(t, q.Error(noContext, got.ID, fmt.Errorf("exit code 1, there was an error"))) + assert.NoError(t, q.Error(ctx, got.ID, fmt.Errorf("exit code 1, there was an error"))) - got, err = q.Poll(noContext, 1, func(*model.Task) bool { return true }) + got, err = q.Poll(ctx, 1, filterFnTrue) assert.NoError(t, err) assert.Equal(t, task2, got) assert.False(t, got.ShouldRun(), "expect task2 should not run, since task1 failed") - assert.NoError(t, q.Done(noContext, got.ID, model.StatusSkipped)) + assert.NoError(t, q.Done(ctx, got.ID, model.StatusSkipped)) - got, err = q.Poll(noContext, 1, func(*model.Task) bool { return true }) + got, err = q.Poll(ctx, 1, filterFnTrue) assert.NoError(t, err) assert.Equal(t, task3, got) assert.False(t, got.ShouldRun(), "expect task3 should not run, task1 failed, thus task2 was skipped, task3 should be skipped too") } func TestFifoCancel(t *testing.T) { + ctx := context.Background() task1 := &model.Task{ ID: "1", } @@ -344,34 +353,35 @@ func TestFifoCancel(t *testing.T) { RunOn: []string{"success", "failure"}, } - q, _ := New(context.Background()).(*fifo) - assert.NoError(t, q.PushAtOnce(noContext, []*model.Task{task2, task3, task1})) + q, _ := New(ctx).(*fifo) + assert.NoError(t, q.PushAtOnce(ctx, []*model.Task{task2, task3, task1})) - _, _ = q.Poll(noContext, 1, func(*model.Task) bool { return true }) - assert.NoError(t, q.Error(noContext, task1.ID, fmt.Errorf("canceled"))) - assert.NoError(t, q.Error(noContext, task2.ID, fmt.Errorf("canceled"))) - assert.NoError(t, q.Error(noContext, task3.ID, fmt.Errorf("canceled"))) + _, _ = q.Poll(ctx, 1, filterFnTrue) + assert.NoError(t, q.Error(ctx, task1.ID, fmt.Errorf("canceled"))) + assert.NoError(t, q.Error(ctx, task2.ID, fmt.Errorf("canceled"))) + assert.NoError(t, q.Error(ctx, task3.ID, fmt.Errorf("canceled"))) - info := q.Info(noContext) + info := q.Info(ctx) assert.Len(t, info.Pending, 0, "all pipelines should be canceled") } func TestFifoPause(t *testing.T) { + ctx := context.Background() task1 := &model.Task{ ID: "1", } - q, _ := New(context.Background()).(*fifo) + q, _ := New(ctx).(*fifo) var wg sync.WaitGroup wg.Add(1) go func() { - _, _ = q.Poll(noContext, 1, func(*model.Task) bool { return true }) + _, _ = q.Poll(ctx, 1, filterFnTrue) wg.Done() }() q.Pause() t0 := time.Now() - assert.NoError(t, q.Push(noContext, task1)) + assert.NoError(t, q.Push(ctx, task1)) time.Sleep(20 * time.Millisecond) q.Resume() @@ -381,25 +391,27 @@ func TestFifoPause(t *testing.T) { assert.Greater(t, t1.Sub(t0), 20*time.Millisecond, "should have waited til resume") q.Pause() - assert.NoError(t, q.Push(noContext, task1)) + assert.NoError(t, q.Push(ctx, task1)) q.Resume() - _, _ = q.Poll(noContext, 1, func(*model.Task) bool { return true }) + _, _ = q.Poll(ctx, 1, filterFnTrue) } func TestFifoPauseResume(t *testing.T) { + ctx := context.Background() task1 := &model.Task{ ID: "1", } - q, _ := New(context.Background()).(*fifo) + q, _ := New(ctx).(*fifo) q.Pause() - assert.NoError(t, q.Push(noContext, task1)) + assert.NoError(t, q.Push(ctx, task1)) q.Resume() - _, _ = q.Poll(noContext, 1, func(*model.Task) bool { return true }) + _, _ = q.Poll(ctx, 1, filterFnTrue) } func TestWaitingVsPending(t *testing.T) { + ctx := context.Background() task1 := &model.Task{ ID: "1", } @@ -417,20 +429,20 @@ func TestWaitingVsPending(t *testing.T) { RunOn: []string{"success", "failure"}, } - q, _ := New(context.Background()).(*fifo) - assert.NoError(t, q.PushAtOnce(noContext, []*model.Task{task2, task3, task1})) + q, _ := New(ctx).(*fifo) + assert.NoError(t, q.PushAtOnce(ctx, []*model.Task{task2, task3, task1})) - got, _ := q.Poll(noContext, 1, func(*model.Task) bool { return true }) + got, _ := q.Poll(ctx, 1, filterFnTrue) - info := q.Info(noContext) + info := q.Info(ctx) assert.Equal(t, 2, info.Stats.WaitingOnDeps) - assert.NoError(t, q.Error(noContext, got.ID, fmt.Errorf("exit code 1, there was an error"))) - got, err := q.Poll(noContext, 1, func(*model.Task) bool { return true }) + assert.NoError(t, q.Error(ctx, got.ID, fmt.Errorf("exit code 1, there was an error"))) + got, err := q.Poll(ctx, 1, filterFnTrue) assert.NoError(t, err) assert.EqualValues(t, task2, got) - info = q.Info(noContext) + info = q.Info(ctx) assert.Equal(t, 0, info.Stats.WaitingOnDeps) assert.Equal(t, 1, info.Stats.Pending) } @@ -504,3 +516,108 @@ func TestShouldRun(t *testing.T) { } assert.True(t, task.ShouldRun(), "on failure, tasks should run on skipped deps, something failed higher up the chain") } + +func TestFifoWithScoring(t *testing.T) { + ctx := context.Background() + q := New(ctx) + + // Create tasks with different labels + tasks := []*model.Task{ + {ID: "1", Labels: map[string]string{"org-id": "123", "platform": "linux"}}, + {ID: "2", Labels: map[string]string{"org-id": "456", "platform": "linux"}}, + {ID: "3", Labels: map[string]string{"org-id": "789", "platform": "windows"}}, + {ID: "4", Labels: map[string]string{"org-id": "123", "platform": "linux"}}, + {ID: "5", Labels: map[string]string{"org-id": "*", "platform": "linux"}}, + } + + for _, task := range tasks { + assert.NoError(t, q.Push(ctx, task)) + } + + // Create filter functions for different workers + filters := map[int]FilterFn{ + 1: func(task *model.Task) (bool, int) { + if task.Labels["org-id"] == "123" { + return true, 20 + } + if task.Labels["platform"] == "linux" { + return true, 10 + } + return true, 1 + }, + 2: func(task *model.Task) (bool, int) { + if task.Labels["org-id"] == "456" { + return true, 20 + } + if task.Labels["platform"] == "linux" { + return true, 10 + } + return true, 1 + }, + 3: func(task *model.Task) (bool, int) { + if task.Labels["platform"] == "windows" { + return true, 20 + } + return true, 1 + }, + 4: func(task *model.Task) (bool, int) { + if task.Labels["org-id"] == "123" { + return true, 20 + } + if task.Labels["platform"] == "linux" { + return true, 10 + } + return true, 1 + }, + 5: func(task *model.Task) (bool, int) { + if task.Labels["org-id"] == "*" { + return true, 15 + } + return true, 1 + }, + } + + // Start polling in separate goroutines + results := make(chan *model.Task, 5) + for i := 1; i <= 5; i++ { + go func(n int) { + task, err := q.Poll(ctx, int64(n), filters[n]) + assert.NoError(t, err) + results <- task + }(i) + } + + // Collect results + receivedTasks := make(map[string]int64) + for i := 0; i < 5; i++ { + select { + case task := <-results: + receivedTasks[task.ID] = task.AgentID + case <-time.After(time.Second): + t.Fatal("Timeout waiting for tasks") + } + } + + assert.Len(t, receivedTasks, 5, "All tasks should be assigned") + + // Define expected agent assignments + // Map structure: {taskID: []possible agentIDs} + // - taskID "1" and "4" can be assigned to agents 1 or 4 (org-id "123") + // - taskID "2" should be assigned to agent 2 (org-id "456") + // - taskID "3" should be assigned to agent 3 (platform "windows") + // - taskID "5" should be assigned to agent 5 (org-id "*") + expectedAssignments := map[string][]int64{ + "1": {1, 4}, + "2": {2}, + "3": {3}, + "4": {1, 4}, + "5": {5}, + } + + // Check if tasks are assigned as expected + for taskID, expectedAgents := range expectedAssignments { + agentID, ok := receivedTasks[taskID] + assert.True(t, ok, "Task %s should be assigned", taskID) + assert.Contains(t, expectedAgents, agentID, "Task %s should be assigned to one of the expected agents", taskID) + } +} diff --git a/server/queue/queue.go b/server/queue/queue.go index eaaeb47cf..682d0e0ac 100644 --- a/server/queue/queue.go +++ b/server/queue/queue.go @@ -67,7 +67,8 @@ func (t *InfoT) String() string { // Filter filters tasks in the queue. If the Filter returns false, // the Task is skipped and not returned to the subscriber. -type FilterFn func(*model.Task) bool +// The int return value represents the matching score (higher is better). +type FilterFn func(*model.Task) (bool, int) // Queue defines a task queue for scheduling tasks among // a pool of workers.