diff --git a/internal/transport/delivery/delivery.go b/internal/transport/delivery/delivery.go index 6626ca2ae..d934cd514 100644 --- a/internal/transport/delivery/delivery.go +++ b/internal/transport/delivery/delivery.go @@ -100,28 +100,53 @@ func (p *WorkerPool) Init(client *httpclient.Client) { } // Start will attempt to start 'n' Worker{}s. -func (p *WorkerPool) Start(n int) (ok bool) { - if ok = (len(p.workers) == 0); ok { - p.workers = make([]*Worker, n) - for i := range p.workers { - p.workers[i] = new(Worker) - p.workers[i].Client = p.Client - p.workers[i].Queue = &p.Queue - ok = p.workers[i].Start() && ok - } +func (p *WorkerPool) Start(n int) { + // Check whether workers are + // set (is already running). + ok := (len(p.workers) > 0) + if ok { + return } + + // Allocate new workers slice. + p.workers = make([]*Worker, n) + for i := range p.workers { + + // Allocate new Worker{}. + p.workers[i] = new(Worker) + p.workers[i].Client = p.Client + p.workers[i].Queue = &p.Queue + + // Attempt to start worker. + // Return bool not useful + // here, as true = started, + // false = already running. + _ = p.workers[i].Start() + } + return } // Stop will attempt to stop contained Worker{}s. -func (p *WorkerPool) Stop() (ok bool) { - if ok = (len(p.workers) > 0); ok { - for i := range p.workers { - ok = p.workers[i].Stop() && ok - p.workers[i] = nil - } - p.workers = p.workers[:0] +func (p *WorkerPool) Stop() { + // Check whether workers are + // set (is currently running). + ok := (len(p.workers) == 0) + if ok { + return } + + // Stop all running workers. + for i := range p.workers { + + // return bool not useful + // here, as true = stopped, + // false = never running. + _ = p.workers[i].Stop() + } + + // Unset workers slice. + p.workers = p.workers[:0] return } diff --git a/internal/transport/delivery/delivery_test.go b/internal/transport/delivery/delivery_test.go index 852c6f6f3..48831f098 100644 --- a/internal/transport/delivery/delivery_test.go +++ b/internal/transport/delivery/delivery_test.go @@ -32,9 +32,7 @@ func testDeliveryWorkerPool(t *testing.T, sz int, input []*testrequest) { "127.0.0.0/8", }), })) - if !wp.Start(sz) { - t.Fatal("failed starting pool") - } + wp.Start(sz) defer wp.Stop() test(t, &wp.Queue, input) } diff --git a/internal/workers/worker_fn.go b/internal/workers/worker_fn.go index 8a4c7b85c..9fa6d8e6a 100644 --- a/internal/workers/worker_fn.go +++ b/internal/workers/worker_fn.go @@ -39,27 +39,52 @@ type FnWorkerPool struct { } // Start will attempt to start 'n' FnWorker{}s. -func (p *FnWorkerPool) Start(n int) (ok bool) { - if ok = (len(p.workers) == 0); ok { - p.workers = make([]*FnWorker, n) - for i := range p.workers { - p.workers[i] = new(FnWorker) - p.workers[i].Queue = &p.Queue - ok = p.workers[i].Start() && ok - } +func (p *FnWorkerPool) Start(n int) { + // Check whether workers are + // set (is already running). + ok := (len(p.workers) > 0) + if ok { + return } + + // Allocate new workers slice. + p.workers = make([]*FnWorker, n) + for i := range p.workers { + + // Allocate new FnWorker{}. + p.workers[i] = new(FnWorker) + p.workers[i].Queue = &p.Queue + + // Attempt to start worker. + // Return bool not useful + // here, as true = started, + // false = already running. + _ = p.workers[i].Start() + } + return } // Stop will attempt to stop contained FnWorker{}s. -func (p *FnWorkerPool) Stop() (ok bool) { - if ok = (len(p.workers) > 0); ok { - for i := range p.workers { - ok = p.workers[i].Stop() && ok - p.workers[i] = nil - } - p.workers = p.workers[:0] +func (p *FnWorkerPool) Stop() { + // Check whether workers are + // set (is currently running). + ok := (len(p.workers) == 0) + if ok { + return } + + // Stop all running workers. + for i := range p.workers { + + // return bool not useful + // here, as true = stopped, + // false = never running. + _ = p.workers[i].Stop() + } + + // Unset workers slice. + p.workers = p.workers[:0] return } diff --git a/internal/workers/worker_msg.go b/internal/workers/worker_msg.go index b22409e91..fc418c12c 100644 --- a/internal/workers/worker_msg.go +++ b/internal/workers/worker_msg.go @@ -48,28 +48,53 @@ func (p *MsgWorkerPool[T]) Init(indices []structr.IndexConfig) { } // Start will attempt to start 'n' Worker{}s. -func (p *MsgWorkerPool[T]) Start(n int) (ok bool) { - if ok = (len(p.workers) == 0); ok { - p.workers = make([]*MsgWorker[T], n) - for i := range p.workers { - p.workers[i] = new(MsgWorker[T]) - p.workers[i].Process = p.Process - p.workers[i].Queue = &p.Queue - ok = p.workers[i].Start() && ok - } +func (p *MsgWorkerPool[T]) Start(n int) { + // Check whether workers are + // set (is already running). + ok := (len(p.workers) > 0) + if ok { + return } + + // Allocate new msg workers slice. + p.workers = make([]*MsgWorker[T], n) + for i := range p.workers { + + // Allocate new MsgWorker[T]{}. + p.workers[i] = new(MsgWorker[T]) + p.workers[i].Process = p.Process + p.workers[i].Queue = &p.Queue + + // Attempt to start worker. + // Return bool not useful + // here, as true = started, + // false = already running. + _ = p.workers[i].Start() + } + return } // Stop will attempt to stop contained Worker{}s. -func (p *MsgWorkerPool[T]) Stop() (ok bool) { - if ok = (len(p.workers) > 0); ok { - for i := range p.workers { - ok = p.workers[i].Stop() && ok - p.workers[i] = nil - } - p.workers = p.workers[:0] +func (p *MsgWorkerPool[T]) Stop() { + // Check whether workers are + // set (is currently running). + ok := (len(p.workers) == 0) + if ok { + return } + + // Stop all running workers. + for i := range p.workers { + + // return bool not useful + // here, as true = stopped, + // false = never running. + _ = p.workers[i].Stop() + } + + // Unset workers slice. + p.workers = p.workers[:0] return } diff --git a/internal/workers/workers.go b/internal/workers/workers.go index c9e2db9a1..c8606ef9f 100644 --- a/internal/workers/workers.go +++ b/internal/workers/workers.go @@ -59,48 +59,27 @@ type Workers struct { // StartScheduler starts the job scheduler. func (w *Workers) StartScheduler() { - tryUntil("starting scheduler", 5, w.Scheduler.Start) + _ = w.Scheduler.Start() // false = already running } // Start will start contained worker pools. func (w *Workers) Start() { - // Get currently set GOMAXPROCS. maxprocs := runtime.GOMAXPROCS(0) - - tryUntil("start delivery workerpool", 5, func() bool { - n := config.GetAdvancedSenderMultiplier() - if n < 1 { - // clamp min senders to 1. - return w.Delivery.Start(1) - } - return w.Delivery.Start(n * maxprocs) - }) - - tryUntil("starting client workerpool", 5, func() bool { - return w.Client.Start(4 * maxprocs) - }) - - tryUntil("starting federator workerpool", 5, func() bool { - return w.Federator.Start(4 * maxprocs) - }) - - tryUntil("starting dereference workerpool", 5, func() bool { - return w.Dereference.Start(4 * maxprocs) - }) - - tryUntil("starting media workerpool", 5, func() bool { - return w.Media.Start(8 * maxprocs) - }) + w.Delivery.Start(deliveryWorkers(maxprocs)) + w.Client.Start(4 * maxprocs) + w.Federator.Start(4 * maxprocs) + w.Dereference.Start(4 * maxprocs) + w.Media.Start(8 * maxprocs) } // Stop will stop all of the contained worker pools (and global scheduler). func (w *Workers) Stop() { - tryUntil("stopping scheduler", 5, w.Scheduler.Stop) - tryUntil("stopping delivery workerpool", 5, w.Delivery.Stop) - tryUntil("stopping client API workerpool", 5, w.Client.Stop) - tryUntil("stopping federator workerpool", 5, w.Federator.Stop) - tryUntil("stopping dereference workerpool", 5, w.Dereference.Stop) - tryUntil("stopping media workerpool", 5, w.Media.Stop) + _ = w.Scheduler.Stop() // false = not running + w.Delivery.Stop() + w.Client.Stop() + w.Federator.Stop() + w.Dereference.Stop() + w.Media.Stop() } // nocopy when embedded will signal linter to @@ -111,6 +90,15 @@ func (*nocopy) Lock() {} func (*nocopy) Unlock() {} +func deliveryWorkers(maxprocs int) int { + n := config.GetAdvancedSenderMultiplier() + if n < 1 { + // clamp to 1 + return 1 + } + return n * maxprocs +} + // tryUntil will attempt to call 'do' for 'count' attempts, before panicking with 'msg'. func tryUntil(msg string, count int, do func() bool) { for i := 0; i < count; i++ { diff --git a/testrig/util.go b/testrig/util.go index ad4ffcb6c..d5eaedcd5 100644 --- a/testrig/util.go +++ b/testrig/util.go @@ -79,18 +79,18 @@ func StartWorkers(state *state.State, processor *workers.Processor) { state.Workers.Delivery.Init(nil) _ = state.Workers.Scheduler.Start() - _ = state.Workers.Client.Start(1) - _ = state.Workers.Federator.Start(1) - _ = state.Workers.Dereference.Start(1) - _ = state.Workers.Media.Start(1) + state.Workers.Client.Start(1) + state.Workers.Federator.Start(1) + state.Workers.Dereference.Start(1) + state.Workers.Media.Start(1) } func StopWorkers(state *state.State) { _ = state.Workers.Scheduler.Stop() - _ = state.Workers.Client.Stop() - _ = state.Workers.Federator.Stop() - _ = state.Workers.Dereference.Stop() - _ = state.Workers.Media.Stop() + state.Workers.Client.Stop() + state.Workers.Federator.Stop() + state.Workers.Dereference.Stop() + state.Workers.Media.Stop() } func StartTimelines(state *state.State, filter *visibility.Filter, converter *typeutils.Converter) {