From 1144ac037fcb4fd7ec9729c4e0846c6f7fe9a2ed Mon Sep 17 00:00:00 2001
From: kim
Date: Wed, 20 Aug 2025 18:14:00 +0200
Subject: [PATCH] [feature] add metrics for worker counts, and worker queue sizes (#4387)

should help to debug https://codeberg.org/superseriousbusiness/gotosocial/issues/4309

Reviewed-on: https://codeberg.org/superseriousbusiness/gotosocial/pulls/4387
Reviewed-by: Daenney
Co-authored-by: kim
Co-committed-by: kim
---
 cmd/gotosocial/action/server/server.go   |   2 +-
 cmd/gotosocial/action/testrig/testrig.go |   2 +-
 internal/observability/metrics.go        | 160 +++++++++++++++++++++--
 internal/transport/delivery/worker.go    |   5 +
 internal/workers/worker_fn.go            |   5 +
 internal/workers/worker_msg.go           |   5 +
 6 files changed, 169 insertions(+), 10 deletions(-)

diff --git a/cmd/gotosocial/action/server/server.go b/cmd/gotosocial/action/server/server.go
index 85f19b9db..f206d4dcd 100644
--- a/cmd/gotosocial/action/server/server.go
+++ b/cmd/gotosocial/action/server/server.go
@@ -392,7 +392,7 @@ func Start(ctx context.Context) error {
 	}
 
 	// Initialize metrics.
-	if err := observability.InitializeMetrics(ctx, state.DB); err != nil {
+	if err := observability.InitializeMetrics(ctx, state); err != nil {
 		return fmt.Errorf("error initializing metrics: %w", err)
 	}
 
diff --git a/cmd/gotosocial/action/testrig/testrig.go b/cmd/gotosocial/action/testrig/testrig.go
index 6399be41c..57bad155a 100644
--- a/cmd/gotosocial/action/testrig/testrig.go
+++ b/cmd/gotosocial/action/testrig/testrig.go
@@ -172,7 +172,7 @@ func Start(ctx context.Context) error {
 	defer testrig.StopWorkers(state)
 
 	// Initialize metrics.
-	if err := observability.InitializeMetrics(ctx, state.DB); err != nil {
+	if err := observability.InitializeMetrics(ctx, state); err != nil {
 		return fmt.Errorf("error initializing metrics: %w", err)
 	}
 
diff --git a/internal/observability/metrics.go b/internal/observability/metrics.go
index 09556dc77..8df5fd9fb 100644
--- a/internal/observability/metrics.go
+++ b/internal/observability/metrics.go
@@ -24,7 +24,7 @@ import (
 	"fmt"
 
 	"code.superseriousbusiness.org/gotosocial/internal/config"
-	"code.superseriousbusiness.org/gotosocial/internal/db"
+	"code.superseriousbusiness.org/gotosocial/internal/state"
 
 	"github.com/gin-gonic/gin"
 	"github.com/technologize/otel-go-contrib/otelginmetrics"
@@ -36,7 +36,7 @@ import (
 	"go.opentelemetry.io/otel/sdk/metric/exemplar"
 )
 
-func InitializeMetrics(ctx context.Context, db db.DB) error {
+func InitializeMetrics(ctx context.Context, state *state.State) error {
 	if !config.GetMetricsEnabled() {
 		return nil
 	}
@@ -73,8 +73,8 @@ func InitializeMetrics(ctx context.Context, db db.DB) error {
 	_, err = meter.Int64ObservableGauge(
 		"gotosocial.instance.total_users",
 		metric.WithDescription("Total number of users on this instance"),
-		metric.WithInt64Callback(func(c context.Context, o metric.Int64Observer) error {
-			userCount, err := db.CountInstanceUsers(c, thisInstance)
+		metric.WithInt64Callback(func(ctx context.Context, o metric.Int64Observer) error {
+			userCount, err := state.DB.CountInstanceUsers(ctx, thisInstance)
 			if err != nil {
 				return err
 			}
@@ -89,8 +89,8 @@ func InitializeMetrics(ctx context.Context, db db.DB) error {
 	_, err = meter.Int64ObservableGauge(
 		"gotosocial.instance.total_statuses",
 		metric.WithDescription("Total number of statuses on this instance"),
-		metric.WithInt64Callback(func(c context.Context, o metric.Int64Observer) error {
-			statusCount, err := db.CountInstanceStatuses(c, thisInstance)
+		metric.WithInt64Callback(func(ctx context.Context, o metric.Int64Observer) error {
+			statusCount, err := state.DB.CountInstanceStatuses(ctx, thisInstance)
 			if err != nil {
 				return err
 			}
@@ -105,8 +105,8 @@ func InitializeMetrics(ctx context.Context, db db.DB) error {
 	_, err = meter.Int64ObservableGauge(
 		"gotosocial.instance.total_federating_instances",
 		metric.WithDescription("Total number of other instances this instance is federating with"),
-		metric.WithInt64Callback(func(c context.Context, o metric.Int64Observer) error {
-			federatingCount, err := db.CountInstanceDomains(c, thisInstance)
+		metric.WithInt64Callback(func(ctx context.Context, o metric.Int64Observer) error {
+			federatingCount, err := state.DB.CountInstanceDomains(ctx, thisInstance)
 			if err != nil {
 				return err
 			}
@@ -118,6 +118,150 @@ func InitializeMetrics(ctx context.Context, db db.DB) error {
 		return err
 	}
 
+	_, err = meter.Int64ObservableGauge(
+		"gotosocial.workers.delivery.count",
+		metric.WithDescription("Current number of delivery workers"),
+		metric.WithInt64Callback(func(ctx context.Context, o metric.Int64Observer) error {
+			o.Observe(int64(state.Workers.Delivery.Len()))
+			return nil
+		}),
+	)
+	if err != nil {
+		return err
+	}
+
+	_, err = meter.Int64ObservableUpDownCounter(
+		"gotosocial.workers.delivery.queue",
+		metric.WithDescription("Current number of queued delivery worker tasks"),
+		metric.WithInt64Callback(func(ctx context.Context, o metric.Int64Observer) error {
+			o.Observe(int64(state.Workers.Delivery.Queue.Len()))
+			return nil
+		}),
+	)
+	if err != nil {
+		return err
+	}
+
+	_, err = meter.Int64ObservableGauge(
+		"gotosocial.workers.dereference.count",
+		metric.WithDescription("Current number of dereference workers"),
+		metric.WithInt64Callback(func(ctx context.Context, o metric.Int64Observer) error {
+			o.Observe(int64(state.Workers.Dereference.Len()))
+			return nil
+		}),
+	)
+	if err != nil {
+		return err
+	}
+
+	_, err = meter.Int64ObservableUpDownCounter(
+		"gotosocial.workers.dereference.queue",
+		metric.WithDescription("Current number of queued dereference worker tasks"),
+		metric.WithInt64Callback(func(ctx context.Context, o metric.Int64Observer) error {
+			o.Observe(int64(state.Workers.Dereference.Queue.Len()))
+			return nil
+		}),
+	)
+	if err != nil {
+		return err
+	}
+
+	_, err = meter.Int64ObservableGauge(
+		"gotosocial.workers.client_api.count",
+		metric.WithDescription("Current number of client API workers"),
+		metric.WithInt64Callback(func(ctx context.Context, o metric.Int64Observer) error {
+			o.Observe(int64(state.Workers.Client.Len()))
+			return nil
+		}),
+	)
+	if err != nil {
+		return err
+	}
+
+	_, err = meter.Int64ObservableUpDownCounter(
+		"gotosocial.workers.client_api.queue",
+		metric.WithDescription("Current number of queued client API worker tasks"),
+		metric.WithInt64Callback(func(ctx context.Context, o metric.Int64Observer) error {
+			o.Observe(int64(state.Workers.Client.Queue.Len()))
+			return nil
+		}),
+	)
+	if err != nil {
+		return err
+	}
+
+	_, err = meter.Int64ObservableGauge(
+		"gotosocial.workers.fedi_api.count",
+		metric.WithDescription("Current number of federator API workers"),
+		metric.WithInt64Callback(func(ctx context.Context, o metric.Int64Observer) error {
+			o.Observe(int64(state.Workers.Federator.Len()))
+			return nil
+		}),
+	)
+	if err != nil {
+		return err
+	}
+
+	_, err = meter.Int64ObservableUpDownCounter(
+		"gotosocial.workers.fedi_api.queue",
+		metric.WithDescription("Current number of queued federator API worker tasks"),
+		metric.WithInt64Callback(func(ctx context.Context, o metric.Int64Observer) error {
+			o.Observe(int64(state.Workers.Federator.Queue.Len()))
+			return nil
+		}),
+	)
+	if err != nil {
+		return err
+	}
+
+	_, err = meter.Int64ObservableGauge(
+		"gotosocial.workers.processing.count",
+		metric.WithDescription("Current number of processing workers"),
+		metric.WithInt64Callback(func(ctx context.Context, o metric.Int64Observer) error {
+			o.Observe(int64(state.Workers.Processing.Len()))
+			return nil
+		}),
+	)
+	if err != nil {
+		return err
+	}
+
+	_, err = meter.Int64ObservableUpDownCounter(
+		"gotosocial.workers.processing.queue",
+		metric.WithDescription("Current number of queued processing worker tasks"),
+		metric.WithInt64Callback(func(ctx context.Context, o metric.Int64Observer) error {
+			o.Observe(int64(state.Workers.Processing.Queue.Len()))
+			return nil
+		}),
+	)
+	if err != nil {
+		return err
+	}
+
+	_, err = meter.Int64ObservableGauge(
+		"gotosocial.workers.webpush.count",
+		metric.WithDescription("Current number of webpush workers"),
+		metric.WithInt64Callback(func(ctx context.Context, o metric.Int64Observer) error {
+			o.Observe(int64(state.Workers.WebPush.Len()))
+			return nil
+		}),
+	)
+	if err != nil {
+		return err
+	}
+
+	_, err = meter.Int64ObservableUpDownCounter(
+		"gotosocial.workers.webpush.queue",
+		metric.WithDescription("Current number of queued webpush worker tasks"),
+		metric.WithInt64Callback(func(ctx context.Context, o metric.Int64Observer) error {
+			o.Observe(int64(state.Workers.WebPush.Queue.Len()))
+			return nil
+		}),
+	)
+	if err != nil {
+		return err
+	}
+
 	return nil
 }
 
diff --git a/internal/transport/delivery/worker.go b/internal/transport/delivery/worker.go
index 17dd8dffe..dcb656475 100644
--- a/internal/transport/delivery/worker.go
+++ b/internal/transport/delivery/worker.go
@@ -110,6 +110,11 @@ func (p *WorkerPool) Stop() {
 	p.workers = p.workers[:0]
 }
 
+// Len returns number of currently active workers.
+func (p *WorkerPool) Len() int {
+	return len(p.workers)
+}
+
 // Worker wraps an httpclient.Client{} to feed
 // from queue.StructQueue{} for ActivityPub reqs
 // to deliver. It does so while prioritizing new
diff --git a/internal/workers/worker_fn.go b/internal/workers/worker_fn.go
index c57fcfe88..5d1b5e56b 100644
--- a/internal/workers/worker_fn.go
+++ b/internal/workers/worker_fn.go
@@ -85,6 +85,11 @@ func (p *FnWorkerPool) Stop() {
 	p.workers = p.workers[:0]
 }
 
+// Len returns number of currently active workers.
+func (p *FnWorkerPool) Len() int {
+	return len(p.workers)
+}
+
 // FnWorker wraps a queue.SimpleQueue{} which
 // it feeds from to provide it with function
 // tasks to execute. It does so in a single
diff --git a/internal/workers/worker_msg.go b/internal/workers/worker_msg.go
index e038fda02..65b9adb82 100644
--- a/internal/workers/worker_msg.go
+++ b/internal/workers/worker_msg.go
@@ -96,6 +96,11 @@ func (p *MsgWorkerPool[T]) Stop() {
 	p.workers = p.workers[:0]
 }
 
+// Len returns number of currently active workers.
+func (p *MsgWorkerPool[T]) Len() int {
+	return len(p.workers)
+}
+
 // MsgWorker wraps a processing function to
 // feed from a queue.StructQueue{} for messages
 // to process. It does so in a single goroutine