diff --git a/modules/queue/workergroup.go b/modules/queue/workergroup.go index e3801ef2b2..8c9dd274f0 100644 --- a/modules/queue/workergroup.go +++ b/modules/queue/workergroup.go @@ -146,8 +146,6 @@ func (q *WorkerPoolQueue[T]) doStartNewWorker(wp *workerGroup[T]) { log.Debug("Queue %q starts new worker", q.GetName()) defer log.Debug("Queue %q stops idle worker", q.GetName()) - atomic.AddInt32(&q.workerStartedCounter, 1) // Only increase counter, used for debugging - t := time.NewTicker(workerIdleDuration) defer t.Stop() diff --git a/modules/queue/workerqueue.go b/modules/queue/workerqueue.go index 4160622d81..b28fd88027 100644 --- a/modules/queue/workerqueue.go +++ b/modules/queue/workerqueue.go @@ -40,8 +40,6 @@ type WorkerPoolQueue[T any] struct { workerMaxNum int workerActiveNum int workerNumMu sync.Mutex - - workerStartedCounter int32 } type flushType chan struct{} diff --git a/modules/queue/workerqueue_test.go b/modules/queue/workerqueue_test.go index d18c795ebc..9898ceb873 100644 --- a/modules/queue/workerqueue_test.go +++ b/modules/queue/workerqueue_test.go @@ -4,7 +4,9 @@ package queue import ( + "bytes" "context" + "runtime" "strconv" "sync" "testing" @@ -14,6 +16,7 @@ import ( "code.gitea.io/gitea/modules/test" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func runWorkerPoolQueue[T any](q *WorkerPoolQueue[T]) func() { @@ -249,23 +252,40 @@ func TestWorkerPoolQueueShutdown(t *testing.T) { } func TestWorkerPoolQueueWorkerIdleReset(t *testing.T) { - defer test.MockVariableValue(&workerIdleDuration, 10*time.Millisecond)() + defer test.MockVariableValue(&workerIdleDuration, 1*time.Millisecond)() + chGoroutineIDs := make(chan string) handler := func(items ...int) (unhandled []int) { - time.Sleep(50 * time.Millisecond) + time.Sleep(10 * workerIdleDuration) + chGoroutineIDs <- goroutineID() // hacky way to identify a worker return nil } q, _ := newWorkerPoolQueueForTest("test-workpoolqueue", setting.QueueSettings{Type: "channel", BatchLength: 1, MaxWorkers: 2, Length: 100}, handler, false) stop := runWorkerPoolQueue(q) - for i := 0; i < 20; i++ { + + const workloadSize = 12 + for i := 0; i < workloadSize; i++ { assert.NoError(t, q.Push(i)) } - time.Sleep(500 * time.Millisecond) - assert.EqualValues(t, 2, q.GetWorkerNumber()) - assert.EqualValues(t, 2, q.GetWorkerActiveNumber()) - // when the queue never becomes empty, the existing workers should keep working - assert.LessOrEqual(t, q.workerStartedCounter, int32(4)) // counter should be 2, but sometimes it gets bigger + workerIDs := make(map[string]struct{}) + for i := 0; i < workloadSize; i++ { + c := <-chGoroutineIDs + workerIDs[c] = struct{}{} + t.Logf("%d workers: overall=%d current=%d", i, len(workerIDs), q.GetWorkerNumber()) + + // ensure that no more than qs.MaxWorkers workers are created over the whole lifetime of the queue + // (otherwise it would mean that some workers got shut down while the queue was full) + require.LessOrEqual(t, len(workerIDs), q.GetWorkerMaxNumber()) + } + close(chGoroutineIDs) + stop() } + +func goroutineID() string { + var buffer [31]byte + _ = runtime.Stack(buffer[:], false) + return string(bytes.Fields(buffer[10:])[0]) +}