From 7080930371630e5ac25dbb3cdf2031c5525828c9 Mon Sep 17 00:00:00 2001 From: kim Date: Thu, 4 Apr 2024 12:09:03 +0100 Subject: [PATCH] slightly tweak deliveryworkerpool API, use advanced sender multiplier --- cmd/gotosocial/action/server/server.go | 4 +- internal/httpclient/client.go | 11 +++-- internal/httpclient/delivery.go | 56 +++++++++++++------------- internal/httpclient/request.go | 3 -- internal/workers/workers.go | 6 ++- 5 files changed, 40 insertions(+), 40 deletions(-) diff --git a/cmd/gotosocial/action/server/server.go b/cmd/gotosocial/action/server/server.go index 6f486b71a..e0b7e0113 100644 --- a/cmd/gotosocial/action/server/server.go +++ b/cmd/gotosocial/action/server/server.go @@ -24,7 +24,6 @@ import ( "net/http" "os" "os/signal" - "runtime" "syscall" "time" @@ -127,8 +126,7 @@ var Start action.GTSAction = func(ctx context.Context) error { // Initialize the queues. state.Queues.Init() - - state.Workers.APDelivery.Init(client, &state.Queues.APRequests, runtime.GOMAXPROCS(0)) + state.Workers.APDelivery.Init(client, &state.Queues.APRequests) // Initialize workers. state.Workers.Start() diff --git a/internal/httpclient/client.go b/internal/httpclient/client.go index 7f7727ae8..afe78b897 100644 --- a/internal/httpclient/client.go +++ b/internal/httpclient/client.go @@ -114,9 +114,7 @@ type Client struct { // New returns a new instance of Client initialized using configuration. func New(cfg Config) *Client { var c Client - - // For now use const. - c.retries = maxRetries + c.retries = 5 d := &net.Dialer{ Timeout: 15 * time.Second, @@ -285,7 +283,7 @@ func (c *Client) do(r *request) (*http.Response, bool /* retry */, error) { return nil, false, err } - if errstr := err.Error(); // nocollapse + if errstr := err.Error(); // strings.Contains(errstr, "stopped after 10 redirects") || strings.Contains(errstr, "tls: ") || strings.Contains(errstr, "x509: ") { @@ -295,7 +293,7 @@ func (c *Client) do(r *request) (*http.Response, bool /* retry */, error) { return nil, false, err } - if dnserr := errorsv2.AsV2[*net.DNSError](err); // nocollapse + if dnserr := errorsv2.AsV2[*net.DNSError](err); // dnserr != nil && dnserr.IsNotFound { // DNS lookup failure, this domain does not exist return nil, false, gtserror.SetNotFound(err) @@ -326,7 +324,8 @@ func (c *Client) do(r *request) (*http.Response, bool /* retry */, error) { } // Don't let their provided backoff exceed our max. - if max := baseBackoff * maxRetries; r.backoff > max { + if max := baseBackoff * time.Duration(c.retries); // + r.backoff > max { r.backoff = max } } diff --git a/internal/httpclient/delivery.go b/internal/httpclient/delivery.go index beeb9f476..4345da55e 100644 --- a/internal/httpclient/delivery.go +++ b/internal/httpclient/delivery.go @@ -20,6 +20,7 @@ package httpclient import ( "context" "slices" + "sync" "time" "codeberg.org/gruf/go-runners" @@ -31,7 +32,10 @@ import ( // APDeliveryWorkerPool wraps APDeliveryWorker{}s // in a singular struct for easy multi start/stop. type APDeliveryWorkerPool struct { + client *Client + queue *queue.StructQueue[*queue.APRequest] workers []APDeliveryWorker + mutex sync.Mutex } // Init will initialize the DeliveryWorker{} pool @@ -40,41 +44,39 @@ type APDeliveryWorkerPool struct { func (p *APDeliveryWorkerPool) Init( client *Client, queue *queue.StructQueue[*queue.APRequest], - workers int, ) { - p.workers = make([]APDeliveryWorker, workers) - for i := range p.workers { - p.workers[i] = NewAPDeliveryWorker( - client, - queue, - ) - } + p.mutex.Lock() + p.client = client + p.queue = queue + p.mutex.Unlock() } -// Start will attempt to start all of the contained DeliveryWorker{}s. -// NOTE: this is not safe to call concurrently with .Init(). -func (p *APDeliveryWorkerPool) Start() bool { - if len(p.workers) == 0 { - return false +// Start will attempt to start 'n' DeliveryWorker{}s. +func (p *APDeliveryWorkerPool) Start(n int) (ok bool) { + p.mutex.Lock() + if ok = (len(p.workers) == 0); ok { + p.workers = make([]APDeliveryWorker, n) + for i := range p.workers { + p.workers[i].client = p.client + p.workers[i].queue = p.queue + ok = p.workers[i].Start() && ok + } } - ok := true - for i := range p.workers { - ok = p.workers[i].Start() && ok - } - return ok + p.mutex.Unlock() + return } // Stop will attempt to stop all of the contained DeliveryWorker{}s. -// NOTE: this is not safe to call concurrently with .Init(). -func (p *APDeliveryWorkerPool) Stop() bool { - if len(p.workers) == 0 { - return false +func (p *APDeliveryWorkerPool) Stop() (ok bool) { + p.mutex.Lock() + if ok = (len(p.workers) > 0); ok { + for i := range p.workers { + ok = p.workers[i].Stop() && ok + } + p.workers = p.workers[:0] } - ok := true - for i := range p.workers { - ok = p.workers[i].Stop() && ok - } - return ok + p.mutex.Unlock() + return } // APDeliveryWorker wraps a Client{} to feed from diff --git a/internal/httpclient/request.go b/internal/httpclient/request.go index 256d97943..1c3d8d0d3 100644 --- a/internal/httpclient/request.go +++ b/internal/httpclient/request.go @@ -25,9 +25,6 @@ import ( ) const ( - // max no. attempts. - maxRetries = 5 - // starting backoff duration. baseBackoff = 2 * time.Second ) diff --git a/internal/workers/workers.go b/internal/workers/workers.go index b6906857e..e94e99f79 100644 --- a/internal/workers/workers.go +++ b/internal/workers/workers.go @@ -23,6 +23,7 @@ import ( "runtime" "codeberg.org/gruf/go-runners" + "github.com/superseriousbusiness/gotosocial/internal/config" "github.com/superseriousbusiness/gotosocial/internal/httpclient" "github.com/superseriousbusiness/gotosocial/internal/messages" "github.com/superseriousbusiness/gotosocial/internal/scheduler" @@ -76,7 +77,10 @@ func (w *Workers) Start() { tryUntil("starting scheduler", 5, w.Scheduler.Start) - tryUntil("start ap delivery workerpool", 5, w.APDelivery.Start) + tryUntil("start ap delivery workerpool", 5, func() bool { + n := config.GetAdvancedSenderMultiplier() + return w.APDelivery.Start(n * maxprocs) + }) tryUntil("starting client API workerpool", 5, func() bool { return w.ClientAPI.Start(4*maxprocs, 400*maxprocs)