remove boolean result from worker start / stop since false = already running or already stopped

This commit is contained in:
kim 2024-04-26 13:09:19 +01:00
parent 1733a0d422
commit e1b6dd4434
6 changed files with 152 additions and 91 deletions

View file

@ -100,28 +100,53 @@ func (p *WorkerPool) Init(client *httpclient.Client) {
} }
// Start will attempt to start 'n' Worker{}s. // Start will attempt to start 'n' Worker{}s.
func (p *WorkerPool) Start(n int) (ok bool) { func (p *WorkerPool) Start(n int) {
if ok = (len(p.workers) == 0); ok { // Check whether workers are
p.workers = make([]*Worker, n) // set (is already running).
for i := range p.workers { ok := (len(p.workers) > 0)
p.workers[i] = new(Worker) if ok {
p.workers[i].Client = p.Client return
p.workers[i].Queue = &p.Queue
ok = p.workers[i].Start() && ok
}
} }
// Allocate new workers slice.
p.workers = make([]*Worker, n)
for i := range p.workers {
// Allocate new Worker{}.
p.workers[i] = new(Worker)
p.workers[i].Client = p.Client
p.workers[i].Queue = &p.Queue
// Attempt to start worker.
// Return bool not useful
// here, as true = started,
// false = already running.
_ = p.workers[i].Start()
}
return return
} }
// Stop will attempt to stop contained Worker{}s. // Stop will attempt to stop contained Worker{}s.
func (p *WorkerPool) Stop() (ok bool) { func (p *WorkerPool) Stop() {
if ok = (len(p.workers) > 0); ok { // Check whether workers are
for i := range p.workers { // set (is currently running).
ok = p.workers[i].Stop() && ok ok := (len(p.workers) == 0)
p.workers[i] = nil if ok {
} return
p.workers = p.workers[:0]
} }
// Stop all running workers.
for i := range p.workers {
// return bool not useful
// here, as true = stopped,
// false = never running.
_ = p.workers[i].Stop()
}
// Unset workers slice.
p.workers = p.workers[:0]
return return
} }

View file

@ -32,9 +32,7 @@ func testDeliveryWorkerPool(t *testing.T, sz int, input []*testrequest) {
"127.0.0.0/8", "127.0.0.0/8",
}), }),
})) }))
if !wp.Start(sz) { wp.Start(sz)
t.Fatal("failed starting pool")
}
defer wp.Stop() defer wp.Stop()
test(t, &wp.Queue, input) test(t, &wp.Queue, input)
} }

View file

@ -39,27 +39,52 @@ type FnWorkerPool struct {
} }
// Start will attempt to start 'n' FnWorker{}s. // Start will attempt to start 'n' FnWorker{}s.
func (p *FnWorkerPool) Start(n int) (ok bool) { func (p *FnWorkerPool) Start(n int) {
if ok = (len(p.workers) == 0); ok { // Check whether workers are
p.workers = make([]*FnWorker, n) // set (is already running).
for i := range p.workers { ok := (len(p.workers) > 0)
p.workers[i] = new(FnWorker) if ok {
p.workers[i].Queue = &p.Queue return
ok = p.workers[i].Start() && ok
}
} }
// Allocate new workers slice.
p.workers = make([]*FnWorker, n)
for i := range p.workers {
// Allocate new FnWorker{}.
p.workers[i] = new(FnWorker)
p.workers[i].Queue = &p.Queue
// Attempt to start worker.
// Return bool not useful
// here, as true = started,
// false = already running.
_ = p.workers[i].Start()
}
return return
} }
// Stop will attempt to stop contained FnWorker{}s. // Stop will attempt to stop contained FnWorker{}s.
func (p *FnWorkerPool) Stop() (ok bool) { func (p *FnWorkerPool) Stop() {
if ok = (len(p.workers) > 0); ok { // Check whether workers are
for i := range p.workers { // set (is currently running).
ok = p.workers[i].Stop() && ok ok := (len(p.workers) == 0)
p.workers[i] = nil if ok {
} return
p.workers = p.workers[:0]
} }
// Stop all running workers.
for i := range p.workers {
// return bool not useful
// here, as true = stopped,
// false = never running.
_ = p.workers[i].Stop()
}
// Unset workers slice.
p.workers = p.workers[:0]
return return
} }

View file

@ -48,28 +48,53 @@ func (p *MsgWorkerPool[T]) Init(indices []structr.IndexConfig) {
} }
// Start will attempt to start 'n' Worker{}s. // Start will attempt to start 'n' Worker{}s.
func (p *MsgWorkerPool[T]) Start(n int) (ok bool) { func (p *MsgWorkerPool[T]) Start(n int) {
if ok = (len(p.workers) == 0); ok { // Check whether workers are
p.workers = make([]*MsgWorker[T], n) // set (is already running).
for i := range p.workers { ok := (len(p.workers) > 0)
p.workers[i] = new(MsgWorker[T]) if ok {
p.workers[i].Process = p.Process return
p.workers[i].Queue = &p.Queue
ok = p.workers[i].Start() && ok
}
} }
// Allocate new msg workers slice.
p.workers = make([]*MsgWorker[T], n)
for i := range p.workers {
// Allocate new MsgWorker[T]{}.
p.workers[i] = new(MsgWorker[T])
p.workers[i].Process = p.Process
p.workers[i].Queue = &p.Queue
// Attempt to start worker.
// Return bool not useful
// here, as true = started,
// false = already running.
_ = p.workers[i].Start()
}
return return
} }
// Stop will attempt to stop contained Worker{}s. // Stop will attempt to stop contained Worker{}s.
func (p *MsgWorkerPool[T]) Stop() (ok bool) { func (p *MsgWorkerPool[T]) Stop() {
if ok = (len(p.workers) > 0); ok { // Check whether workers are
for i := range p.workers { // set (is currently running).
ok = p.workers[i].Stop() && ok ok := (len(p.workers) == 0)
p.workers[i] = nil if ok {
} return
p.workers = p.workers[:0]
} }
// Stop all running workers.
for i := range p.workers {
// return bool not useful
// here, as true = stopped,
// false = never running.
_ = p.workers[i].Stop()
}
// Unset workers slice.
p.workers = p.workers[:0]
return return
} }

