Harden and correct fifo task queue tests (#4377)

This commit is contained in:
6543 2024-11-18 16:50:17 +01:00 committed by GitHub
parent d0927e37dc
commit e95db23794
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 142 additions and 76 deletions

View file

@ -38,6 +38,7 @@
addlicense addlicense
protoc-gen-go protoc-gen-go
protoc-gen-go-grpc protoc-gen-go-grpc
gcc
]; ];
CFLAGS = "-I${pkgs.glibc.dev}/include"; CFLAGS = "-I${pkgs.glibc.dev}/include";
LDFLAGS = "-L${pkgs.glibc}/lib"; LDFLAGS = "-L${pkgs.glibc}/lib";

View file

@ -16,6 +16,7 @@ package queue
import ( import (
"context" "context"
"errors"
"fmt" "fmt"
"sync" "sync"
"testing" "testing"
@ -26,64 +27,85 @@ import (
"go.woodpecker-ci.org/woodpecker/v2/server/model" "go.woodpecker-ci.org/woodpecker/v2/server/model"
) )
var filterFnTrue = func(*model.Task) (bool, int) { return true, 1 } var (
filterFnTrue = func(*model.Task) (bool, int) { return true, 1 }
genDummyTask = func() *model.Task {
return &model.Task{
ID: "1",
Data: []byte("{}"),
}
}
waitForProcess = func() { time.Sleep(processTimeInterval + 10*time.Millisecond) }
)
func TestFifo(t *testing.T) { func TestFifo(t *testing.T) {
want := &model.Task{ID: "1"} ctx, cancel := context.WithCancelCause(context.Background())
ctx := context.Background() t.Cleanup(func() { cancel(nil) })
q := NewMemoryQueue(ctx) q := NewMemoryQueue(ctx)
assert.NoError(t, q.Push(ctx, want)) dummyTask := genDummyTask()
assert.NoError(t, q.Push(ctx, dummyTask))
waitForProcess()
info := q.Info(ctx) info := q.Info(ctx)
assert.Len(t, info.Pending, 1, "expect task in pending queue") assert.Len(t, info.Pending, 1, "expect task in pending queue")
got, err := q.Poll(ctx, 1, filterFnTrue) got, err := q.Poll(ctx, 1, filterFnTrue)
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, want, got) assert.Equal(t, dummyTask, got)
waitForProcess()
info = q.Info(ctx) info = q.Info(ctx)
assert.Len(t, info.Pending, 0, "expect task removed from pending queue") assert.Len(t, info.Pending, 0, "expect task removed from pending queue")
assert.Len(t, info.Running, 1, "expect task in running queue") assert.Len(t, info.Running, 1, "expect task in running queue")
assert.NoError(t, q.Done(ctx, got.ID, model.StatusSuccess)) assert.NoError(t, q.Done(ctx, got.ID, model.StatusSuccess))
waitForProcess()
info = q.Info(ctx) info = q.Info(ctx)
assert.Len(t, info.Pending, 0, "expect task removed from pending queue") assert.Len(t, info.Pending, 0, "expect task removed from pending queue")
assert.Len(t, info.Running, 0, "expect task removed from running queue") assert.Len(t, info.Running, 0, "expect task removed from running queue")
} }
func TestFifoExpire(t *testing.T) { func TestFifoExpire(t *testing.T) {
want := &model.Task{ID: "1"}
ctx, cancel := context.WithCancelCause(context.Background()) ctx, cancel := context.WithCancelCause(context.Background())
t.Cleanup(func() { cancel(nil) })
q, _ := NewMemoryQueue(ctx).(*fifo) q, _ := NewMemoryQueue(ctx).(*fifo)
assert.NotNil(t, q)
dummyTask := genDummyTask()
q.extension = 0 q.extension = 0
assert.NoError(t, q.Push(ctx, want)) assert.NoError(t, q.Push(ctx, dummyTask))
waitForProcess()
info := q.Info(ctx) info := q.Info(ctx)
assert.Len(t, info.Pending, 1, "expect task in pending queue") assert.Len(t, info.Pending, 1, "expect task in pending queue")
got, err := q.Poll(ctx, 1, filterFnTrue) got, err := q.Poll(ctx, 1, filterFnTrue)
waitForProcess()
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, want, got) assert.Equal(t, dummyTask, got)
// cancel the context to let the process func end info = q.Info(ctx)
go func() {
time.Sleep(time.Millisecond)
cancel(nil)
}()
q.process()
assert.Len(t, info.Pending, 1, "expect task re-added to pending queue") assert.Len(t, info.Pending, 1, "expect task re-added to pending queue")
} }
func TestFifoWait(t *testing.T) { func TestFifoWait(t *testing.T) {
want := &model.Task{ID: "1"} ctx, cancel := context.WithCancelCause(context.Background())
ctx := context.Background() t.Cleanup(func() { cancel(nil) })
q, _ := NewMemoryQueue(ctx).(*fifo) q, _ := NewMemoryQueue(ctx).(*fifo)
assert.NoError(t, q.Push(ctx, want)) assert.NotNil(t, q)
dummyTask := genDummyTask()
assert.NoError(t, q.Push(ctx, dummyTask))
waitForProcess()
got, err := q.Poll(ctx, 1, filterFnTrue) got, err := q.Poll(ctx, 1, filterFnTrue)
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, want, got) assert.Equal(t, dummyTask, got)
var wg sync.WaitGroup var wg sync.WaitGroup
wg.Add(1) wg.Add(1)
@ -98,27 +120,34 @@ func TestFifoWait(t *testing.T) {
} }
func TestFifoEvict(t *testing.T) { func TestFifoEvict(t *testing.T) {
t1 := &model.Task{ID: "1"} ctx, cancel := context.WithCancelCause(context.Background())
ctx := context.Background() t.Cleanup(func() { cancel(nil) })
q := NewMemoryQueue(ctx) q := NewMemoryQueue(ctx)
assert.NoError(t, q.Push(ctx, t1)) dummyTask := genDummyTask()
assert.NoError(t, q.Push(ctx, dummyTask))
waitForProcess()
info := q.Info(ctx) info := q.Info(ctx)
assert.Len(t, info.Pending, 1, "expect task in pending queue") assert.Len(t, info.Pending, 1, "expect task in pending queue")
err := q.Evict(ctx, t1.ID)
err := q.Evict(ctx, dummyTask.ID)
assert.NoError(t, err) assert.NoError(t, err)
waitForProcess()
info = q.Info(ctx) info = q.Info(ctx)
assert.Len(t, info.Pending, 0) assert.Len(t, info.Pending, 0)
err = q.Evict(ctx, t1.ID)
err = q.Evict(ctx, dummyTask.ID)
assert.ErrorIs(t, err, ErrNotFound) assert.ErrorIs(t, err, ErrNotFound)
} }
func TestFifoDependencies(t *testing.T) { func TestFifoDependencies(t *testing.T) {
ctx := context.Background() ctx, cancel := context.WithCancelCause(context.Background())
task1 := &model.Task{ t.Cleanup(func() { cancel(nil) })
ID: "1",
}
task1 := genDummyTask()
task2 := &model.Task{ task2 := &model.Task{
ID: "2", ID: "2",
Dependencies: []string{"1"}, Dependencies: []string{"1"},
@ -126,31 +155,34 @@ func TestFifoDependencies(t *testing.T) {
} }
q, _ := NewMemoryQueue(ctx).(*fifo) q, _ := NewMemoryQueue(ctx).(*fifo)
assert.NotNil(t, q)
assert.NoError(t, q.PushAtOnce(ctx, []*model.Task{task2, task1})) assert.NoError(t, q.PushAtOnce(ctx, []*model.Task{task2, task1}))
waitForProcess()
got, err := q.Poll(ctx, 1, filterFnTrue) got, err := q.Poll(ctx, 1, filterFnTrue)
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, task1, got) assert.Equal(t, task1, got)
waitForProcess()
assert.NoError(t, q.Done(ctx, got.ID, model.StatusSuccess)) assert.NoError(t, q.Done(ctx, got.ID, model.StatusSuccess))
waitForProcess()
got, err = q.Poll(ctx, 1, filterFnTrue) got, err = q.Poll(ctx, 1, filterFnTrue)
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, task2, got) assert.Equal(t, task2, got)
} }
func TestFifoErrors(t *testing.T) { func TestFifoErrors(t *testing.T) {
ctx := context.Background() ctx, cancel := context.WithCancelCause(context.Background())
task1 := &model.Task{ t.Cleanup(func() { cancel(nil) })
ID: "1",
}
task1 := genDummyTask()
task2 := &model.Task{ task2 := &model.Task{
ID: "2", ID: "2",
Dependencies: []string{"1"}, Dependencies: []string{"1"},
DepStatus: make(map[string]model.StatusValue), DepStatus: make(map[string]model.StatusValue),
} }
task3 := &model.Task{ task3 := &model.Task{
ID: "3", ID: "3",
Dependencies: []string{"1"}, Dependencies: []string{"1"},
@ -159,19 +191,24 @@ func TestFifoErrors(t *testing.T) {
} }
q, _ := NewMemoryQueue(ctx).(*fifo) q, _ := NewMemoryQueue(ctx).(*fifo)
assert.NotNil(t, q)
assert.NoError(t, q.PushAtOnce(ctx, []*model.Task{task2, task3, task1})) assert.NoError(t, q.PushAtOnce(ctx, []*model.Task{task2, task3, task1}))
waitForProcess()
got, err := q.Poll(ctx, 1, filterFnTrue) got, err := q.Poll(ctx, 1, filterFnTrue)
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, task1, got) assert.Equal(t, task1, got)
assert.NoError(t, q.Error(ctx, got.ID, fmt.Errorf("exit code 1, there was an error"))) assert.NoError(t, q.Error(ctx, got.ID, fmt.Errorf("exit code 1, there was an error")))
waitForProcess()
got, err = q.Poll(ctx, 1, filterFnTrue) got, err = q.Poll(ctx, 1, filterFnTrue)
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, task2, got) assert.Equal(t, task2, got)
assert.False(t, got.ShouldRun()) assert.False(t, got.ShouldRun())
waitForProcess()
got, err = q.Poll(ctx, 1, filterFnTrue) got, err = q.Poll(ctx, 1, filterFnTrue)
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, task3, got) assert.Equal(t, task3, got)
@ -179,15 +216,13 @@ func TestFifoErrors(t *testing.T) {
} }
func TestFifoErrors2(t *testing.T) { func TestFifoErrors2(t *testing.T) {
ctx := context.Background() ctx, cancel := context.WithCancelCause(context.Background())
task1 := &model.Task{ t.Cleanup(func() { cancel(nil) })
ID: "1",
}
task1 := genDummyTask()
task2 := &model.Task{ task2 := &model.Task{
ID: "2", ID: "2",
} }
task3 := &model.Task{ task3 := &model.Task{
ID: "3", ID: "3",
Dependencies: []string{"1", "2"}, Dependencies: []string{"1", "2"},
@ -195,9 +230,12 @@ func TestFifoErrors2(t *testing.T) {
} }
q, _ := NewMemoryQueue(ctx).(*fifo) q, _ := NewMemoryQueue(ctx).(*fifo)
assert.NotNil(t, q)
assert.NoError(t, q.PushAtOnce(ctx, []*model.Task{task2, task3, task1})) assert.NoError(t, q.PushAtOnce(ctx, []*model.Task{task2, task3, task1}))
for i := 0; i < 2; i++ { for i := 0; i < 2; i++ {
waitForProcess()
got, err := q.Poll(ctx, 1, filterFnTrue) got, err := q.Poll(ctx, 1, filterFnTrue)
assert.NoError(t, err) assert.NoError(t, err)
assert.False(t, got != task1 && got != task2, "expect task1 or task2 returned from queue as task3 depends on them") assert.False(t, got != task1 && got != task2, "expect task1 or task2 returned from queue as task3 depends on them")
@ -210,6 +248,7 @@ func TestFifoErrors2(t *testing.T) {
} }
} }
waitForProcess()
got, err := q.Poll(ctx, 1, filterFnTrue) got, err := q.Poll(ctx, 1, filterFnTrue)
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, task3, got) assert.Equal(t, task3, got)
@ -217,17 +256,15 @@ func TestFifoErrors2(t *testing.T) {
} }
func TestFifoErrorsMultiThread(t *testing.T) { func TestFifoErrorsMultiThread(t *testing.T) {
ctx := context.Background() ctx, cancel := context.WithCancelCause(context.Background())
task1 := &model.Task{ t.Cleanup(func() { cancel(nil) })
ID: "1",
}
task1 := genDummyTask()
task2 := &model.Task{ task2 := &model.Task{
ID: "2", ID: "2",
Dependencies: []string{"1"}, Dependencies: []string{"1"},
DepStatus: make(map[string]model.StatusValue), DepStatus: make(map[string]model.StatusValue),
} }
task3 := &model.Task{ task3 := &model.Task{
ID: "3", ID: "3",
Dependencies: []string{"1", "2"}, Dependencies: []string{"1", "2"},
@ -235,15 +272,21 @@ func TestFifoErrorsMultiThread(t *testing.T) {
} }
q, _ := NewMemoryQueue(ctx).(*fifo) q, _ := NewMemoryQueue(ctx).(*fifo)
assert.NotNil(t, q)
assert.NoError(t, q.PushAtOnce(ctx, []*model.Task{task2, task3, task1})) assert.NoError(t, q.PushAtOnce(ctx, []*model.Task{task2, task3, task1}))
obtainedWorkCh := make(chan *model.Task) obtainedWorkCh := make(chan *model.Task)
defer func() { close(obtainedWorkCh) }()
for i := 0; i < 10; i++ { for i := 0; i < 10; i++ {
go func(i int) { go func(i int) {
for { for {
fmt.Printf("Worker %d started\n", i) fmt.Printf("Worker %d started\n", i)
got, err := q.Poll(ctx, 1, filterFnTrue) got, err := q.Poll(ctx, 1, filterFnTrue)
if err != nil && errors.Is(err, context.Canceled) {
return
}
assert.NoError(t, err) assert.NoError(t, err)
obtainedWorkCh <- got obtainedWorkCh <- got
} }
@ -266,7 +309,11 @@ func TestFifoErrorsMultiThread(t *testing.T) {
go func() { go func() {
for { for {
fmt.Printf("Worker spawned\n") fmt.Printf("Worker spawned\n")
got, _ := q.Poll(ctx, 1, filterFnTrue) got, err := q.Poll(ctx, 1, filterFnTrue)
if err != nil && errors.Is(err, context.Canceled) {
return
}
assert.NoError(t, err)
obtainedWorkCh <- got obtainedWorkCh <- got
} }
}() }()
@ -277,7 +324,11 @@ func TestFifoErrorsMultiThread(t *testing.T) {
go func() { go func() {
for { for {
fmt.Printf("Worker spawned\n") fmt.Printf("Worker spawned\n")
got, _ := q.Poll(ctx, 1, filterFnTrue) got, err := q.Poll(ctx, 1, filterFnTrue)
if err != nil && errors.Is(err, context.Canceled) {
return
}
assert.NoError(t, err)
obtainedWorkCh <- got obtainedWorkCh <- got
} }
}() }()
@ -297,17 +348,15 @@ func TestFifoErrorsMultiThread(t *testing.T) {
} }
func TestFifoTransitiveErrors(t *testing.T) { func TestFifoTransitiveErrors(t *testing.T) {
ctx := context.Background() ctx, cancel := context.WithCancelCause(context.Background())
task1 := &model.Task{ t.Cleanup(func() { cancel(nil) })
ID: "1",
}
task1 := genDummyTask()
task2 := &model.Task{ task2 := &model.Task{
ID: "2", ID: "2",
Dependencies: []string{"1"}, Dependencies: []string{"1"},
DepStatus: make(map[string]model.StatusValue), DepStatus: make(map[string]model.StatusValue),
} }
task3 := &model.Task{ task3 := &model.Task{
ID: "3", ID: "3",
Dependencies: []string{"2"}, Dependencies: []string{"2"},
@ -315,19 +364,24 @@ func TestFifoTransitiveErrors(t *testing.T) {
} }
q, _ := NewMemoryQueue(ctx).(*fifo) q, _ := NewMemoryQueue(ctx).(*fifo)
assert.NotNil(t, q)
assert.NoError(t, q.PushAtOnce(ctx, []*model.Task{task2, task3, task1})) assert.NoError(t, q.PushAtOnce(ctx, []*model.Task{task2, task3, task1}))
waitForProcess()
got, err := q.Poll(ctx, 1, filterFnTrue) got, err := q.Poll(ctx, 1, filterFnTrue)
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, task1, got) assert.Equal(t, task1, got)
assert.NoError(t, q.Error(ctx, got.ID, fmt.Errorf("exit code 1, there was an error"))) assert.NoError(t, q.Error(ctx, got.ID, fmt.Errorf("exit code 1, there was an error")))
waitForProcess()
got, err = q.Poll(ctx, 1, filterFnTrue) got, err = q.Poll(ctx, 1, filterFnTrue)
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, task2, got) assert.Equal(t, task2, got)
assert.False(t, got.ShouldRun(), "expect task2 should not run, since task1 failed") assert.False(t, got.ShouldRun(), "expect task2 should not run, since task1 failed")
assert.NoError(t, q.Done(ctx, got.ID, model.StatusSkipped)) assert.NoError(t, q.Done(ctx, got.ID, model.StatusSkipped))
waitForProcess()
got, err = q.Poll(ctx, 1, filterFnTrue) got, err = q.Poll(ctx, 1, filterFnTrue)
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, task3, got) assert.Equal(t, task3, got)
@ -335,17 +389,15 @@ func TestFifoTransitiveErrors(t *testing.T) {
} }
func TestFifoCancel(t *testing.T) { func TestFifoCancel(t *testing.T) {
ctx := context.Background() ctx, cancel := context.WithCancelCause(context.Background())
task1 := &model.Task{ t.Cleanup(func() { cancel(nil) })
ID: "1",
}
task1 := genDummyTask()
task2 := &model.Task{ task2 := &model.Task{
ID: "2", ID: "2",
Dependencies: []string{"1"}, Dependencies: []string{"1"},
DepStatus: make(map[string]model.StatusValue), DepStatus: make(map[string]model.StatusValue),
} }
task3 := &model.Task{ task3 := &model.Task{
ID: "3", ID: "3",
Dependencies: []string{"1"}, Dependencies: []string{"1"},
@ -354,24 +406,33 @@ func TestFifoCancel(t *testing.T) {
} }
q, _ := NewMemoryQueue(ctx).(*fifo) q, _ := NewMemoryQueue(ctx).(*fifo)
assert.NotNil(t, q)
assert.NoError(t, q.PushAtOnce(ctx, []*model.Task{task2, task3, task1})) assert.NoError(t, q.PushAtOnce(ctx, []*model.Task{task2, task3, task1}))
_, _ = q.Poll(ctx, 1, filterFnTrue) _, _ = q.Poll(ctx, 1, filterFnTrue)
assert.NoError(t, q.Error(ctx, task1.ID, fmt.Errorf("canceled"))) assert.NoError(t, q.Error(ctx, task1.ID, fmt.Errorf("canceled")))
assert.NoError(t, q.Error(ctx, task2.ID, fmt.Errorf("canceled"))) assert.NoError(t, q.Error(ctx, task2.ID, fmt.Errorf("canceled")))
assert.NoError(t, q.Error(ctx, task3.ID, fmt.Errorf("canceled"))) assert.NoError(t, q.Error(ctx, task3.ID, fmt.Errorf("canceled")))
info := q.Info(ctx) info := q.Info(ctx)
assert.Len(t, info.Pending, 0, "all pipelines should be canceled") assert.Len(t, info.Pending, 0, "all pipelines should be canceled")
time.Sleep(processTimeInterval * 2)
info = q.Info(ctx)
assert.Len(t, info.Pending, 2, "canceled are rescheduled")
assert.Len(t, info.Running, 0, "canceled are rescheduled")
assert.Len(t, info.WaitingOnDeps, 0, "canceled are rescheduled")
} }
func TestFifoPause(t *testing.T) { func TestFifoPause(t *testing.T) {
ctx := context.Background() ctx, cancel := context.WithCancelCause(context.Background())
task1 := &model.Task{ t.Cleanup(func() { cancel(nil) })
ID: "1",
}
q, _ := NewMemoryQueue(ctx).(*fifo) q, _ := NewMemoryQueue(ctx).(*fifo)
assert.NotNil(t, q)
dummyTask := genDummyTask()
var wg sync.WaitGroup var wg sync.WaitGroup
wg.Add(1) wg.Add(1)
go func() { go func() {
@ -381,8 +442,8 @@ func TestFifoPause(t *testing.T) {
q.Pause() q.Pause()
t0 := time.Now() t0 := time.Now()
assert.NoError(t, q.Push(ctx, task1)) assert.NoError(t, q.Push(ctx, dummyTask))
time.Sleep(20 * time.Millisecond) waitForProcess()
q.Resume() q.Resume()
wg.Wait() wg.Wait()
@ -391,37 +452,37 @@ func TestFifoPause(t *testing.T) {
assert.Greater(t, t1.Sub(t0), 20*time.Millisecond, "should have waited til resume") assert.Greater(t, t1.Sub(t0), 20*time.Millisecond, "should have waited til resume")
q.Pause() q.Pause()
assert.NoError(t, q.Push(ctx, task1)) assert.NoError(t, q.Push(ctx, dummyTask))
q.Resume() q.Resume()
_, _ = q.Poll(ctx, 1, filterFnTrue) _, _ = q.Poll(ctx, 1, filterFnTrue)
} }
func TestFifoPauseResume(t *testing.T) { func TestFifoPauseResume(t *testing.T) {
ctx := context.Background() ctx, cancel := context.WithCancelCause(context.Background())
task1 := &model.Task{ t.Cleanup(func() { cancel(nil) })
ID: "1",
}
q, _ := NewMemoryQueue(ctx).(*fifo) q, _ := NewMemoryQueue(ctx).(*fifo)
assert.NotNil(t, q)
dummyTask := genDummyTask()
q.Pause() q.Pause()
assert.NoError(t, q.Push(ctx, task1)) assert.NoError(t, q.Push(ctx, dummyTask))
q.Resume() q.Resume()
_, _ = q.Poll(ctx, 1, filterFnTrue) _, _ = q.Poll(ctx, 1, filterFnTrue)
} }
func TestWaitingVsPending(t *testing.T) { func TestWaitingVsPending(t *testing.T) {
ctx := context.Background() ctx, cancel := context.WithCancelCause(context.Background())
task1 := &model.Task{ t.Cleanup(func() { cancel(nil) })
ID: "1",
}
task1 := genDummyTask()
task2 := &model.Task{ task2 := &model.Task{
ID: "2", ID: "2",
Dependencies: []string{"1"}, Dependencies: []string{"1"},
DepStatus: make(map[string]model.StatusValue), DepStatus: make(map[string]model.StatusValue),
} }
task3 := &model.Task{ task3 := &model.Task{
ID: "3", ID: "3",
Dependencies: []string{"1"}, Dependencies: []string{"1"},
@ -430,10 +491,13 @@ func TestWaitingVsPending(t *testing.T) {
} }
q, _ := NewMemoryQueue(ctx).(*fifo) q, _ := NewMemoryQueue(ctx).(*fifo)
assert.NotNil(t, q)
assert.NoError(t, q.PushAtOnce(ctx, []*model.Task{task2, task3, task1})) assert.NoError(t, q.PushAtOnce(ctx, []*model.Task{task2, task3, task1}))
got, _ := q.Poll(ctx, 1, filterFnTrue) got, _ := q.Poll(ctx, 1, filterFnTrue)
waitForProcess()
info := q.Info(ctx) info := q.Info(ctx)
assert.Equal(t, 2, info.Stats.WaitingOnDeps) assert.Equal(t, 2, info.Stats.WaitingOnDeps)
@ -442,6 +506,7 @@ func TestWaitingVsPending(t *testing.T) {
assert.NoError(t, err) assert.NoError(t, err)
assert.EqualValues(t, task2, got) assert.EqualValues(t, task2, got)
waitForProcess()
info = q.Info(ctx) info = q.Info(ctx)
assert.Equal(t, 0, info.Stats.WaitingOnDeps) assert.Equal(t, 0, info.Stats.WaitingOnDeps)
assert.Equal(t, 1, info.Stats.Pending) assert.Equal(t, 1, info.Stats.Pending)
@ -518,7 +583,9 @@ func TestShouldRun(t *testing.T) {
} }
func TestFifoWithScoring(t *testing.T) { func TestFifoWithScoring(t *testing.T) {
ctx := context.Background() ctx, cancel := context.WithCancelCause(context.Background())
t.Cleanup(func() { cancel(nil) })
q := NewMemoryQueue(ctx) q := NewMemoryQueue(ctx)
// Create tasks with different labels // Create tasks with different labels
@ -530,9 +597,7 @@ func TestFifoWithScoring(t *testing.T) {
{ID: "5", Labels: map[string]string{"org-id": "*", "platform": "linux"}}, {ID: "5", Labels: map[string]string{"org-id": "*", "platform": "linux"}},
} }
for _, task := range tasks { assert.NoError(t, q.PushAtOnce(ctx, tasks))
assert.NoError(t, q.Push(ctx, task))
}
// Create filter functions for different workers // Create filter functions for different workers
filters := map[int]FilterFn{ filters := map[int]FilterFn{