start the job scheduler separately to the worker pools

This commit is contained in:
kim 2024-04-24 13:39:10 +01:00
parent 092c9819b2
commit f94c201e94
2 changed files with 18 additions and 5 deletions

View file

@ -198,6 +198,10 @@ var Start action.GTSAction = func(ctx context.Context) error {
return fmt.Errorf("error starting list timeline: %s", err)
}
// Start the job scheduler
// (this is required for cleaner).
state.Workers.StartScheduler()
// Create a media cleaner
// using the given state.
cleaner := cleaner.New(&state)
@ -214,10 +218,17 @@ var Start action.GTSAction = func(ctx context.Context) error {
emailSender,
)
// Set state client / federator synchronous processing functions.
// Initialize the specialized workers.
state.Workers.Client.Init(messages.ClientMsgIndices())
state.Workers.Federator.Init(messages.FederatorMsgIndices())
state.Workers.Delivery.Init(client)
state.Workers.Client.Process = processor.Workers().ProcessFromClientAPI
state.Workers.Federator.Process = processor.Workers().ProcessFromFediAPI
// Initialize workers.
state.Workers.Start()
defer state.Workers.Stop()
// Schedule tasks for all existing poll expiries.
if err := processor.Polls().ScheduleAll(ctx); err != nil {
return fmt.Errorf("error scheduling poll expiries: %w", err)

View file

@ -57,14 +57,16 @@ type Workers struct {
_ nocopy
}
// Start will start all of the contained
// worker pools (and global scheduler).
// StartScheduler starts the job scheduler.
func (w *Workers) StartScheduler() {
tryUntil("starting scheduler", 5, w.Scheduler.Start)
}
// Start will start contained worker pools.
func (w *Workers) Start() {
// Get currently set GOMAXPROCS.
maxprocs := runtime.GOMAXPROCS(0)
tryUntil("starting scheduler", 5, w.Scheduler.Start)
tryUntil("start delivery workerpool", 5, func() bool {
n := config.GetAdvancedSenderMultiplier()
if n < 1 {