View file

@ -59,48 +59,27 @@ type Workers struct {
// StartScheduler starts the job scheduler. // StartScheduler starts the job scheduler.
func (w *Workers) StartScheduler() { func (w *Workers) StartScheduler() {
tryUntil("starting scheduler", 5, w.Scheduler.Start) _ = w.Scheduler.Start() // false = already running
} }
// Start will start contained worker pools. // Start will start contained worker pools.
func (w *Workers) Start() { func (w *Workers) Start() {
// Get currently set GOMAXPROCS.
maxprocs := runtime.GOMAXPROCS(0) maxprocs := runtime.GOMAXPROCS(0)
w.Delivery.Start(deliveryWorkers(maxprocs))
tryUntil("start delivery workerpool", 5, func() bool { w.Client.Start(4 * maxprocs)
n := config.GetAdvancedSenderMultiplier() w.Federator.Start(4 * maxprocs)
if n < 1 { w.Dereference.Start(4 * maxprocs)
// clamp min senders to 1. w.Media.Start(8 * maxprocs)
return w.Delivery.Start(1)
}
return w.Delivery.Start(n * maxprocs)
})
tryUntil("starting client workerpool", 5, func() bool {
return w.Client.Start(4 * maxprocs)
})
tryUntil("starting federator workerpool", 5, func() bool {
return w.Federator.Start(4 * maxprocs)
})
tryUntil("starting dereference workerpool", 5, func() bool {
return w.Dereference.Start(4 * maxprocs)
})
tryUntil("starting media workerpool", 5, func() bool {
return w.Media.Start(8 * maxprocs)
})
} }
// Stop will stop all of the contained worker pools (and global scheduler). // Stop will stop all of the contained worker pools (and global scheduler).
func (w *Workers) Stop() { func (w *Workers) Stop() {
tryUntil("stopping scheduler", 5, w.Scheduler.Stop) _ = w.Scheduler.Stop() // false = not running
tryUntil("stopping delivery workerpool", 5, w.Delivery.Stop) w.Delivery.Stop()
tryUntil("stopping client API workerpool", 5, w.Client.Stop) w.Client.Stop()
tryUntil("stopping federator workerpool", 5, w.Federator.Stop) w.Federator.Stop()
tryUntil("stopping dereference workerpool", 5, w.Dereference.Stop) w.Dereference.Stop()
tryUntil("stopping media workerpool", 5, w.Media.Stop) w.Media.Stop()
} }
// nocopy when embedded will signal linter to // nocopy when embedded will signal linter to
@ -111,6 +90,15 @@ func (*nocopy) Lock() {}
func (*nocopy) Unlock() {} func (*nocopy) Unlock() {}
func deliveryWorkers(maxprocs int) int {
n := config.GetAdvancedSenderMultiplier()
if n < 1 {
// clamp to 1
return 1
}
return n * maxprocs
}
// tryUntil will attempt to call 'do' for 'count' attempts, before panicking with 'msg'. // tryUntil will attempt to call 'do' for 'count' attempts, before panicking with 'msg'.
func tryUntil(msg string, count int, do func() bool) { func tryUntil(msg string, count int, do func() bool) {
for i := 0; i < count; i++ { for i := 0; i < count; i++ {

View file

@ -79,18 +79,18 @@ func StartWorkers(state *state.State, processor *workers.Processor) {
state.Workers.Delivery.Init(nil) state.Workers.Delivery.Init(nil)
_ = state.Workers.Scheduler.Start() _ = state.Workers.Scheduler.Start()
_ = state.Workers.Client.Start(1) state.Workers.Client.Start(1)
_ = state.Workers.Federator.Start(1) state.Workers.Federator.Start(1)
_ = state.Workers.Dereference.Start(1) state.Workers.Dereference.Start(1)
_ = state.Workers.Media.Start(1) state.Workers.Media.Start(1)
} }
func StopWorkers(state *state.State) { func StopWorkers(state *state.State) {
_ = state.Workers.Scheduler.Stop() _ = state.Workers.Scheduler.Stop()
_ = state.Workers.Client.Stop() state.Workers.Client.Stop()
_ = state.Workers.Federator.Stop() state.Workers.Federator.Stop()
_ = state.Workers.Dereference.Stop() state.Workers.Dereference.Stop()
_ = state.Workers.Media.Stop() state.Workers.Media.Stop()
} }
func StartTimelines(state *state.State, filter *visibility.Filter, converter *typeutils.Converter) { func StartTimelines(state *state.State, filter *visibility.Filter, converter *typeutils.Converter) {