Assign workflows to agents with the best label matches (#4201)

Co-authored-by: Anbraten <6918444+anbraten@users.noreply.github.com>
This commit is contained in:
6543 2024-10-08 23:06:58 +02:00 committed by GitHub
parent bf70dee670
commit ddd55ee39a
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 354 additions and 180 deletions

View file

@ -21,28 +21,32 @@ import (
) )
func createFilterFunc(agentFilter rpc.Filter) queue.FilterFn { func createFilterFunc(agentFilter rpc.Filter) queue.FilterFn {
return func(task *model.Task) bool { return func(task *model.Task) (bool, int) {
score := 0
for taskLabel, taskLabelValue := range task.Labels { for taskLabel, taskLabelValue := range task.Labels {
// if a task label is empty it will be ignored // if a task label is empty it will be ignored
if taskLabelValue == "" { if taskLabelValue == "" {
continue continue
} }
// all task labels are required to be present for an agent to match
agentLabelValue, ok := agentFilter.Labels[taskLabel] agentLabelValue, ok := agentFilter.Labels[taskLabel]
if !ok { if !ok {
return false return false, 0
} }
switch {
// if agent label has a wildcard // if agent label has a wildcard
if agentLabelValue == "*" { case agentLabelValue == "*":
continue score++
} // if agent label has an exact match
case agentLabelValue == taskLabelValue:
if taskLabelValue != agentLabelValue { score += 10
return false // agent doesn't match
default:
return false, 0
} }
} }
return true return true, score
} }
} }

View file

@ -24,68 +24,110 @@ import (
) )
func TestCreateFilterFunc(t *testing.T) { func TestCreateFilterFunc(t *testing.T) {
t.Parallel()
tests := []struct { tests := []struct {
name string name string
agentLabels map[string]string agentFilter rpc.Filter
task model.Task task *model.Task
exp bool wantMatched bool
wantScore int
}{ }{
{ {
name: "agent with missing labels", name: "Two exact matches",
agentLabels: map[string]string{"repo": "test/woodpecker"}, agentFilter: rpc.Filter{
task: model.Task{ Labels: map[string]string{"org-id": "123", "platform": "linux"},
Labels: map[string]string{"platform": "linux/amd64", "repo": "test/woodpecker"},
}, },
exp: false, task: &model.Task{
Labels: map[string]string{"org-id": "123", "platform": "linux"},
},
wantMatched: true,
wantScore: 20,
}, },
{ {
name: "agent with wrong labels", name: "Wildcard and exact match",
agentLabels: map[string]string{"platform": "linux/arm64"}, agentFilter: rpc.Filter{
task: model.Task{ Labels: map[string]string{"org-id": "*", "platform": "linux"},
Labels: map[string]string{"platform": "linux/amd64"},
}, },
exp: false, task: &model.Task{
Labels: map[string]string{"org-id": "123", "platform": "linux"},
},
wantMatched: true,
wantScore: 11,
}, },
{ {
name: "agent with correct labels", name: "Partial match",
agentLabels: map[string]string{"platform": "linux/amd64", "location": "europe"}, agentFilter: rpc.Filter{
task: model.Task{ Labels: map[string]string{"org-id": "123", "platform": "linux"},
Labels: map[string]string{"platform": "linux/amd64", "location": "europe"},
}, },
exp: true, task: &model.Task{
Labels: map[string]string{"org-id": "123", "platform": "windows"},
},
wantMatched: false,
wantScore: 0,
}, },
{ {
name: "agent with additional labels", name: "No match",
agentLabels: map[string]string{"platform": "linux/amd64", "location": "europe"}, agentFilter: rpc.Filter{
task: model.Task{ Labels: map[string]string{"org-id": "456", "platform": "linux"},
Labels: map[string]string{"platform": "linux/amd64"},
}, },
exp: true, task: &model.Task{
Labels: map[string]string{"org-id": "123", "platform": "windows"},
},
wantMatched: false,
wantScore: 0,
}, },
{ {
name: "agent with wildcard label", name: "Missing label",
agentLabels: map[string]string{"platform": "linux/amd64", "location": "*"}, agentFilter: rpc.Filter{
task: model.Task{ Labels: map[string]string{"platform": "linux"},
Labels: map[string]string{"platform": "linux/amd64", "location": "america"},
}, },
exp: true, task: &model.Task{
Labels: map[string]string{"needed": "some"},
},
wantMatched: false,
wantScore: 0,
}, },
{ {
name: "agent with platform label and task without", name: "Empty task labels",
agentLabels: map[string]string{"platform": "linux/amd64"}, agentFilter: rpc.Filter{
task: model.Task{ Labels: map[string]string{"org-id": "123", "platform": "linux"},
Labels: map[string]string{"platform": ""},
}, },
exp: true, task: &model.Task{
Labels: map[string]string{},
},
wantMatched: true,
wantScore: 0,
},
{
name: "Agent with additional label",
agentFilter: rpc.Filter{
Labels: map[string]string{"org-id": "123", "platform": "linux", "extra": "value"},
},
task: &model.Task{
Labels: map[string]string{"org-id": "123", "platform": "linux", "empty": ""},
},
wantMatched: true,
wantScore: 20,
},
{
name: "Two wildcard matches",
agentFilter: rpc.Filter{
Labels: map[string]string{"org-id": "*", "platform": "*"},
},
task: &model.Task{
Labels: map[string]string{"org-id": "123", "platform": "linux"},
},
wantMatched: true,
wantScore: 2,
}, },
} }
for _, test := range tests { for _, tt := range tests {
t.Run(test.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {
fn := createFilterFunc(rpc.Filter{Labels: test.agentLabels}) filterFunc := createFilterFunc(tt.agentFilter)
assert.EqualValues(t, test.exp, fn(&test.task)) gotMatched, gotScore := filterFunc(tt.task)
assert.Equal(t, tt.wantMatched, gotMatched, "Matched result")
assert.Equal(t, tt.wantScore, gotScore, "Score")
}) })
} }
} }

