shuffle things around to move delivery stuff into transport/ subpkg

kim 2024-04-05 14:40:18 +01:00
parent ba60397e30
commit a3bd01ddb8
12 changed files with 430 additions and 533 deletions

View file

@@ -124,9 +124,8 @@ var Start action.GTSAction = func(ctx context.Context) error {
TLSInsecureSkipVerify: config.GetHTTPClientTLSInsecureSkipVerify(),
})
// Initialize the queues.
state.Queues.Init()
state.Workers.APDelivery.Init(client, &state.Queues.APRequests)
// Initialize delivery worker with http client.
state.Workers.Delivery.Init(client)
// Initialize workers.
state.Workers.Start()
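For anyone carrying patches over this change, the startup wiring is now simpler: the shared state.Queues is gone and the delivery pool takes only the client. A minimal sketch of the new call sites (the client construction is truncated in this hunk, so it stands in as a placeholder):

// Old wiring (removed):
//   state.Queues.Init()
//   state.Workers.APDelivery.Init(client, &state.Queues.APRequests)

// New wiring: the delivery pool owns its own queue,
// so only the wrapped *httpclient.Client is passed.
state.Workers.Delivery.Init(client)
state.Workers.Start()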

View file

@@ -195,82 +195,114 @@ func (c *Client) Do(r *http.Request) (rsp *http.Response, err error) {
return nil, err
}
// Get request hostname.
host := r.URL.Hostname()
// Check whether request should fast fail.
fastFail := gtscontext.IsFastfail(r.Context())
if !fastFail {
// Check if recently reached max retries for this host
// so we don't bother with a retry-backoff loop. The only
// errors that are retried upon are server failure, TLS
// and domain resolution type errors, so this cached result
// indicates this server is likely having issues.
fastFail = c.badHosts.Has(host)
defer func() {
if err != nil {
// On error mark as a bad-host.
c.badHosts.Set(host, struct{}{})
}
}()
}
// Prepare log entry.
log := requestLog(r)
// Wrap in our own request
// type for retry-backoff.
req := wrapRequest(r)
req := WrapRequest(r)
for req.attempts < c.retries {
var retry bool
log.Info("performing request")
// Perform the http request.
rsp, retry, err = c.do(&req)
if err == nil || !retry {
return
}
log.Error(err)
if fastFail {
// on fast-fail, don't bother backoff/retry
if gtscontext.IsFastfail(r.Context()) {
// If the fast-fail flag was set, just
// attempt a single iteration instead of
// following the below retry-backoff loop.
rsp, _, err = c.DoOnce(&req)
if err != nil {
return nil, fmt.Errorf("%w (fast fail)", err)
}
// Start the backoff timer channel.
backoff, cncl := sleepch(req.BackOff())
select {
// Request ctx cancelled
case <-r.Context().Done():
cncl()
// Backoff for a time
case <-backoff:
}
return rsp, nil
}
// Set error return to trigger setting "bad host".
err = errors.New("transport reached max retries")
return
for {
var retry bool
// Perform the http request.
rsp, retry, err = c.DoOnce(&req)
if err == nil {
return rsp, nil
}
if !retry {
// reached max retries, don't back off further
return nil, fmt.Errorf("%w (max retries)", err)
}
// Start new backoff sleep timer.
backoff := time.NewTimer(req.BackOff())
select {
// Request ctx cancelled.
case <-r.Context().Done():
backoff.Stop()
// Return context error.
err = r.Context().Err()
return nil, err
// Backoff for time.
case <-backoff.C:
}
}
}
// do wraps an underlying http.Client{}.Do() to perform our wrapped request type:
// DoOnce wraps an underlying http.Client{}.Do() to perform our wrapped request type:
// rewinding response body to permit reuse, signing request data when SignFunc provided,
// safely limiting response body, updating retry attempt counts and setting retry-after.
func (c *Client) do(r *request) (*http.Response, bool /* retry */, error) {
// Update the
// marking erroring hosts, updating retry attempt counts and setting backoff from header.
func (c *Client) DoOnce(r *Request) (rsp *http.Response, retry bool, err error) {
if r.attempts > c.retries {
// Ensure request hasn't reached max number of attempts.
err = fmt.Errorf("httpclient: reached max retries (%d)", c.retries)
return
}
// Update no.
// attempts.
r.attempts++
// Reset backoff.
r.backoff = 0
// Perform main routine.
rsp, retry, err = c.do(r)
if rsp != nil {
// Log successful rsp.
r.log.Info(rsp.Status)
return
}
// Log any errors.
r.log.Error(err)
switch {
case !retry:
// If they were told not to
// retry, also set number of
// attempts to prevent retry.
r.attempts = c.retries + 1
case r.attempts > c.retries:
// On max retries, mark this as
// a "badhost", i.e. is erroring.
c.badHosts.Set(r.Host, struct{}{})
// Ensure retry flag is unset
// when reached max attempts.
retry = false
case c.badHosts.Has(r.Host):
// When retry is still permitted,
// check host hasn't been marked
// as a "badhost", i.e. erroring.
r.attempts = c.retries + 1
retry = false
}
return
}
// do performs the "meat" of DoOnce(), but it's separated out to allow
// easier wrapping of the response, retry, error returns with further logic.
func (c *Client) do(r *Request) (rsp *http.Response, retry bool, err error) {
// Perform the HTTP request.
rsp, err := c.client.Do(r.req)
rsp, err = c.client.Do(r.Request)
if err != nil {
if errorsv2.IsV2(err,
@@ -299,6 +331,7 @@ func (c *Client) do(r *request) (*http.Response, bool /* retry */, error) {
return nil, false, gtserror.SetNotFound(err)
}
// A retryable error.
return nil, true, err
} else if rsp.StatusCode > 500 ||
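The newly exported DoOnce carries the contract that the delivery worker (added further down) builds on: a nil error means success, retry=false means drop, and retry=true means wait out the request's backoff and try again. A minimal caller sketch under those assumptions, where r is an httpclient.Request from WrapRequest and c is a configured *Client:

rsp, retry, err := c.DoOnce(&r)
switch {
case err == nil:
	// Success: the caller owns the response body.
	_ = rsp.Body.Close()
case !retry:
	// Permanent failure: a non-retryable error, max
	// attempts reached, or the host marked as "bad".
	// Drop the request here.
default:
	// Transient failure: wait out the backoff (taken
	// from a Retry-After header, or the 2^n default)
	// before calling c.DoOnce(&r) again.
	time.Sleep(r.BackOff())
}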

View file

@@ -1,291 +0,0 @@
// GoToSocial
// Copyright (C) GoToSocial Authors admin@gotosocial.org
// SPDX-License-Identifier: AGPL-3.0-or-later
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package httpclient
import (
"context"
"slices"
"sync"
"time"
"codeberg.org/gruf/go-runners"
"github.com/superseriousbusiness/gotosocial/internal/gtscontext"
"github.com/superseriousbusiness/gotosocial/internal/log"
"github.com/superseriousbusiness/gotosocial/internal/queue"
)
// APDeliveryWorkerPool wraps APDeliveryWorker{}s
// in a singular struct for easy multi start/stop.
type APDeliveryWorkerPool struct {
client *Client
queue *queue.StructQueue[*queue.APRequest]
workers []APDeliveryWorker
mutex sync.Mutex
}
// Init will initialize the DeliveryWorker{} pool
// with given http client, request queue to pull
// from and number of delivery workers to spawn.
func (p *APDeliveryWorkerPool) Init(
client *Client,
queue *queue.StructQueue[*queue.APRequest],
) {
p.mutex.Lock()
p.client = client
p.queue = queue
p.mutex.Unlock()
}
// Start will attempt to start 'n' DeliveryWorker{}s.
func (p *APDeliveryWorkerPool) Start(n int) (ok bool) {
p.mutex.Lock()
if ok = (len(p.workers) == 0); ok {
p.workers = make([]APDeliveryWorker, n)
for i := range p.workers {
p.workers[i].client = p.client
p.workers[i].queue = p.queue
ok = p.workers[i].Start() && ok
}
}
p.mutex.Unlock()
return
}
// Stop will attempt to stop all of the contained DeliveryWorker{}s.
func (p *APDeliveryWorkerPool) Stop() (ok bool) {
p.mutex.Lock()
if ok = (len(p.workers) > 0); ok {
for i := range p.workers {
ok = p.workers[i].Stop() && ok
}
p.workers = p.workers[:0]
}
p.mutex.Unlock()
return
}
// APDeliveryWorker wraps a Client{} to feed from
// a queue.StructQueue{} for ActivityPub requests
// to deliver. It does so while prioritizing new
// queued requests over backlogged retries.
type APDeliveryWorker struct {
client *Client
queue *queue.StructQueue[*queue.APRequest]
backlog []*delivery
service runners.Service
}
// NewAPDeliveryWorker returns a new APDeliveryWorker that feeds from queue, using given HTTP client.
func NewAPDeliveryWorker(client *Client, queue *queue.StructQueue[*queue.APRequest]) APDeliveryWorker {
return APDeliveryWorker{
client: client,
queue: queue,
backlog: make([]*delivery, 0, 256),
}
}
// Start will attempt to start the DeliveryWorker{}.
func (w *APDeliveryWorker) Start() bool {
return w.service.GoRun(w.process)
}
// Stop will attempt to stop the DeliveryWorker{}.
func (w *APDeliveryWorker) Stop() bool {
return w.service.Stop()
}
// process is the main delivery worker processing routine.
func (w *APDeliveryWorker) process(ctx context.Context) {
if w.client == nil || w.queue == nil {
panic("nil delivery worker fields")
}
loop:
for {
// Get next delivery.
dlv, ok := w.next(ctx)
if !ok {
return
}
// Check whether backoff required.
if d := dlv.BackOff(); d != 0 {
// Start backoff sleep timer.
backoff, cncl := sleepch(d)
select {
case <-ctx.Done():
// Main ctx
// cancelled.
cncl()
case <-w.queue.Wait():
// A new message was
// queued, re-add this
// to backlog + retry.
w.pushBacklog(dlv)
cncl()
continue loop
case <-backoff:
// successful
// backoff!
}
}
dlv.log.Info("performing request")
// Attempt outgoing delivery of request.
_, retry, err := w.client.do(&dlv.request)
if err == nil {
continue loop
}
dlv.log.Error(err)
if !retry || w.client.badHosts.Has(dlv.host) ||
dlv.attempts > w.client.retries {
// Drop deliveries when no retry requested,
// or we reach max defined retry attempts.
// "bad" hosts support a max of 1 attempt.
w.client.badHosts.Set(dlv.host, struct{}{})
continue loop
}
// Determine next delivery attempt.
dlv.next = time.Now().Add(dlv.BackOff())
// Push to backlog.
w.pushBacklog(dlv)
}
}
// next gets the next available delivery, blocking until available if necessary.
func (w *APDeliveryWorker) next(ctx context.Context) (*delivery, bool) {
loop:
for {
// Try pop next queued.
msg, ok := w.queue.Pop()
if !ok {
// Check the backlog.
if len(w.backlog) > 0 {
// Sort by 'next' time.
sortDeliveries(w.backlog)
// Pop next delivery.
dlv := w.popBacklog()
return dlv, true
}
select {
// Backlog is empty, we MUST
// block until next enqueued.
case <-w.queue.Wait():
continue loop
// Worker was stopped.
case <-ctx.Done():
return nil, false
}
}
// Wrap msg in delivery type.
return wrapMsg(ctx, msg), true
}
}
// popBacklog pops next available from the backlog.
func (w *APDeliveryWorker) popBacklog() *delivery {
if len(w.backlog) == 0 {
return nil
}
// Pop from backlog.
dlv := w.backlog[0]
// Shift backlog down by one.
copy(w.backlog, w.backlog[1:])
w.backlog = w.backlog[:len(w.backlog)-1]
return dlv
}
// pushBacklog pushes the given delivery to backlog.
func (w *APDeliveryWorker) pushBacklog(dlv *delivery) {
w.backlog = append(w.backlog, dlv)
}
// delivery wraps request{}
// to cache logging fields.
type delivery struct {
// cached log
// entry fields.
log log.Entry
// next attempt time.
next time.Time
// hostname string
// for bad host check.
host string
// embedded
// request.
request
}
// BackOff returns backoff duration to sleep for, calculated
// from the .next attempt field subtracted from current time.
func (d *delivery) BackOff() time.Duration {
if d.next.IsZero() {
return 0
}
return time.Now().Sub(d.next)
}
// wrapMsg wraps a received queued AP request message in our delivery type.
func wrapMsg(ctx context.Context, msg *queue.APRequest) *delivery {
dlv := new(delivery)
dlv.request = wrapRequest(msg.Request)
dlv.log = requestLog(dlv.req)
dlv.host = dlv.req.URL.Hostname()
ctx = gtscontext.WithValues(ctx, msg.Request.Context())
dlv.req = dlv.req.WithContext(ctx)
return dlv
}
// sortDeliveries sorts deliveries according
// to which is first to require re-attempt.
func sortDeliveries(d []*delivery) {
slices.SortFunc(d, func(a, b *delivery) int {
const k = +1
switch {
case a.next.Before(b.next):
return +k
case b.next.Before(a.next):
return -k
default:
return 0
}
})
}
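One helper used twice in the removed code, sleepch, is not defined anywhere in this diff; the new code replaces it with time.NewTimer. From its call sites (backoff, cncl := sleepch(d)), it presumably returns a channel that fires after d plus a cancel function. A hedged guess at such a helper, for context only:

// sleepch returns a channel that is closed after d,
// and a cancel func to stop the underlying timer.
// (Assumed shape; the real implementation isn't shown.)
func sleepch(d time.Duration) (<-chan struct{}, func()) {
	ch := make(chan struct{})
	t := time.AfterFunc(d, func() { close(ch) })
	return ch, func() { t.Stop() }
}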

View file

@@ -29,37 +29,42 @@ const (
baseBackoff = 2 * time.Second
)
// request wraps an HTTP request
// Request wraps an HTTP request
// to add our own retry / backoff.
type request struct {
type Request struct {
// log entry fields.
log log.Entry
// underlying request.
req *http.Request
// current backoff dur.
// Current backoff dur.
backoff time.Duration
// delivery attempts.
// Delivery attempts.
attempts uint
// done is marked when
// no more requests may
// be attempted.
done bool
// underlying request.
*http.Request
}
// wrapRequest wraps an http.Request{} in our own request{} type.
func wrapRequest(req *http.Request) request {
var r request
r.req = req
return r
}
// requestLog returns a prepared log entry with fields for http.Request{}.
func requestLog(r *http.Request) log.Entry {
return log.WithContext(r.Context()).
// WrapRequest wraps an existing http.Request within
// our own httpclient.Request with retry / backoff tracking.
func WrapRequest(r *http.Request) Request {
var rr Request
rr.Request = r
rr.log = log.WithContext(r.Context()).
WithField("method", r.Method).
WithField("url", r.URL.String())
WithField("url", r.URL.String()).
WithField("contentType", r.Header.Get("Content-Type"))
return rr
}
// BackOff returns the currently set backoff duration,
// setting a default according to no. attempts if needed.
func (r *request) BackOff() time.Duration {
// GetBackOff returns the currently set backoff duration,
// (using a default according to no. attempts if needed).
func (r *Request) BackOff() time.Duration {
if r.backoff <= 0 {
// No backoff dur found, set our predefined
// backoff according to a multiplier of 2^n.
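The remainder of BackOff() is cut off by this hunk. Given baseBackoff = 2 * time.Second above and the comment about a 2^n multiplier, the default presumably looks roughly like the following; this is an assumption, not code from the commit:

if r.backoff <= 0 {
	// Assumed default: scale the base backoff by a
	// power of two of the attempt count so far.
	r.backoff = baseBackoff * time.Duration(1<<r.attempts)
}
return r.backoff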

View file

@@ -77,9 +77,9 @@ func (f *federate) DeleteAccount(ctx context.Context, account *gtsmodel.Account)
// Drop any queued outgoing AP requests to / from account,
// (this stops any queued likes, boosts, creates etc).
f.state.Queues.APRequests.Delete("ActorID", account.URI)
f.state.Queues.APRequests.Delete("ObjectID", account.URI)
f.state.Queues.APRequests.Delete("TargetID", account.URI)
f.state.Workers.Delivery.Queue.Delete("ActorID", account.URI)
f.state.Workers.Delivery.Queue.Delete("ObjectID", account.URI)
f.state.Workers.Delivery.Queue.Delete("TargetID", account.URI)
// Parse relevant URI(s).
outboxIRI, err := parseURI(account.OutboxURI)
@@ -230,8 +230,8 @@ func (f *federate) DeleteStatus(ctx context.Context, status *gtsmodel.Status) er
// Drop any queued outgoing http requests for status,
// (this stops any queued likes, boosts, creates etc).
f.state.Queues.APRequests.Delete("ObjectID", status.URI)
f.state.Queues.APRequests.Delete("TargetID", status.URI)
f.state.Workers.Delivery.Queue.Delete("ObjectID", status.URI)
f.state.Workers.Delivery.Queue.Delete("TargetID", status.URI)
// Ensure the status model is fully populated.
if err := f.state.DB.PopulateStatus(ctx, status); err != nil {

View file

@@ -1,71 +0,0 @@
// GoToSocial
// Copyright (C) GoToSocial Authors admin@gotosocial.org
// SPDX-License-Identifier: AGPL-3.0-or-later
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package queue
import (
"net/http"
)
// TODO: add indexable queues for
// fedi / client api workers
// type ClientAPIMsg struct {
// // ...
// APObjectType string
// // ...
// APActivityType string
// // ...
// GTSID string
// // ...
// GTSModel any
// // ...
// Origin *gtsmodel.Account
// // ...
// Target *gtsmodel.Account
// }
//
// type FediAPIMsg struct {
// // ...
// APObjectType string
// // ...
// APActivityType string
// // ...
// APObjectID *url.URL
// // ...
// APObjectModel any
// // ...
// GTSModel any
// // ...
// Requesting *gtsmodel.Account
// // ...
// Receiving *gtsmodel.Account
// }
type APRequest struct {
// ActorID ...
ActorID string
// ObjectID ...
ObjectID string
// TargetID ...
TargetID string
// Request ...
Request *http.Request
}

View file

@@ -1,47 +0,0 @@
// GoToSocial
// Copyright (C) GoToSocial Authors admin@gotosocial.org
// SPDX-License-Identifier: AGPL-3.0-or-later
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package queue
import (
"codeberg.org/gruf/go-structr"
"github.com/superseriousbusiness/gotosocial/internal/log"
)
// Queues ...
type Queues struct {
// APRequests ...
APRequests StructQueue[*APRequest]
}
// Init will re(initialize) queues. NOTE: the queue
// MUST NOT be in use anywhere, this is not thread-safe.
func (q *Queues) Init() {
log.Infof(nil, "init: %p", q)
q.initHTTPRequest()
}
func (q *Queues) initHTTPRequest() {
q.APRequests.Init(structr.QueueConfig[*APRequest]{
Indices: []structr.IndexConfig{
{Fields: "ActorID", Multiple: true},
{Fields: "ObjectID", Multiple: true},
{Fields: "TargetID", Multiple: true},
},
})
}
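The indexed queue removed here is not lost functionality: delivery.WorkerPool.Init in the new file further down re-creates the same structr indices, so the targeted Delete calls in the federate hunk earlier keep working against the relocated queue. A sketch of the equivalent call sites after this commit, using only identifiers that appear elsewhere in this diff:

// Push prepared deliveries to the relocated queue
// (this is what transport.BatchDeliver now does).
state.Workers.Delivery.Queue.Push(reqs...)

// Drop any queued deliveries to / from an account,
// matching the Delete calls in the federate hunk.
state.Workers.Delivery.Queue.Delete("ActorID", account.URI)
state.Workers.Delivery.Queue.Delete("ObjectID", account.URI)
state.Workers.Delivery.Queue.Delete("TargetID", account.URI)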

View file

@ -21,7 +21,6 @@ import (
"codeberg.org/gruf/go-mutexes"
"github.com/superseriousbusiness/gotosocial/internal/cache"
"github.com/superseriousbusiness/gotosocial/internal/db"
"github.com/superseriousbusiness/gotosocial/internal/queue"
"github.com/superseriousbusiness/gotosocial/internal/storage"
"github.com/superseriousbusiness/gotosocial/internal/timeline"
"github.com/superseriousbusiness/gotosocial/internal/workers"
@@ -37,9 +36,6 @@ type State struct {
// Caches provides access to this state's collection of caches.
Caches cache.Caches
// Queues provides access to this state's collection of queues.
Queues queue.Queues
// Timelines provides access to this state's collection of timelines.
Timelines timeline.Timelines

View file

@@ -28,7 +28,6 @@ import (
"io"
"net/http"
"net/url"
"runtime"
"codeberg.org/gruf/go-byteutil"
"codeberg.org/gruf/go-cache/v3"
@@ -56,24 +55,16 @@ type controller struct {
client pub.HttpClient
trspCache cache.TTLCache[string, *transport]
userAgent string
senders int // no. concurrent batch delivery routines.
}
// NewController returns an implementation of the Controller interface for creating new transports
func NewController(state *state.State, federatingDB federatingdb.DB, clock pub.Clock, client pub.HttpClient) Controller {
var (
host = config.GetHost()
proto = config.GetProtocol()
version = config.GetSoftwareVersion()
senderMultiplier = config.GetAdvancedSenderMultiplier()
host = config.GetHost()
proto = config.GetProtocol()
version = config.GetSoftwareVersion()
)
senders := senderMultiplier * runtime.GOMAXPROCS(0)
if senders < 1 {
// Clamp senders to 1.
senders = 1
}
c := &controller{
state: state,
fedDB: federatingDB,
@@ -81,7 +72,6 @@ func NewController(state *state.State, federatingDB federatingdb.DB, clock pub.C
client: client,
trspCache: cache.NewTTL[string, *transport](0, 100, 0),
userAgent: fmt.Sprintf("gotosocial/%s (+%s://%s)", version, proto, host),
senders: senders,
}
return c

View file

@@ -28,13 +28,14 @@ import (
"github.com/superseriousbusiness/gotosocial/internal/config"
"github.com/superseriousbusiness/gotosocial/internal/gtscontext"
"github.com/superseriousbusiness/gotosocial/internal/gtserror"
"github.com/superseriousbusiness/gotosocial/internal/queue"
"github.com/superseriousbusiness/gotosocial/internal/httpclient"
"github.com/superseriousbusiness/gotosocial/internal/transport/delivery"
)
func (t *transport) BatchDeliver(ctx context.Context, obj map[string]interface{}, recipients []*url.URL) error {
var (
// accumulated prepared reqs.
reqs []*queue.APRequest
// accumulated delivery reqs.
reqs []*delivery.Delivery
// accumulated preparation errs.
errs gtserror.MultiError
@@ -78,8 +79,7 @@ func (t *transport) BatchDeliver(ctx context.Context, obj map[string]interface{}
reqs = append(reqs, req)
}
// Push the request list to HTTP client worker queue.
t.controller.state.Queues.APRequests.Push(reqs...)
t.controller.state.Workers.Delivery.Queue.Push(reqs...)
// Return combined err.
return errs.Combine()
@@ -109,8 +109,7 @@ func (t *transport) Deliver(ctx context.Context, obj map[string]interface{}, to
return err
}
// Push the request to HTTP client worker queue.
t.controller.state.Queues.APRequests.Push(req)
t.controller.state.Workers.Delivery.Queue.Push(req)
return nil
}
@@ -126,7 +125,7 @@ func (t *transport) prepare(
data []byte,
to *url.URL,
) (
*queue.APRequest,
*delivery.Delivery,
error,
) {
url := to.String()
@@ -142,19 +141,21 @@ func (t *transport) prepare(
ctx = gtscontext.SetOutgoingPublicKeyID(ctx, t.pubKeyID)
ctx = gtscontext.SetHTTPClientSignFunc(ctx, sign)
req, err := http.NewRequestWithContext(ctx, "POST", url, &body)
// Prepare a new request with data body directed at URL.
r, err := http.NewRequestWithContext(ctx, "POST", url, &body)
if err != nil {
return nil, gtserror.Newf("error preparing request: %w", err)
}
req.Header.Add("Content-Type", string(apiutil.AppActivityLDJSON))
req.Header.Add("Accept-Charset", "utf-8")
// Set the standard ActivityPub content-type + charset headers.
r.Header.Add("Content-Type", string(apiutil.AppActivityLDJSON))
r.Header.Add("Accept-Charset", "utf-8")
return &queue.APRequest{
return &delivery.Delivery{
ActorID: actorID,
ObjectID: objectID,
TargetID: targetID,
Request: req,
Request: httpclient.WrapRequest(r),
}, nil
}

View file

@@ -0,0 +1,281 @@
// GoToSocial
// Copyright (C) GoToSocial Authors admin@gotosocial.org
// SPDX-License-Identifier: AGPL-3.0-or-later
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package delivery
import (
"context"
"slices"
"time"
"codeberg.org/gruf/go-runners"
"codeberg.org/gruf/go-structr"
"github.com/superseriousbusiness/gotosocial/internal/httpclient"
"github.com/superseriousbusiness/gotosocial/internal/queue"
)
// Delivery wraps a prepared httpclient.Request{}
// with queue index fields and retry scheduling state.
type Delivery struct {
// ActorID is the ActivityPub actor ID URI of
// this delivery, used as a queue index for
// targeted deletes.
ActorID string
// ObjectID is the ActivityPub object ID URI of
// this delivery, used as a queue index.
ObjectID string
// TargetID is the ActivityPub target ID URI of
// this delivery, used as a queue index.
TargetID string
// Request is the wrapped HTTP
// request to deliver.
Request httpclient.Request
// internal fields.
next time.Time
}
// BackOff returns the duration until this
// delivery's next attempt time (zero if unset).
func (dlv *Delivery) BackOff() time.Duration {
if dlv.next.IsZero() {
return 0
}
return time.Until(dlv.next)
}
// WorkerPool wraps multiple Worker{}s in
// a singular struct for easy multi start/stop.
type WorkerPool struct {
// Client defines httpclient.Client{}
// passed to each of delivery pool Worker{}s.
Client *httpclient.Client
// Queue is the embedded queue.StructQueue{}
// passed to each of delivery pool Worker{}s.
Queue queue.StructQueue[*Delivery]
// internal fields.
workers []Worker
}
// Init will initialize the Worker{} pool
// with given http client, request queue to pull
// from and number of delivery workers to spawn.
func (p *WorkerPool) Init(client *httpclient.Client) {
p.Client = client
p.Queue.Init(structr.QueueConfig[*Delivery]{
Indices: []structr.IndexConfig{
{Fields: "ActorID", Multiple: true},
{Fields: "ObjectID", Multiple: true},
{Fields: "TargetID", Multiple: true},
},
})
}
// Start will attempt to start 'n' Worker{}s.
func (p *WorkerPool) Start(n int) (ok bool) {
if ok = (len(p.workers) == 0); ok {
p.workers = make([]Worker, n)
for i := range p.workers {
p.workers[i].Client = p.Client
p.workers[i].Queue = &p.Queue
ok = p.workers[i].Start() && ok
}
}
return
}
// Stop will attempt to stop contained Worker{}s.
func (p *WorkerPool) Stop() (ok bool) {
if ok = (len(p.workers) > 0); ok {
for i := range p.workers {
ok = p.workers[i].Stop() && ok
}
p.workers = p.workers[:0]
}
return
}
// Worker wraps an httpclient.Client{} to feed
// from queue.StructQueue{} for ActivityPub reqs
// to deliver. It does so while prioritizing new
// queued requests over backlogged retries.
type Worker struct {
// Client is the httpclient.Client{} that
// delivery worker will use for requests.
Client *httpclient.Client
// Queue is the Delivery{} message queue
// that delivery worker will feed from.
Queue *queue.StructQueue[*Delivery]
// internal fields.
backlog []*Delivery
service runners.Service
}
// Start will attempt to start the Worker{}.
func (w *Worker) Start() bool {
return w.service.GoRun(w.process)
}
// Stop will attempt to stop the Worker{}.
func (w *Worker) Stop() bool {
return w.service.Stop()
}
// process is the main delivery worker processing routine.
func (w *Worker) process(ctx context.Context) {
if w.Client == nil || w.Queue == nil {
panic("not yet initialized")
}
loop:
for {
// Get next delivery.
dlv, ok := w.next(ctx)
if !ok {
return
}
// Check whether backoff required.
if d := dlv.BackOff(); d != 0 {
// Get queue wait ch.
queue := w.Queue.Wait()
// Start backoff sleep timer.
backoff := time.NewTimer(d)
select {
case <-ctx.Done():
// Main ctx
// cancelled.
backoff.Stop()
case <-queue:
// A new message was
// queued, re-add this
// to backlog + retry.
w.pushBacklog(dlv)
backoff.Stop()
continue loop
case <-backoff.C:
// success!
}
}
// Attempt delivery of AP request.
rsp, retry, err := w.Client.DoOnce(
&dlv.Request,
)
if err == nil {
// Ensure body closed.
_ = rsp.Body.Close()
continue loop
}
if !retry {
// Drop delivery when no retry was
// requested, or max attempts were
// reached (retry is false either way).
continue loop
}
// Determine next delivery attempt.
dlv.next = time.Now().Add(dlv.BackOff())
// Push to backlog.
w.pushBacklog(dlv)
}
}
// next gets the next available delivery, blocking until available if necessary.
func (w *Worker) next(ctx context.Context) (*Delivery, bool) {
loop:
for {
// Try pop next queued.
dlv, ok := w.Queue.Pop()
if !ok {
// Check the backlog.
if len(w.backlog) > 0 {
// Sort by 'next' time.
sortDeliveries(w.backlog)
// Pop next delivery.
dlv := w.popBacklog()
return dlv, true
}
select {
// Backlog is empty, we MUST
// block until next enqueued.
case <-w.Queue.Wait():
continue loop
// Worker was stopped.
case <-ctx.Done():
return nil, false
}
}
// Replace request context for worker state canceling.
dlv.Request.Request = dlv.Request.WithContext(ctx)
return dlv, true
}
}
// popBacklog pops next available from the backlog.
func (w *Worker) popBacklog() *Delivery {
if len(w.backlog) == 0 {
return nil
}
// Pop from backlog.
dlv := w.backlog[0]
// Shift backlog down by one.
copy(w.backlog, w.backlog[1:])
w.backlog = w.backlog[:len(w.backlog)-1]
return dlv
}
// pushBacklog pushes the given delivery to backlog.
func (w *Worker) pushBacklog(dlv *Delivery) {
w.backlog = append(w.backlog, dlv)
}
// sortDeliveries sorts deliveries according
// to which is first to require re-attempt.
func sortDeliveries(d []*Delivery) {
slices.SortFunc(d, func(a, b *Delivery) int {
const k = +1
switch {
case a.next.Before(b.next):
return +k
case b.next.Before(a.next):
return -k
default:
return 0
}
})
}
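Putting the new package together, the intended lifecycle is Init, Start(n), Push, Stop. A short usage sketch from outside the package; client, actorID, objectID, targetID and r are placeholders standing in for values built elsewhere in this commit:

var pool delivery.WorkerPool

// Create the indexed queue and set the HTTP client.
pool.Init(client)

// Spawn 4 delivery workers feeding from pool.Queue.
if !pool.Start(4) {
	// Pool already running, or a worker failed to start.
}

// Queue a delivery; a free worker pops it, attempts it via
// Client.DoOnce, and backlogs it on retryable failure.
pool.Queue.Push(&delivery.Delivery{
	ActorID:  actorID,
	ObjectID: objectID,
	TargetID: targetID,
	Request:  httpclient.WrapRequest(r),
})

// On shutdown, stop all workers.
pool.Stop()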

View file

@@ -24,17 +24,17 @@ import (
"codeberg.org/gruf/go-runners"
"github.com/superseriousbusiness/gotosocial/internal/config"
"github.com/superseriousbusiness/gotosocial/internal/httpclient"
"github.com/superseriousbusiness/gotosocial/internal/messages"
"github.com/superseriousbusiness/gotosocial/internal/scheduler"
"github.com/superseriousbusiness/gotosocial/internal/transport/delivery"
)
type Workers struct {
// Main task scheduler instance.
Scheduler scheduler.Scheduler
// APDelivery ...
APDelivery httpclient.APDeliveryWorkerPool
// Delivery provides a worker pool that
// handles outgoing ActivityPub deliveries.
Delivery delivery.WorkerPool
// ClientAPI provides a worker pool that handles both
// incoming client actions, and our own side-effects.
@@ -70,7 +70,8 @@ type Workers struct {
_ nocopy
}
// Start will start all of the contained worker pools (and global scheduler).
// Start will start all of the contained
// worker pools (and global scheduler).
func (w *Workers) Start() {
// Get currently set GOMAXPROCS.
maxprocs := runtime.GOMAXPROCS(0)
@@ -79,7 +80,7 @@ func (w *Workers) Start() {
tryUntil("start ap delivery workerpool", 5, func() bool {
n := config.GetAdvancedSenderMultiplier()
return w.APDelivery.Start(n * maxprocs)
return w.Delivery.Start(n * maxprocs)
})
tryUntil("starting client API workerpool", 5, func() bool {
@@ -98,7 +99,7 @@
// Stop will stop all of the contained worker pools (and global scheduler).
func (w *Workers) Stop() {
tryUntil("stopping scheduler", 5, w.Scheduler.Stop)
tryUntil("stopping ap delivery workerpool", 5, w.APDelivery.Stop)
tryUntil("stopping delivery workerpool", 5, w.Delivery.Stop)
tryUntil("stopping client API workerpool", 5, w.ClientAPI.Stop)
tryUntil("stopping federator workerpool", 5, w.Federator.Stop)
tryUntil("stopping media workerpool", 5, w.Media.Stop)