From e95db2379440e5e9cfc307e0a8a2460152673252 Mon Sep 17 00:00:00 2001 From: 6543 <6543@obermui.de> Date: Mon, 18 Nov 2024 16:50:17 +0100 Subject: [PATCH] Harden and correct fifo task queue tests (#4377) --- flake.nix | 1 + server/queue/fifo_test.go | 217 +++++++++++++++++++++++++------------- 2 files changed, 142 insertions(+), 76 deletions(-) diff --git a/flake.nix b/flake.nix index 188c47524..208c52bf3 100644 --- a/flake.nix +++ b/flake.nix @@ -38,6 +38,7 @@ addlicense protoc-gen-go protoc-gen-go-grpc + gcc ]; CFLAGS = "-I${pkgs.glibc.dev}/include"; LDFLAGS = "-L${pkgs.glibc}/lib"; diff --git a/server/queue/fifo_test.go b/server/queue/fifo_test.go index 6c6d4252d..1ce4de008 100644 --- a/server/queue/fifo_test.go +++ b/server/queue/fifo_test.go @@ -16,6 +16,7 @@ package queue import ( "context" + "errors" "fmt" "sync" "testing" @@ -26,64 +27,85 @@ import ( "go.woodpecker-ci.org/woodpecker/v2/server/model" ) -var filterFnTrue = func(*model.Task) (bool, int) { return true, 1 } +var ( + filterFnTrue = func(*model.Task) (bool, int) { return true, 1 } + genDummyTask = func() *model.Task { + return &model.Task{ + ID: "1", + Data: []byte("{}"), + } + } + waitForProcess = func() { time.Sleep(processTimeInterval + 10*time.Millisecond) } +) func TestFifo(t *testing.T) { - want := &model.Task{ID: "1"} - ctx := context.Background() + ctx, cancel := context.WithCancelCause(context.Background()) + t.Cleanup(func() { cancel(nil) }) q := NewMemoryQueue(ctx) - assert.NoError(t, q.Push(ctx, want)) + dummyTask := genDummyTask() + + assert.NoError(t, q.Push(ctx, dummyTask)) + waitForProcess() info := q.Info(ctx) assert.Len(t, info.Pending, 1, "expect task in pending queue") got, err := q.Poll(ctx, 1, filterFnTrue) assert.NoError(t, err) - assert.Equal(t, want, got) + assert.Equal(t, dummyTask, got) + waitForProcess() info = q.Info(ctx) assert.Len(t, info.Pending, 0, "expect task removed from pending queue") assert.Len(t, info.Running, 1, "expect task in running queue") assert.NoError(t, q.Done(ctx, got.ID, model.StatusSuccess)) + + waitForProcess() info = q.Info(ctx) assert.Len(t, info.Pending, 0, "expect task removed from pending queue") assert.Len(t, info.Running, 0, "expect task removed from running queue") } func TestFifoExpire(t *testing.T) { - want := &model.Task{ID: "1"} ctx, cancel := context.WithCancelCause(context.Background()) + t.Cleanup(func() { cancel(nil) }) q, _ := NewMemoryQueue(ctx).(*fifo) + assert.NotNil(t, q) + + dummyTask := genDummyTask() + q.extension = 0 - assert.NoError(t, q.Push(ctx, want)) + assert.NoError(t, q.Push(ctx, dummyTask)) + waitForProcess() info := q.Info(ctx) assert.Len(t, info.Pending, 1, "expect task in pending queue") got, err := q.Poll(ctx, 1, filterFnTrue) + waitForProcess() assert.NoError(t, err) - assert.Equal(t, want, got) + assert.Equal(t, dummyTask, got) - // cancel the context to let the process func end - go func() { - time.Sleep(time.Millisecond) - cancel(nil) - }() - q.process() + info = q.Info(ctx) assert.Len(t, info.Pending, 1, "expect task re-added to pending queue") } func TestFifoWait(t *testing.T) { - want := &model.Task{ID: "1"} - ctx := context.Background() + ctx, cancel := context.WithCancelCause(context.Background()) + t.Cleanup(func() { cancel(nil) }) q, _ := NewMemoryQueue(ctx).(*fifo) - assert.NoError(t, q.Push(ctx, want)) + assert.NotNil(t, q) + dummyTask := genDummyTask() + + assert.NoError(t, q.Push(ctx, dummyTask)) + + waitForProcess() got, err := q.Poll(ctx, 1, filterFnTrue) assert.NoError(t, err) - assert.Equal(t, want, got) + assert.Equal(t, dummyTask, got) var wg sync.WaitGroup wg.Add(1) @@ -98,27 +120,34 @@ func TestFifoWait(t *testing.T) { } func TestFifoEvict(t *testing.T) { - t1 := &model.Task{ID: "1"} - ctx := context.Background() + ctx, cancel := context.WithCancelCause(context.Background()) + t.Cleanup(func() { cancel(nil) }) q := NewMemoryQueue(ctx) - assert.NoError(t, q.Push(ctx, t1)) + dummyTask := genDummyTask() + + assert.NoError(t, q.Push(ctx, dummyTask)) + + waitForProcess() info := q.Info(ctx) assert.Len(t, info.Pending, 1, "expect task in pending queue") - err := q.Evict(ctx, t1.ID) + + err := q.Evict(ctx, dummyTask.ID) assert.NoError(t, err) + + waitForProcess() info = q.Info(ctx) assert.Len(t, info.Pending, 0) - err = q.Evict(ctx, t1.ID) + + err = q.Evict(ctx, dummyTask.ID) assert.ErrorIs(t, err, ErrNotFound) } func TestFifoDependencies(t *testing.T) { - ctx := context.Background() - task1 := &model.Task{ - ID: "1", - } + ctx, cancel := context.WithCancelCause(context.Background()) + t.Cleanup(func() { cancel(nil) }) + task1 := genDummyTask() task2 := &model.Task{ ID: "2", Dependencies: []string{"1"}, @@ -126,31 +155,34 @@ func TestFifoDependencies(t *testing.T) { } q, _ := NewMemoryQueue(ctx).(*fifo) + assert.NotNil(t, q) + assert.NoError(t, q.PushAtOnce(ctx, []*model.Task{task2, task1})) + waitForProcess() got, err := q.Poll(ctx, 1, filterFnTrue) assert.NoError(t, err) assert.Equal(t, task1, got) + waitForProcess() assert.NoError(t, q.Done(ctx, got.ID, model.StatusSuccess)) + waitForProcess() got, err = q.Poll(ctx, 1, filterFnTrue) assert.NoError(t, err) assert.Equal(t, task2, got) } func TestFifoErrors(t *testing.T) { - ctx := context.Background() - task1 := &model.Task{ - ID: "1", - } + ctx, cancel := context.WithCancelCause(context.Background()) + t.Cleanup(func() { cancel(nil) }) + task1 := genDummyTask() task2 := &model.Task{ ID: "2", Dependencies: []string{"1"}, DepStatus: make(map[string]model.StatusValue), } - task3 := &model.Task{ ID: "3", Dependencies: []string{"1"}, @@ -159,19 +191,24 @@ func TestFifoErrors(t *testing.T) { } q, _ := NewMemoryQueue(ctx).(*fifo) + assert.NotNil(t, q) + assert.NoError(t, q.PushAtOnce(ctx, []*model.Task{task2, task3, task1})) + waitForProcess() got, err := q.Poll(ctx, 1, filterFnTrue) assert.NoError(t, err) assert.Equal(t, task1, got) assert.NoError(t, q.Error(ctx, got.ID, fmt.Errorf("exit code 1, there was an error"))) + waitForProcess() got, err = q.Poll(ctx, 1, filterFnTrue) assert.NoError(t, err) assert.Equal(t, task2, got) assert.False(t, got.ShouldRun()) + waitForProcess() got, err = q.Poll(ctx, 1, filterFnTrue) assert.NoError(t, err) assert.Equal(t, task3, got) @@ -179,15 +216,13 @@ func TestFifoErrors(t *testing.T) { } func TestFifoErrors2(t *testing.T) { - ctx := context.Background() - task1 := &model.Task{ - ID: "1", - } + ctx, cancel := context.WithCancelCause(context.Background()) + t.Cleanup(func() { cancel(nil) }) + task1 := genDummyTask() task2 := &model.Task{ ID: "2", } - task3 := &model.Task{ ID: "3", Dependencies: []string{"1", "2"}, @@ -195,9 +230,12 @@ func TestFifoErrors2(t *testing.T) { } q, _ := NewMemoryQueue(ctx).(*fifo) + assert.NotNil(t, q) + assert.NoError(t, q.PushAtOnce(ctx, []*model.Task{task2, task3, task1})) for i := 0; i < 2; i++ { + waitForProcess() got, err := q.Poll(ctx, 1, filterFnTrue) assert.NoError(t, err) assert.False(t, got != task1 && got != task2, "expect task1 or task2 returned from queue as task3 depends on them") @@ -210,6 +248,7 @@ func TestFifoErrors2(t *testing.T) { } } + waitForProcess() got, err := q.Poll(ctx, 1, filterFnTrue) assert.NoError(t, err) assert.Equal(t, task3, got) @@ -217,17 +256,15 @@ func TestFifoErrors2(t *testing.T) { } func TestFifoErrorsMultiThread(t *testing.T) { - ctx := context.Background() - task1 := &model.Task{ - ID: "1", - } + ctx, cancel := context.WithCancelCause(context.Background()) + t.Cleanup(func() { cancel(nil) }) + task1 := genDummyTask() task2 := &model.Task{ ID: "2", Dependencies: []string{"1"}, DepStatus: make(map[string]model.StatusValue), } - task3 := &model.Task{ ID: "3", Dependencies: []string{"1", "2"}, @@ -235,15 +272,21 @@ func TestFifoErrorsMultiThread(t *testing.T) { } q, _ := NewMemoryQueue(ctx).(*fifo) + assert.NotNil(t, q) + assert.NoError(t, q.PushAtOnce(ctx, []*model.Task{task2, task3, task1})) obtainedWorkCh := make(chan *model.Task) + defer func() { close(obtainedWorkCh) }() for i := 0; i < 10; i++ { go func(i int) { for { fmt.Printf("Worker %d started\n", i) got, err := q.Poll(ctx, 1, filterFnTrue) + if err != nil && errors.Is(err, context.Canceled) { + return + } assert.NoError(t, err) obtainedWorkCh <- got } @@ -266,7 +309,11 @@ func TestFifoErrorsMultiThread(t *testing.T) { go func() { for { fmt.Printf("Worker spawned\n") - got, _ := q.Poll(ctx, 1, filterFnTrue) + got, err := q.Poll(ctx, 1, filterFnTrue) + if err != nil && errors.Is(err, context.Canceled) { + return + } + assert.NoError(t, err) obtainedWorkCh <- got } }() @@ -277,7 +324,11 @@ func TestFifoErrorsMultiThread(t *testing.T) { go func() { for { fmt.Printf("Worker spawned\n") - got, _ := q.Poll(ctx, 1, filterFnTrue) + got, err := q.Poll(ctx, 1, filterFnTrue) + if err != nil && errors.Is(err, context.Canceled) { + return + } + assert.NoError(t, err) obtainedWorkCh <- got } }() @@ -297,17 +348,15 @@ func TestFifoErrorsMultiThread(t *testing.T) { } func TestFifoTransitiveErrors(t *testing.T) { - ctx := context.Background() - task1 := &model.Task{ - ID: "1", - } + ctx, cancel := context.WithCancelCause(context.Background()) + t.Cleanup(func() { cancel(nil) }) + task1 := genDummyTask() task2 := &model.Task{ ID: "2", Dependencies: []string{"1"}, DepStatus: make(map[string]model.StatusValue), } - task3 := &model.Task{ ID: "3", Dependencies: []string{"2"}, @@ -315,19 +364,24 @@ func TestFifoTransitiveErrors(t *testing.T) { } q, _ := NewMemoryQueue(ctx).(*fifo) + assert.NotNil(t, q) + assert.NoError(t, q.PushAtOnce(ctx, []*model.Task{task2, task3, task1})) + waitForProcess() got, err := q.Poll(ctx, 1, filterFnTrue) assert.NoError(t, err) assert.Equal(t, task1, got) assert.NoError(t, q.Error(ctx, got.ID, fmt.Errorf("exit code 1, there was an error"))) + waitForProcess() got, err = q.Poll(ctx, 1, filterFnTrue) assert.NoError(t, err) assert.Equal(t, task2, got) assert.False(t, got.ShouldRun(), "expect task2 should not run, since task1 failed") assert.NoError(t, q.Done(ctx, got.ID, model.StatusSkipped)) + waitForProcess() got, err = q.Poll(ctx, 1, filterFnTrue) assert.NoError(t, err) assert.Equal(t, task3, got) @@ -335,17 +389,15 @@ func TestFifoTransitiveErrors(t *testing.T) { } func TestFifoCancel(t *testing.T) { - ctx := context.Background() - task1 := &model.Task{ - ID: "1", - } + ctx, cancel := context.WithCancelCause(context.Background()) + t.Cleanup(func() { cancel(nil) }) + task1 := genDummyTask() task2 := &model.Task{ ID: "2", Dependencies: []string{"1"}, DepStatus: make(map[string]model.StatusValue), } - task3 := &model.Task{ ID: "3", Dependencies: []string{"1"}, @@ -354,24 +406,33 @@ func TestFifoCancel(t *testing.T) { } q, _ := NewMemoryQueue(ctx).(*fifo) + assert.NotNil(t, q) + assert.NoError(t, q.PushAtOnce(ctx, []*model.Task{task2, task3, task1})) _, _ = q.Poll(ctx, 1, filterFnTrue) assert.NoError(t, q.Error(ctx, task1.ID, fmt.Errorf("canceled"))) assert.NoError(t, q.Error(ctx, task2.ID, fmt.Errorf("canceled"))) assert.NoError(t, q.Error(ctx, task3.ID, fmt.Errorf("canceled"))) - info := q.Info(ctx) assert.Len(t, info.Pending, 0, "all pipelines should be canceled") + + time.Sleep(processTimeInterval * 2) + info = q.Info(ctx) + assert.Len(t, info.Pending, 2, "canceled are rescheduled") + assert.Len(t, info.Running, 0, "canceled are rescheduled") + assert.Len(t, info.WaitingOnDeps, 0, "canceled are rescheduled") } func TestFifoPause(t *testing.T) { - ctx := context.Background() - task1 := &model.Task{ - ID: "1", - } + ctx, cancel := context.WithCancelCause(context.Background()) + t.Cleanup(func() { cancel(nil) }) q, _ := NewMemoryQueue(ctx).(*fifo) + assert.NotNil(t, q) + + dummyTask := genDummyTask() + var wg sync.WaitGroup wg.Add(1) go func() { @@ -381,8 +442,8 @@ func TestFifoPause(t *testing.T) { q.Pause() t0 := time.Now() - assert.NoError(t, q.Push(ctx, task1)) - time.Sleep(20 * time.Millisecond) + assert.NoError(t, q.Push(ctx, dummyTask)) + waitForProcess() q.Resume() wg.Wait() @@ -391,37 +452,37 @@ func TestFifoPause(t *testing.T) { assert.Greater(t, t1.Sub(t0), 20*time.Millisecond, "should have waited til resume") q.Pause() - assert.NoError(t, q.Push(ctx, task1)) + assert.NoError(t, q.Push(ctx, dummyTask)) q.Resume() _, _ = q.Poll(ctx, 1, filterFnTrue) } func TestFifoPauseResume(t *testing.T) { - ctx := context.Background() - task1 := &model.Task{ - ID: "1", - } + ctx, cancel := context.WithCancelCause(context.Background()) + t.Cleanup(func() { cancel(nil) }) q, _ := NewMemoryQueue(ctx).(*fifo) + assert.NotNil(t, q) + + dummyTask := genDummyTask() + q.Pause() - assert.NoError(t, q.Push(ctx, task1)) + assert.NoError(t, q.Push(ctx, dummyTask)) q.Resume() _, _ = q.Poll(ctx, 1, filterFnTrue) } func TestWaitingVsPending(t *testing.T) { - ctx := context.Background() - task1 := &model.Task{ - ID: "1", - } + ctx, cancel := context.WithCancelCause(context.Background()) + t.Cleanup(func() { cancel(nil) }) + task1 := genDummyTask() task2 := &model.Task{ ID: "2", Dependencies: []string{"1"}, DepStatus: make(map[string]model.StatusValue), } - task3 := &model.Task{ ID: "3", Dependencies: []string{"1"}, @@ -430,10 +491,13 @@ func TestWaitingVsPending(t *testing.T) { } q, _ := NewMemoryQueue(ctx).(*fifo) + assert.NotNil(t, q) + assert.NoError(t, q.PushAtOnce(ctx, []*model.Task{task2, task3, task1})) got, _ := q.Poll(ctx, 1, filterFnTrue) + waitForProcess() info := q.Info(ctx) assert.Equal(t, 2, info.Stats.WaitingOnDeps) @@ -442,6 +506,7 @@ func TestWaitingVsPending(t *testing.T) { assert.NoError(t, err) assert.EqualValues(t, task2, got) + waitForProcess() info = q.Info(ctx) assert.Equal(t, 0, info.Stats.WaitingOnDeps) assert.Equal(t, 1, info.Stats.Pending) @@ -518,7 +583,9 @@ func TestShouldRun(t *testing.T) { } func TestFifoWithScoring(t *testing.T) { - ctx := context.Background() + ctx, cancel := context.WithCancelCause(context.Background()) + t.Cleanup(func() { cancel(nil) }) + q := NewMemoryQueue(ctx) // Create tasks with different labels @@ -530,9 +597,7 @@ func TestFifoWithScoring(t *testing.T) { {ID: "5", Labels: map[string]string{"org-id": "*", "platform": "linux"}}, } - for _, task := range tasks { - assert.NoError(t, q.Push(ctx, task)) - } + assert.NoError(t, q.PushAtOnce(ctx, tasks)) // Create filter functions for different workers filters := map[int]FilterFn{