View file

@ -57,6 +57,8 @@ type fifo struct {
// as the agent pull in 10 milliseconds we should also give them work asap. // as the agent pull in 10 milliseconds we should also give them work asap.
const processTimeInterval = 100 * time.Millisecond const processTimeInterval = 100 * time.Millisecond
var ErrWorkerKicked = fmt.Errorf("worker was kicked")
// New returns a new fifo queue. // New returns a new fifo queue.
func New(ctx context.Context) Queue { func New(ctx context.Context) Queue {
q := &fifo{ q := &fifo{
@ -91,27 +93,27 @@ func (q *fifo) PushAtOnce(_ context.Context, tasks []*model.Task) error {
} }
// Poll retrieves and removes a task head of this queue. // Poll retrieves and removes a task head of this queue.
func (q *fifo) Poll(c context.Context, agentID int64, f FilterFn) (*model.Task, error) { func (q *fifo) Poll(c context.Context, agentID int64, filter FilterFn) (*model.Task, error) {
q.Lock() q.Lock()
ctx, stop := context.WithCancelCause(c) ctx, stop := context.WithCancelCause(c)
w := &worker{ _worker := &worker{
agentID: agentID, agentID: agentID,
channel: make(chan *model.Task, 1), channel: make(chan *model.Task, 1),
filter: f, filter: filter,
stop: stop, stop: stop,
} }
q.workers[w] = struct{}{} q.workers[_worker] = struct{}{}
q.Unlock() q.Unlock()
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
q.Lock() q.Lock()
delete(q.workers, w) delete(q.workers, _worker)
q.Unlock() q.Unlock()
return nil, ctx.Err() return nil, ctx.Err()
case t := <-w.channel: case t := <-_worker.channel:
return t, nil return t, nil
} }
} }
@ -152,22 +154,22 @@ func (q *fifo) finished(ids []string, exitStatus model.StatusValue, err error) e
} }
// Evict removes a pending task from the queue. // Evict removes a pending task from the queue.
func (q *fifo) Evict(c context.Context, id string) error { func (q *fifo) Evict(ctx context.Context, taskID string) error {
return q.EvictAtOnce(c, []string{id}) return q.EvictAtOnce(ctx, []string{taskID})
} }
// EvictAtOnce removes multiple pending tasks from the queue. // EvictAtOnce removes multiple pending tasks from the queue.
func (q *fifo) EvictAtOnce(_ context.Context, ids []string) error { func (q *fifo) EvictAtOnce(_ context.Context, taskIDs []string) error {
q.Lock() q.Lock()
defer q.Unlock() defer q.Unlock()
for _, id := range ids { for _, id := range taskIDs {
var next *list.Element var next *list.Element
for e := q.pending.Front(); e != nil; e = next { for element := q.pending.Front(); element != nil; element = next {
next = e.Next() next = element.Next()
task, ok := e.Value.(*model.Task) task, ok := element.Value.(*model.Task)
if ok && task.ID == id { if ok && task.ID == id {
q.pending.Remove(e) q.pending.Remove(element)
return nil return nil
} }
} }
@ -176,13 +178,13 @@ func (q *fifo) EvictAtOnce(_ context.Context, ids []string) error {
} }
// Wait waits until the item is done executing. // Wait waits until the item is done executing.
func (q *fifo) Wait(c context.Context, id string) error { func (q *fifo) Wait(ctx context.Context, taskID string) error {
q.Lock() q.Lock()
state := q.running[id] state := q.running[taskID]
q.Unlock() q.Unlock()
if state != nil { if state != nil {
select { select {
case <-c.Done(): case <-ctx.Done():
case <-state.done: case <-state.done:
return state.error return state.error
} }
@ -191,11 +193,11 @@ func (q *fifo) Wait(c context.Context, id string) error {
} }
// Extend extends the task execution deadline. // Extend extends the task execution deadline.
func (q *fifo) Extend(_ context.Context, agentID int64, id string) error { func (q *fifo) Extend(_ context.Context, agentID int64, taskID string) error {
q.Lock() q.Lock()
defer q.Unlock() defer q.Unlock()
state, ok := q.running[id] state, ok := q.running[taskID]
if ok { if ok {
if state.item.AgentID != agentID { if state.item.AgentID != agentID {
return ErrAgentMissMatch return ErrAgentMissMatch
@ -216,12 +218,12 @@ func (q *fifo) Info(_ context.Context) InfoT {
stats.Stats.WaitingOnDeps = q.waitingOnDeps.Len() stats.Stats.WaitingOnDeps = q.waitingOnDeps.Len()
stats.Stats.Running = len(q.running) stats.Stats.Running = len(q.running)
for e := q.pending.Front(); e != nil; e = e.Next() { for element := q.pending.Front(); element != nil; element = element.Next() {
task, _ := e.Value.(*model.Task) task, _ := element.Value.(*model.Task)
stats.Pending = append(stats.Pending, task) stats.Pending = append(stats.Pending, task)
} }
for e := q.waitingOnDeps.Front(); e != nil; e = e.Next() { for element := q.waitingOnDeps.Front(); element != nil; element = element.Next() {
task, _ := e.Value.(*model.Task) task, _ := element.Value.(*model.Task)
stats.WaitingOnDeps = append(stats.WaitingOnDeps, task) stats.WaitingOnDeps = append(stats.WaitingOnDeps, task)
} }
for _, entry := range q.running { for _, entry := range q.running {
@ -252,10 +254,10 @@ func (q *fifo) KickAgentWorkers(agentID int64) {
q.Lock() q.Lock()
defer q.Unlock() defer q.Unlock()
for w := range q.workers { for worker := range q.workers {
if w.agentID == agentID { if worker.agentID == agentID {
w.stop(fmt.Errorf("worker was kicked")) worker.stop(ErrWorkerKicked)
delete(q.workers, w) delete(q.workers, worker)
} }
} }
} }
@ -307,13 +309,13 @@ func (q *fifo) filterWaiting() {
q.waitingOnDeps = list.New() q.waitingOnDeps = list.New()
var filtered []*list.Element var filtered []*list.Element
var nextPending *list.Element var nextPending *list.Element
for e := q.pending.Front(); e != nil; e = nextPending { for element := q.pending.Front(); element != nil; element = nextPending {
nextPending = e.Next() nextPending = element.Next()
task, _ := e.Value.(*model.Task) task, _ := element.Value.(*model.Task)
if q.depsInQueue(task) { if q.depsInQueue(task) {
log.Debug().Msgf("queue: waiting due to unmet dependencies %v", task.ID) log.Debug().Msgf("queue: waiting due to unmet dependencies %v", task.ID)
q.waitingOnDeps.PushBack(task) q.waitingOnDeps.PushBack(task)
filtered = append(filtered, e) filtered = append(filtered, element)
} }
} }
@ -325,37 +327,45 @@ func (q *fifo) filterWaiting() {
func (q *fifo) assignToWorker() (*list.Element, *worker) { func (q *fifo) assignToWorker() (*list.Element, *worker) {
var next *list.Element var next *list.Element
for e := q.pending.Front(); e != nil; e = next { var bestWorker *worker
next = e.Next() var bestScore int
task, _ := e.Value.(*model.Task)
for element := q.pending.Front(); element != nil; element = next {
next = element.Next()
task, _ := element.Value.(*model.Task)
log.Debug().Msgf("queue: trying to assign task: %v with deps %v", task.ID, task.Dependencies) log.Debug().Msgf("queue: trying to assign task: %v with deps %v", task.ID, task.Dependencies)
for w := range q.workers { for worker := range q.workers {
if w.filter(task) { matched, score := worker.filter(task)
log.Debug().Msgf("queue: assigned task: %v with deps %v", task.ID, task.Dependencies) if matched && score > bestScore {
return e, w bestWorker = worker
bestScore = score
} }
} }
if bestWorker != nil {
log.Debug().Msgf("queue: assigned task: %v with deps %v to worker with score %d", task.ID, task.Dependencies, bestScore)
return element, bestWorker
}
} }
return nil, nil return nil, nil
} }
func (q *fifo) resubmitExpiredPipelines() { func (q *fifo) resubmitExpiredPipelines() {
for id, state := range q.running { for taskID, taskState := range q.running {
if time.Now().After(state.deadline) { if time.Now().After(taskState.deadline) {
q.pending.PushFront(state.item) q.pending.PushFront(taskState.item)
delete(q.running, id) delete(q.running, taskID)
close(state.done) close(taskState.done)
} }
} }
} }
func (q *fifo) depsInQueue(task *model.Task) bool { func (q *fifo) depsInQueue(task *model.Task) bool {
var next *list.Element var next *list.Element
for e := q.pending.Front(); e != nil; e = next { for element := q.pending.Front(); element != nil; element = next {
next = e.Next() next = element.Next()
possibleDep, ok := e.Value.(*model.Task) possibleDep, ok := element.Value.(*model.Task)
log.Debug().Msgf("queue: pending right now: %v", possibleDep.ID) log.Debug().Msgf("queue: pending right now: %v", possibleDep.ID)
for _, dep := range task.Dependencies { for _, dep := range task.Dependencies {
if ok && possibleDep.ID == dep { if ok && possibleDep.ID == dep {
@ -376,9 +386,9 @@ func (q *fifo) depsInQueue(task *model.Task) bool {
func (q *fifo) updateDepStatusInQueue(taskID string, status model.StatusValue) { func (q *fifo) updateDepStatusInQueue(taskID string, status model.StatusValue) {
var next *list.Element var next *list.Element
for e := q.pending.Front(); e != nil; e = next { for element := q.pending.Front(); element != nil; element = next {
next = e.Next() next = element.Next()
pending, ok := e.Value.(*model.Task) pending, ok := element.Value.(*model.Task)
for _, dep := range pending.Dependencies { for _, dep := range pending.Dependencies {
if ok && taskID == dep { if ok && taskID == dep {
pending.DepStatus[dep] = status pending.DepStatus[dep] = status
@ -394,9 +404,9 @@ func (q *fifo) updateDepStatusInQueue(taskID string, status model.StatusValue) {
} }
} }
for e := q.waitingOnDeps.Front(); e != nil; e = next { for element := q.waitingOnDeps.Front(); element != nil; element = next {
next = e.Next() next = element.Next()
waiting, ok := e.Value.(*model.Task) waiting, ok := element.Value.(*model.Task)
for _, dep := range waiting.Dependencies { for _, dep := range waiting.Dependencies {
if ok && taskID == dep { if ok && taskID == dep {
waiting.DepStatus[dep] = status waiting.DepStatus[dep] = status
@ -408,12 +418,12 @@ func (q *fifo) updateDepStatusInQueue(taskID string, status model.StatusValue) {
func (q *fifo) removeFromPending(taskID string) { func (q *fifo) removeFromPending(taskID string) {
log.Debug().Msgf("queue: trying to remove %s", taskID) log.Debug().Msgf("queue: trying to remove %s", taskID)
var next *list.Element var next *list.Element
for e := q.pending.Front(); e != nil; e = next { for element := q.pending.Front(); element != nil; element = next {
next = e.Next() next = element.Next()
task, _ := e.Value.(*model.Task) task, _ := element.Value.(*model.Task)
if task.ID == taskID { if task.ID == taskID {
log.Debug().Msgf("queue: %s is removed from pending", taskID) log.Debug().Msgf("queue: %s is removed from pending", taskID)
q.pending.Remove(e) q.pending.Remove(element)
return return
} }
} }

View file

@ -26,26 +26,27 @@ import (
"go.woodpecker-ci.org/woodpecker/v2/server/model" "go.woodpecker-ci.org/woodpecker/v2/server/model"
) )
var noContext = context.Background() var filterFnTrue = func(*model.Task) (bool, int) { return true, 1 }
func TestFifo(t *testing.T) { func TestFifo(t *testing.T) {
want := &model.Task{ID: "1"} want := &model.Task{ID: "1"}
ctx := context.Background()
q := New(context.Background()) q := New(ctx)
assert.NoError(t, q.Push(noContext, want)) assert.NoError(t, q.Push(ctx, want))
info := q.Info(noContext) info := q.Info(ctx)
assert.Len(t, info.Pending, 1, "expect task in pending queue") assert.Len(t, info.Pending, 1, "expect task in pending queue")
got, err := q.Poll(noContext, 1, func(*model.Task) bool { return true }) got, err := q.Poll(ctx, 1, filterFnTrue)
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, want, got) assert.Equal(t, want, got)
info = q.Info(noContext) info = q.Info(ctx)
assert.Len(t, info.Pending, 0, "expect task removed from pending queue") assert.Len(t, info.Pending, 0, "expect task removed from pending queue")
assert.Len(t, info.Running, 1, "expect task in running queue") assert.Len(t, info.Running, 1, "expect task in running queue")
assert.NoError(t, q.Done(noContext, got.ID, model.StatusSuccess)) assert.NoError(t, q.Done(ctx, got.ID, model.StatusSuccess))
info = q.Info(noContext) info = q.Info(ctx)
assert.Len(t, info.Pending, 0, "expect task removed from pending queue") assert.Len(t, info.Pending, 0, "expect task removed from pending queue")
assert.Len(t, info.Running, 0, "expect task removed from running queue") assert.Len(t, info.Running, 0, "expect task removed from running queue")
} }
@ -60,7 +61,7 @@ func TestFifoExpire(t *testing.T) {
info := q.Info(ctx) info := q.Info(ctx)
assert.Len(t, info.Pending, 1, "expect task in pending queue") assert.Len(t, info.Pending, 1, "expect task in pending queue")
got, err := q.Poll(ctx, 1, func(*model.Task) bool { return true }) got, err := q.Poll(ctx, 1, filterFnTrue)
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, want, got) assert.Equal(t, want, got)
@ -75,42 +76,45 @@ func TestFifoExpire(t *testing.T) {
func TestFifoWait(t *testing.T) { func TestFifoWait(t *testing.T) {
want := &model.Task{ID: "1"} want := &model.Task{ID: "1"}
ctx := context.Background()
q, _ := New(context.Background()).(*fifo) q, _ := New(ctx).(*fifo)
assert.NoError(t, q.Push(noContext, want)) assert.NoError(t, q.Push(ctx, want))
got, err := q.Poll(noContext, 1, func(*model.Task) bool { return true }) got, err := q.Poll(ctx, 1, filterFnTrue)
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, want, got) assert.Equal(t, want, got)
var wg sync.WaitGroup var wg sync.WaitGroup
wg.Add(1) wg.Add(1)
go func() { go func() {
assert.NoError(t, q.Wait(noContext, got.ID)) assert.NoError(t, q.Wait(ctx, got.ID))
wg.Done() wg.Done()
}() }()
<-time.After(time.Millisecond) <-time.After(time.Millisecond)
assert.NoError(t, q.Done(noContext, got.ID, model.StatusSuccess)) assert.NoError(t, q.Done(ctx, got.ID, model.StatusSuccess))
wg.Wait() wg.Wait()
} }
func TestFifoEvict(t *testing.T) { func TestFifoEvict(t *testing.T) {
t1 := &model.Task{ID: "1"} t1 := &model.Task{ID: "1"}
ctx := context.Background()
q := New(context.Background()) q := New(ctx)
assert.NoError(t, q.Push(noContext, t1)) assert.NoError(t, q.Push(ctx, t1))
info := q.Info(noContext) info := q.Info(ctx)
assert.Len(t, info.Pending, 1, "expect task in pending queue") assert.Len(t, info.Pending, 1, "expect task in pending queue")
err := q.Evict(noContext, t1.ID) err := q.Evict(ctx, t1.ID)
assert.NoError(t, err) assert.NoError(t, err)
info = q.Info(noContext) info = q.Info(ctx)
assert.Len(t, info.Pending, 0) assert.Len(t, info.Pending, 0)
err = q.Evict(noContext, t1.ID) err = q.Evict(ctx, t1.ID)
assert.ErrorIs(t, err, ErrNotFound) assert.ErrorIs(t, err, ErrNotFound)
} }
func TestFifoDependencies(t *testing.T) { func TestFifoDependencies(t *testing.T) {
ctx := context.Background()
task1 := &model.Task{ task1 := &model.Task{
ID: "1", ID: "1",
} }
@ -121,21 +125,22 @@ func TestFifoDependencies(t *testing.T) {
DepStatus: make(map[string]model.StatusValue), DepStatus: make(map[string]model.StatusValue),
} }
q, _ := New(context.Background()).(*fifo) q, _ := New(ctx).(*fifo)
assert.NoError(t, q.PushAtOnce(noContext, []*model.Task{task2, task1})) assert.NoError(t, q.PushAtOnce(ctx, []*model.Task{task2, task1}))
got, err := q.Poll(noContext, 1, func(*model.Task) bool { return true }) got, err := q.Poll(ctx, 1, filterFnTrue)
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, task1, got) assert.Equal(t, task1, got)
assert.NoError(t, q.Done(noContext, got.ID, model.StatusSuccess)) assert.NoError(t, q.Done(ctx, got.ID, model.StatusSuccess))
got, err = q.Poll(noContext, 1, func(*model.Task) bool { return true }) got, err = q.Poll(ctx, 1, filterFnTrue)
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, task2, got) assert.Equal(t, task2, got)
} }
func TestFifoErrors(t *testing.T) { func TestFifoErrors(t *testing.T) {
ctx := context.Background()
task1 := &model.Task{ task1 := &model.Task{
ID: "1", ID: "1",
} }
@ -153,27 +158,28 @@ func TestFifoErrors(t *testing.T) {
RunOn: []string{"success", "failure"}, RunOn: []string{"success", "failure"},
} }
q, _ := New(context.Background()).(*fifo) q, _ := New(ctx).(*fifo)
assert.NoError(t, q.PushAtOnce(noContext, []*model.Task{task2, task3, task1})) assert.NoError(t, q.PushAtOnce(ctx, []*model.Task{task2, task3, task1}))
got, err := q.Poll(noContext, 1, func(*model.Task) bool { return true }) got, err := q.Poll(ctx, 1, filterFnTrue)
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, task1, got) assert.Equal(t, task1, got)
assert.NoError(t, q.Error(noContext, got.ID, fmt.Errorf("exit code 1, there was an error"))) assert.NoError(t, q.Error(ctx, got.ID, fmt.Errorf("exit code 1, there was an error")))
got, err = q.Poll(noContext, 1, func(*model.Task) bool { return true }) got, err = q.Poll(ctx, 1, filterFnTrue)
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, task2, got) assert.Equal(t, task2, got)
assert.False(t, got.ShouldRun()) assert.False(t, got.ShouldRun())
got, err = q.Poll(noContext, 1, func(*model.Task) bool { return true }) got, err = q.Poll(ctx, 1, filterFnTrue)
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, task3, got) assert.Equal(t, task3, got)
assert.True(t, got.ShouldRun()) assert.True(t, got.ShouldRun())
} }
func TestFifoErrors2(t *testing.T) { func TestFifoErrors2(t *testing.T) {
ctx := context.Background()
task1 := &model.Task{ task1 := &model.Task{
ID: "1", ID: "1",
} }
@ -188,29 +194,30 @@ func TestFifoErrors2(t *testing.T) {
DepStatus: make(map[string]model.StatusValue), DepStatus: make(map[string]model.StatusValue),
} }
q, _ := New(context.Background()).(*fifo) q, _ := New(ctx).(*fifo)
assert.NoError(t, q.PushAtOnce(noContext, []*model.Task{task2, task3, task1})) assert.NoError(t, q.PushAtOnce(ctx, []*model.Task{task2, task3, task1}))
for i := 0; i < 2; i++ { for i := 0; i < 2; i++ {
got, err := q.Poll(noContext, 1, func(*model.Task) bool { return true }) got, err := q.Poll(ctx, 1, filterFnTrue)
assert.NoError(t, err) assert.NoError(t, err)
assert.False(t, got != task1 && got != task2, "expect task1 or task2 returned from queue as task3 depends on them") assert.False(t, got != task1 && got != task2, "expect task1 or task2 returned from queue as task3 depends on them")
if got != task1 { if got != task1 {
assert.NoError(t, q.Done(noContext, got.ID, model.StatusSuccess)) assert.NoError(t, q.Done(ctx, got.ID, model.StatusSuccess))
} }
if got != task2 { if got != task2 {
assert.NoError(t, q.Error(noContext, got.ID, fmt.Errorf("exit code 1, there was an error"))) assert.NoError(t, q.Error(ctx, got.ID, fmt.Errorf("exit code 1, there was an error")))
} }
} }
got, err := q.Poll(noContext, 1, func(*model.Task) bool { return true }) got, err := q.Poll(ctx, 1, filterFnTrue)
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, task3, got) assert.Equal(t, task3, got)
assert.False(t, got.ShouldRun()) assert.False(t, got.ShouldRun())
} }
func TestFifoErrorsMultiThread(t *testing.T) { func TestFifoErrorsMultiThread(t *testing.T) {
ctx := context.Background()
task1 := &model.Task{ task1 := &model.Task{
ID: "1", ID: "1",
} }
@ -227,8 +234,8 @@ func TestFifoErrorsMultiThread(t *testing.T) {
DepStatus: make(map[string]model.StatusValue), DepStatus: make(map[string]model.StatusValue),
} }
q, _ := New(context.Background()).(*fifo) q, _ := New(ctx).(*fifo)
assert.NoError(t, q.PushAtOnce(noContext, []*model.Task{task2, task3, task1})) assert.NoError(t, q.PushAtOnce(ctx, []*model.Task{task2, task3, task1}))
obtainedWorkCh := make(chan *model.Task) obtainedWorkCh := make(chan *model.Task)
@ -236,7 +243,7 @@ func TestFifoErrorsMultiThread(t *testing.T) {
go func(i int) { go func(i int) {
for { for {
fmt.Printf("Worker %d started\n", i) fmt.Printf("Worker %d started\n", i)
got, err := q.Poll(noContext, 1, func(*model.Task) bool { return true }) got, err := q.Poll(ctx, 1, filterFnTrue)
assert.NoError(t, err) assert.NoError(t, err)
obtainedWorkCh <- got obtainedWorkCh <- got
} }
@ -255,22 +262,22 @@ func TestFifoErrorsMultiThread(t *testing.T) {
case !task1Processed: case !task1Processed:
assert.Equal(t, task1, got) assert.Equal(t, task1, got)
task1Processed = true task1Processed = true
assert.NoError(t, q.Error(noContext, got.ID, fmt.Errorf("exit code 1, there was an error"))) assert.NoError(t, q.Error(ctx, got.ID, fmt.Errorf("exit code 1, there was an error")))
go func() { go func() {
for { for {
fmt.Printf("Worker spawned\n") fmt.Printf("Worker spawned\n")
got, _ := q.Poll(noContext, 1, func(*model.Task) bool { return true }) got, _ := q.Poll(ctx, 1, filterFnTrue)
obtainedWorkCh <- got obtainedWorkCh <- got
} }
}() }()
case !task2Processed: case !task2Processed:
assert.Equal(t, task2, got) assert.Equal(t, task2, got)
task2Processed = true task2Processed = true
assert.NoError(t, q.Done(noContext, got.ID, model.StatusSuccess)) assert.NoError(t, q.Done(ctx, got.ID, model.StatusSuccess))
go func() { go func() {
for { for {
fmt.Printf("Worker spawned\n") fmt.Printf("Worker spawned\n")
got, _ := q.Poll(noContext, 1, func(*model.Task) bool { return true }) got, _ := q.Poll(ctx, 1, filterFnTrue)
obtainedWorkCh <- got obtainedWorkCh <- got
} }
}() }()
@ -281,7 +288,7 @@ func TestFifoErrorsMultiThread(t *testing.T) {
} }
case <-time.After(5 * time.Second): case <-time.After(5 * time.Second):
info := q.Info(noContext) info := q.Info(ctx)
fmt.Println(info.String()) fmt.Println(info.String())
t.Errorf("test timed out") t.Errorf("test timed out")
return return
@ -290,6 +297,7 @@ func TestFifoErrorsMultiThread(t *testing.T) {
} }
func TestFifoTransitiveErrors(t *testing.T) { func TestFifoTransitiveErrors(t *testing.T) {
ctx := context.Background()
task1 := &model.Task{ task1 := &model.Task{
ID: "1", ID: "1",
} }
@ -306,27 +314,28 @@ func TestFifoTransitiveErrors(t *testing.T) {
DepStatus: make(map[string]model.StatusValue), DepStatus: make(map[string]model.StatusValue),
} }
q, _ := New(context.Background()).(*fifo) q, _ := New(ctx).(*fifo)
assert.NoError(t, q.PushAtOnce(noContext, []*model.Task{task2, task3, task1})) assert.NoError(t, q.PushAtOnce(ctx, []*model.Task{task2, task3, task1}))
got, err := q.Poll(noContext, 1, func(*model.Task) bool { return true }) got, err := q.Poll(ctx, 1, filterFnTrue)
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, task1, got) assert.Equal(t, task1, got)
assert.NoError(t, q.Error(noContext, got.ID, fmt.Errorf("exit code 1, there was an error"))) assert.NoError(t, q.Error(ctx, got.ID, fmt.Errorf("exit code 1, there was an error")))
got, err = q.Poll(noContext, 1, func(*model.Task) bool { return true }) got, err = q.Poll(ctx, 1, filterFnTrue)
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, task2, got) assert.Equal(t, task2, got)
assert.False(t, got.ShouldRun(), "expect task2 should not run, since task1 failed") assert.False(t, got.ShouldRun(), "expect task2 should not run, since task1 failed")
assert.NoError(t, q.Done(noContext, got.ID, model.StatusSkipped)) assert.NoError(t, q.Done(ctx, got.ID, model.StatusSkipped))
got, err = q.Poll(noContext, 1, func(*model.Task) bool { return true }) got, err = q.Poll(ctx, 1, filterFnTrue)
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, task3, got) assert.Equal(t, task3, got)
assert.False(t, got.ShouldRun(), "expect task3 should not run, task1 failed, thus task2 was skipped, task3 should be skipped too") assert.False(t, got.ShouldRun(), "expect task3 should not run, task1 failed, thus task2 was skipped, task3 should be skipped too")
} }
func TestFifoCancel(t *testing.T) { func TestFifoCancel(t *testing.T) {
ctx := context.Background()
task1 := &model.Task{ task1 := &model.Task{
ID: "1", ID: "1",
} }
@ -344,34 +353,35 @@ func TestFifoCancel(t *testing.T) {
RunOn: []string{"success", "failure"}, RunOn: []string{"success", "failure"},
} }
q, _ := New(context.Background()).(*fifo) q, _ := New(ctx).(*fifo)
assert.NoError(t, q.PushAtOnce(noContext, []*model.Task{task2, task3, task1})) assert.NoError(t, q.PushAtOnce(ctx, []*model.Task{task2, task3, task1}))
_, _ = q.Poll(noContext, 1, func(*model.Task) bool { return true }) _, _ = q.Poll(ctx, 1, filterFnTrue)
assert.NoError(t, q.Error(noContext, task1.ID, fmt.Errorf("canceled"))) assert.NoError(t, q.Error(ctx, task1.ID, fmt.Errorf("canceled")))
assert.NoError(t, q.Error(noContext, task2.ID, fmt.Errorf("canceled"))) assert.NoError(t, q.Error(ctx, task2.ID, fmt.Errorf("canceled")))
assert.NoError(t, q.Error(noContext, task3.ID, fmt.Errorf("canceled"))) assert.NoError(t, q.Error(ctx, task3.ID, fmt.Errorf("canceled")))
info := q.Info(noContext) info := q.Info(ctx)
assert.Len(t, info.Pending, 0, "all pipelines should be canceled") assert.Len(t, info.Pending, 0, "all pipelines should be canceled")
} }
func TestFifoPause(t *testing.T) { func TestFifoPause(t *testing.T) {
ctx := context.Background()
task1 := &model.Task{ task1 := &model.Task{
ID: "1", ID: "1",
} }
q, _ := New(context.Background()).(*fifo) q, _ := New(ctx).(*fifo)
var wg sync.WaitGroup var wg sync.WaitGroup
wg.Add(1) wg.Add(1)
go func() { go func() {
_, _ = q.Poll(noContext, 1, func(*model.Task) bool { return true }) _, _ = q.Poll(ctx, 1, filterFnTrue)
wg.Done() wg.Done()
}() }()
q.Pause() q.Pause()
t0 := time.Now() t0 := time.Now()
assert.NoError(t, q.Push(noContext, task1)) assert.NoError(t, q.Push(ctx, task1))
time.Sleep(20 * time.Millisecond) time.Sleep(20 * time.Millisecond)
q.Resume() q.Resume()
@ -381,25 +391,27 @@ func TestFifoPause(t *testing.T) {
assert.Greater(t, t1.Sub(t0), 20*time.Millisecond, "should have waited til resume") assert.Greater(t, t1.Sub(t0), 20*time.Millisecond, "should have waited til resume")
q.Pause() q.Pause()
assert.NoError(t, q.Push(noContext, task1)) assert.NoError(t, q.Push(ctx, task1))
q.Resume() q.Resume()
_, _ = q.Poll(noContext, 1, func(*model.Task) bool { return true }) _, _ = q.Poll(ctx, 1, filterFnTrue)
} }
func TestFifoPauseResume(t *testing.T) { func TestFifoPauseResume(t *testing.T) {
ctx := context.Background()
task1 := &model.Task{ task1 := &model.Task{
ID: "1", ID: "1",
} }
q, _ := New(context.Background()).(*fifo) q, _ := New(ctx).(*fifo)
q.Pause() q.Pause()
assert.NoError(t, q.Push(noContext, task1)) assert.NoError(t, q.Push(ctx, task1))
q.Resume() q.Resume()
_, _ = q.Poll(noContext, 1, func(*model.Task) bool { return true }) _, _ = q.Poll(ctx, 1, filterFnTrue)
} }
func TestWaitingVsPending(t *testing.T) { func TestWaitingVsPending(t *testing.T) {
ctx := context.Background()
task1 := &model.Task{ task1 := &model.Task{
ID: "1", ID: "1",
} }
@ -417,20 +429,20 @@ func TestWaitingVsPending(t *testing.T) {
RunOn: []string{"success", "failure"}, RunOn: []string{"success", "failure"},
} }
q, _ := New(context.Background()).(*fifo) q, _ := New(ctx).(*fifo)
assert.NoError(t, q.PushAtOnce(noContext, []*model.Task{task2, task3, task1})) assert.NoError(t, q.PushAtOnce(ctx, []*model.Task{task2, task3, task1}))
got, _ := q.Poll(noContext, 1, func(*model.Task) bool { return true }) got, _ := q.Poll(ctx, 1, filterFnTrue)
info := q.Info(noContext) info := q.Info(ctx)
assert.Equal(t, 2, info.Stats.WaitingOnDeps) assert.Equal(t, 2, info.Stats.WaitingOnDeps)
assert.NoError(t, q.Error(noContext, got.ID, fmt.Errorf("exit code 1, there was an error"))) assert.NoError(t, q.Error(ctx, got.ID, fmt.Errorf("exit code 1, there was an error")))
got, err := q.Poll(noContext, 1, func(*model.Task) bool { return true }) got, err := q.Poll(ctx, 1, filterFnTrue)
assert.NoError(t, err) assert.NoError(t, err)
assert.EqualValues(t, task2, got) assert.EqualValues(t, task2, got)
info = q.Info(noContext) info = q.Info(ctx)
assert.Equal(t, 0, info.Stats.WaitingOnDeps) assert.Equal(t, 0, info.Stats.WaitingOnDeps)
assert.Equal(t, 1, info.Stats.Pending) assert.Equal(t, 1, info.Stats.Pending)
} }
@ -504,3 +516,108 @@ func TestShouldRun(t *testing.T) {
} }
assert.True(t, task.ShouldRun(), "on failure, tasks should run on skipped deps, something failed higher up the chain") assert.True(t, task.ShouldRun(), "on failure, tasks should run on skipped deps, something failed higher up the chain")
} }
func TestFifoWithScoring(t *testing.T) {
ctx := context.Background()
q := New(ctx)
// Create tasks with different labels
tasks := []*model.Task{
{ID: "1", Labels: map[string]string{"org-id": "123", "platform": "linux"}},
{ID: "2", Labels: map[string]string{"org-id": "456", "platform": "linux"}},
{ID: "3", Labels: map[string]string{"org-id": "789", "platform": "windows"}},
{ID: "4", Labels: map[string]string{"org-id": "123", "platform": "linux"}},
{ID: "5", Labels: map[string]string{"org-id": "*", "platform": "linux"}},
}
for _, task := range tasks {
assert.NoError(t, q.Push(ctx, task))
}
// Create filter functions for different workers
filters := map[int]FilterFn{
1: func(task *model.Task) (bool, int) {
if task.Labels["org-id"] == "123" {
return true, 20
}
if task.Labels["platform"] == "linux" {
return true, 10
}
return true, 1
},
2: func(task *model.Task) (bool, int) {
if task.Labels["org-id"] == "456" {
return true, 20
}
if task.Labels["platform"] == "linux" {
return true, 10
}
return true, 1
},
3: func(task *model.Task) (bool, int) {
if task.Labels["platform"] == "windows" {
return true, 20
}
return true, 1
},
4: func(task *model.Task) (bool, int) {
if task.Labels["org-id"] == "123" {
return true, 20
}
if task.Labels["platform"] == "linux" {
return true, 10
}
return true, 1
},
5: func(task *model.Task) (bool, int) {
if task.Labels["org-id"] == "*" {
return true, 15
}
return true, 1
},
}
// Start polling in separate goroutines
results := make(chan *model.Task, 5)
for i := 1; i <= 5; i++ {
go func(n int) {
task, err := q.Poll(ctx, int64(n), filters[n])
assert.NoError(t, err)
results <- task
}(i)
}
// Collect results
receivedTasks := make(map[string]int64)
for i := 0; i < 5; i++ {
select {
case task := <-results:
receivedTasks[task.ID] = task.AgentID
case <-time.After(time.Second):
t.Fatal("Timeout waiting for tasks")
}
}
assert.Len(t, receivedTasks, 5, "All tasks should be assigned")
// Define expected agent assignments
// Map structure: {taskID: []possible agentIDs}
// - taskID "1" and "4" can be assigned to agents 1 or 4 (org-id "123")
// - taskID "2" should be assigned to agent 2 (org-id "456")
// - taskID "3" should be assigned to agent 3 (platform "windows")
// - taskID "5" should be assigned to agent 5 (org-id "*")
expectedAssignments := map[string][]int64{
"1": {1, 4},
"2": {2},
"3": {3},
"4": {1, 4},
"5": {5},
}
// Check if tasks are assigned as expected
for taskID, expectedAgents := range expectedAssignments {
agentID, ok := receivedTasks[taskID]
assert.True(t, ok, "Task %s should be assigned", taskID)
assert.Contains(t, expectedAgents, agentID, "Task %s should be assigned to one of the expected agents", taskID)
}
}

View file

@ -67,7 +67,8 @@ func (t *InfoT) String() string {
// Filter filters tasks in the queue. If the Filter returns false, // Filter filters tasks in the queue. If the Filter returns false,
// the Task is skipped and not returned to the subscriber. // the Task is skipped and not returned to the subscriber.
type FilterFn func(*model.Task) bool // The int return value represents the matching score (higher is better).
type FilterFn func(*model.Task) (bool, int)
// Queue defines a task queue for scheduling tasks among // Queue defines a task queue for scheduling tasks among
// a pool of workers. // a pool of workers.