slightly tweak deliveryworkerpool API, use advanced sender multiplier

This commit is contained in:
kim 2024-04-04 12:09:03 +01:00
parent f70ef72f7b
commit 7080930371
5 changed files with 40 additions and 40 deletions

View file

@ -24,7 +24,6 @@ import (
"net/http" "net/http"
"os" "os"
"os/signal" "os/signal"
"runtime"
"syscall" "syscall"
"time" "time"
@ -127,8 +126,7 @@ var Start action.GTSAction = func(ctx context.Context) error {
// Initialize the queues. // Initialize the queues.
state.Queues.Init() state.Queues.Init()
state.Workers.APDelivery.Init(client, &state.Queues.APRequests)
state.Workers.APDelivery.Init(client, &state.Queues.APRequests, runtime.GOMAXPROCS(0))
// Initialize workers. // Initialize workers.
state.Workers.Start() state.Workers.Start()

View file

@ -114,9 +114,7 @@ type Client struct {
// New returns a new instance of Client initialized using configuration. // New returns a new instance of Client initialized using configuration.
func New(cfg Config) *Client { func New(cfg Config) *Client {
var c Client var c Client
c.retries = 5
// For now use const.
c.retries = maxRetries
d := &net.Dialer{ d := &net.Dialer{
Timeout: 15 * time.Second, Timeout: 15 * time.Second,
@ -285,7 +283,7 @@ func (c *Client) do(r *request) (*http.Response, bool /* retry */, error) {
return nil, false, err return nil, false, err
} }
if errstr := err.Error(); // nocollapse if errstr := err.Error(); //
strings.Contains(errstr, "stopped after 10 redirects") || strings.Contains(errstr, "stopped after 10 redirects") ||
strings.Contains(errstr, "tls: ") || strings.Contains(errstr, "tls: ") ||
strings.Contains(errstr, "x509: ") { strings.Contains(errstr, "x509: ") {
@ -295,7 +293,7 @@ func (c *Client) do(r *request) (*http.Response, bool /* retry */, error) {
return nil, false, err return nil, false, err
} }
if dnserr := errorsv2.AsV2[*net.DNSError](err); // nocollapse if dnserr := errorsv2.AsV2[*net.DNSError](err); //
dnserr != nil && dnserr.IsNotFound { dnserr != nil && dnserr.IsNotFound {
// DNS lookup failure, this domain does not exist // DNS lookup failure, this domain does not exist
return nil, false, gtserror.SetNotFound(err) return nil, false, gtserror.SetNotFound(err)
@ -326,7 +324,8 @@ func (c *Client) do(r *request) (*http.Response, bool /* retry */, error) {
} }
// Don't let their provided backoff exceed our max. // Don't let their provided backoff exceed our max.
if max := baseBackoff * maxRetries; r.backoff > max { if max := baseBackoff * time.Duration(c.retries); //
r.backoff > max {
r.backoff = max r.backoff = max
} }
} }

View file

@ -20,6 +20,7 @@ package httpclient
import ( import (
"context" "context"
"slices" "slices"
"sync"
"time" "time"
"codeberg.org/gruf/go-runners" "codeberg.org/gruf/go-runners"
@ -31,7 +32,10 @@ import (
// APDeliveryWorkerPool wraps APDeliveryWorker{}s // APDeliveryWorkerPool wraps APDeliveryWorker{}s
// in a singular struct for easy multi start/stop. // in a singular struct for easy multi start/stop.
type APDeliveryWorkerPool struct { type APDeliveryWorkerPool struct {
client *Client
queue *queue.StructQueue[*queue.APRequest]
workers []APDeliveryWorker workers []APDeliveryWorker
mutex sync.Mutex
} }
// Init will initialize the DeliveryWorker{} pool // Init will initialize the DeliveryWorker{} pool
@ -40,41 +44,39 @@ type APDeliveryWorkerPool struct {
func (p *APDeliveryWorkerPool) Init( func (p *APDeliveryWorkerPool) Init(
client *Client, client *Client,
queue *queue.StructQueue[*queue.APRequest], queue *queue.StructQueue[*queue.APRequest],
workers int,
) { ) {
p.workers = make([]APDeliveryWorker, workers) p.mutex.Lock()
for i := range p.workers { p.client = client
p.workers[i] = NewAPDeliveryWorker( p.queue = queue
client, p.mutex.Unlock()
queue,
)
}
} }
// Start will attempt to start all of the contained DeliveryWorker{}s. // Start will attempt to start 'n' DeliveryWorker{}s.
// NOTE: this is not safe to call concurrently with .Init(). func (p *APDeliveryWorkerPool) Start(n int) (ok bool) {
func (p *APDeliveryWorkerPool) Start() bool { p.mutex.Lock()
if len(p.workers) == 0 { if ok = (len(p.workers) == 0); ok {
return false p.workers = make([]APDeliveryWorker, n)
for i := range p.workers {
p.workers[i].client = p.client
p.workers[i].queue = p.queue
ok = p.workers[i].Start() && ok
}
} }
ok := true p.mutex.Unlock()
for i := range p.workers { return
ok = p.workers[i].Start() && ok
}
return ok
} }
// Stop will attempt to stop all of the contained DeliveryWorker{}s. // Stop will attempt to stop all of the contained DeliveryWorker{}s.
// NOTE: this is not safe to call concurrently with .Init(). func (p *APDeliveryWorkerPool) Stop() (ok bool) {
func (p *APDeliveryWorkerPool) Stop() bool { p.mutex.Lock()
if len(p.workers) == 0 { if ok = (len(p.workers) > 0); ok {
return false for i := range p.workers {
ok = p.workers[i].Stop() && ok
}
p.workers = p.workers[:0]
} }
ok := true p.mutex.Unlock()
for i := range p.workers { return
ok = p.workers[i].Stop() && ok
}
return ok
} }
// APDeliveryWorker wraps a Client{} to feed from // APDeliveryWorker wraps a Client{} to feed from

View file

@ -25,9 +25,6 @@ import (
) )
const ( const (
// max no. attempts.
maxRetries = 5
// starting backoff duration. // starting backoff duration.
baseBackoff = 2 * time.Second baseBackoff = 2 * time.Second
) )

View file

@ -23,6 +23,7 @@ import (
"runtime" "runtime"
"codeberg.org/gruf/go-runners" "codeberg.org/gruf/go-runners"
"github.com/superseriousbusiness/gotosocial/internal/config"
"github.com/superseriousbusiness/gotosocial/internal/httpclient" "github.com/superseriousbusiness/gotosocial/internal/httpclient"
"github.com/superseriousbusiness/gotosocial/internal/messages" "github.com/superseriousbusiness/gotosocial/internal/messages"
"github.com/superseriousbusiness/gotosocial/internal/scheduler" "github.com/superseriousbusiness/gotosocial/internal/scheduler"
@ -76,7 +77,10 @@ func (w *Workers) Start() {
tryUntil("starting scheduler", 5, w.Scheduler.Start) tryUntil("starting scheduler", 5, w.Scheduler.Start)
tryUntil("start ap delivery workerpool", 5, w.APDelivery.Start) tryUntil("start ap delivery workerpool", 5, func() bool {
n := config.GetAdvancedSenderMultiplier()
return w.APDelivery.Start(n * maxprocs)
})
tryUntil("starting client API workerpool", 5, func() bool { tryUntil("starting client API workerpool", 5, func() bool {
return w.ClientAPI.Start(4*maxprocs, 400*maxprocs) return w.ClientAPI.Start(4*maxprocs, 400*maxprocs)