Mirror of https://github.com/superseriousbusiness/gotosocial.git (synced 2024-10-31 22:18:52 +00:00)

Commit aa01437a5b (parent 85bc140b58): add delivery worker type that pulls from queue to httpclient package

11 changed files with 700 additions and 117 deletions
internal/cache/cache.go (vendored, 6 changes)
@@ -20,11 +20,17 @@ package cache

import (
	"time"

	"codeberg.org/gruf/go-cache/v3/ttl"
	"github.com/superseriousbusiness/gotosocial/internal/cache/headerfilter"
	"github.com/superseriousbusiness/gotosocial/internal/log"
)

type Caches struct {
	// BadHosts provides access to the HTTP
	// client bad (i.e. erroring) hosts cache.
	BadHosts ttl.Cache[string, struct{}]

	// GTS provides access to the collection of
	// gtsmodel object caches. (used by the database).
	GTS GTSCaches
@@ -32,13 +32,12 @@ import (
	"time"

	"codeberg.org/gruf/go-bytesize"
	"codeberg.org/gruf/go-cache/v3"
	errorsv2 "codeberg.org/gruf/go-errors/v2"
	"codeberg.org/gruf/go-iotools"
	"codeberg.org/gruf/go-kv"
	"github.com/superseriousbusiness/gotosocial/internal/gtscontext"
	"github.com/superseriousbusiness/gotosocial/internal/gtserror"
	"github.com/superseriousbusiness/gotosocial/internal/log"
	"github.com/superseriousbusiness/gotosocial/internal/state"
)

var (
@@ -106,9 +105,9 @@ type Config struct {
// - optional request signing
// - request logging
type Client struct {
-	client   http.Client
-	badHosts cache.TTLCache[string, struct{}]
-	bodyMax  int64
+	state   *state.State
+	client  http.Client
+	bodyMax int64
}

// New returns a new instance of Client initialized using configuration.
@@ -176,32 +175,11 @@ func New(cfg Config) *Client {
		DisableCompression: cfg.DisableCompression,
	}}

-	// Initiate outgoing bad hosts lookup cache.
-	c.badHosts = cache.NewTTL[string, struct{}](0, 1000, 0)
-	c.badHosts.SetTTL(time.Hour, false)
-	if !c.badHosts.Start(time.Minute) {
-		log.Panic(nil, "failed to start transport controller cache")
-	}
-
	return &c
}

// Do will essentially perform http.Client{}.Do() with retry-backoff functionality.
-func (c *Client) Do(r *http.Request) (*http.Response, error) {
-	return c.DoSigned(r, func(r *http.Request) error {
-		return nil // no request signing
-	})
-}
-
-// DoSigned will essentially perform http.Client{}.Do() with retry-backoff functionality and requesting signing..
-func (c *Client) DoSigned(r *http.Request, sign SignFunc) (rsp *http.Response, err error) {
-	const (
-		// max no. attempts.
-		maxRetries = 5
-
-		// starting backoff duration.
-		baseBackoff = 2 * time.Second
-	)
+func (c *Client) Do(r *http.Request) (rsp *http.Response, err error) {

	// First validate incoming request.
	if err := ValidateRequest(r); err != nil {
@@ -219,108 +197,50 @@ func (c *Client) DoSigned(r *http.Request, sign SignFunc) (rsp *http.Response, err error) {
		// errors that are retried upon are server failure, TLS
		// and domain resolution type errors, so this cached result
		// indicates this server is likely having issues.
		fastFail = c.badHosts.Has(host)
		fastFail = c.state.Caches.BadHosts.Has(host)
		defer func() {
			if err != nil {
				// On error return mark as bad-host.
				c.badHosts.Set(host, struct{}{})
				// On error return ensure marked as bad-host.
				c.state.Caches.BadHosts.Set(host, struct{}{})
			}
		}()
	}

	// Start a log entry for this request
	l := log.WithContext(r.Context()).
		WithFields(kv.Fields{
			{"method", r.Method},
			{"url", r.URL.String()},
		}...)
	// Prepare log entry.
	log := requestLog(r)

	for i := 0; i < maxRetries; i++ {
		var backoff time.Duration
	// Wrap in our own request
	// type for retry-backoff.
	req := wrapRequest(r)

		l.Info("performing request")
	for req.attempts < maxRetries {
		var retry bool

		// Perform the request.
		rsp, err = c.do(r)
		if err == nil { //nolint:gocritic
		log.Info("performing request")

			// TooManyRequest means we need to slow
			// down and retry our request. Codes over
			// 500 generally indicate temp. outages.
			if code := rsp.StatusCode; code < 500 &&
				code != http.StatusTooManyRequests {
				return rsp, nil
			}

			// Create loggable error from response status code.
			err = fmt.Errorf(`http response: %s`, rsp.Status)

			// Search for a provided "Retry-After" header value.
			if after := rsp.Header.Get("Retry-After"); after != "" {

				// Get current time.
				now := time.Now()

				if u, _ := strconv.ParseUint(after, 10, 32); u != 0 {
					// An integer number of backoff seconds was provided.
					backoff = time.Duration(u) * time.Second
				} else if at, _ := http.ParseTime(after); !at.Before(now) {
					// An HTTP formatted future date-time was provided.
					backoff = at.Sub(now)
				}

				// Don't let their provided backoff exceed our max.
				if max := baseBackoff * maxRetries; backoff > max {
					backoff = max
				}
			}

			// Close + unset rsp.
			_ = rsp.Body.Close()
			rsp = nil

		} else if errorsv2.IsV2(err,
			context.DeadlineExceeded,
			context.Canceled,
			ErrBodyTooLarge,
			ErrReservedAddr,
		) {
			// Non-retryable errors.
			return nil, err
		} else if errstr := err.Error(); // nocollapse
			strings.Contains(errstr, "stopped after 10 redirects") ||
			strings.Contains(errstr, "tls: ") ||
			strings.Contains(errstr, "x509: ") {
			// These error types aren't wrapped
			// so we have to check the error string.
			// All are unrecoverable!
			return nil, err
		} else if dnserr := (*net.DNSError)(nil); // nocollapse
			errors.As(err, &dnserr) && dnserr.IsNotFound {
			// DNS lookup failure, this domain does not exist
			return nil, gtserror.SetNotFound(err)
		// Perform the http request.
		rsp, retry, err = c.do(&req)
		if err == nil || !retry {
			return
		}

		log.Error(err)

		if fastFail {
			// on fast-fail, don't bother backoff/retry
			return nil, fmt.Errorf("%w (fast fail)", err)
		}

		if backoff == 0 {
			// No retry-after found, set our predefined
			// backoff according to a multiplier of 2^n.
			backoff = baseBackoff * 1 << (i + 1)
		}

		l.Errorf("backing off for %s after http request error: %v", backoff, err)
		// Start the backoff timer channel.
		backoff, cncl := sleepch(req.BackOff())

		select {
		// Request ctx cancelled
		case <-r.Context().Done():
			return nil, r.Context().Err()
			cncl()

		// Backoff for some time
		case <-time.After(backoff):
		// Backoff for a time
		case <-backoff:
		}
	}
@@ -329,12 +249,80 @@ func (c *Client) DoSigned(r *http.Request, sign SignFunc) (rsp *http.Response, err error) {
	return
}

// do wraps http.Client{}.Do() to provide safely limited response bodies.
func (c *Client) do(req *http.Request) (*http.Response, error) {
// do wraps an underlying http.Client{}.Do() to perform our wrapped request type:
// rewinding response body to permit reuse, signing request data when SignFunc provided,
// safely limiting response body, updating retry attempt counts and setting retry-after.
func (c *Client) do(r *request) (*http.Response, bool /* retry */, error) {
	// Update the
	// attempts.
	r.attempts++

	// Reset backoff.
	r.backoff = 0

	// Perform the HTTP request.
	rsp, err := c.client.Do(req)
	rsp, err := c.client.Do(r.req)
	if err != nil {
		return nil, err

		if errorsv2.IsV2(err,
			context.DeadlineExceeded,
			context.Canceled,
			ErrBodyTooLarge,
			ErrReservedAddr,
		) {
			// Non-retryable errors.
			return nil, false, err
		}

		if errstr := err.Error(); // nocollapse
			strings.Contains(errstr, "stopped after 10 redirects") ||
			strings.Contains(errstr, "tls: ") ||
			strings.Contains(errstr, "x509: ") {
			// These error types aren't wrapped
			// so we have to check the error string.
			// All are unrecoverable!
			return nil, false, err
		}

		if dnserr := (*net.DNSError)(nil); // nocollapse
			errors.As(err, &dnserr) && dnserr.IsNotFound {
			// DNS lookup failure, this domain does not exist
			return nil, false, gtserror.SetNotFound(err)
		}

		return nil, true, err

	} else if rsp.StatusCode > 500 ||
		rsp.StatusCode == http.StatusTooManyRequests {

		// Codes over 500 (and 429: too many requests)
		// are generally temporary errors. For these
		// we replace the response with a loggable error.
		err = fmt.Errorf(`http response: %s`, rsp.Status)

		// Search for a provided "Retry-After" header value.
		if after := rsp.Header.Get("Retry-After"); after != "" {

			// Get cur time.
			now := time.Now()

			if u, _ := strconv.ParseUint(after, 10, 32); u != 0 {
				// An integer no. of backoff seconds was provided.
				r.backoff = time.Duration(u) * time.Second
			} else if at, _ := http.ParseTime(after); !at.Before(now) {
				// An HTTP formatted future date-time was provided.
				r.backoff = at.Sub(time.Now())
			}

			// Don't let their provided backoff exceed our max.
			if max := baseBackoff * maxRetries; r.backoff > max {
				r.backoff = max
			}
		}

		// Unset + close rsp.
		_ = rsp.Body.Close()
		return nil, true, err
	}

	// Seperate the body implementers.
@@ -364,11 +352,10 @@ func (c *Client) do(req *http.Request) (*http.Response, error) {

	// Check response body not too large.
	if rsp.ContentLength > c.bodyMax {
		_ = rsp.Body.Close()
-		return nil, ErrBodyTooLarge
+		return nil, false, ErrBodyTooLarge
	}

-	return rsp, nil
+	return rsp, true, nil
}

// cast discard writer to full interface it supports.
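To make the Retry-After handling in the rewritten do() above concrete, here is a small standalone snippet (not part of the commit) that applies the same parsing rules and the baseBackoff * maxRetries cap; the sample header values are made up.

package main

import (
	"fmt"
	"net/http"
	"strconv"
	"time"
)

func main() {
	// Same constants as declared in internal/httpclient/request.go below.
	const (
		maxRetries  = 5
		baseBackoff = 2 * time.Second
	)

	now := time.Now()

	// Made-up header values: integer seconds, an over-long integer,
	// and an HTTP-date a few seconds in the future.
	samples := []string{
		"3",
		"120",
		now.UTC().Add(5 * time.Second).Format(http.TimeFormat),
	}

	for _, after := range samples {
		var backoff time.Duration

		if u, _ := strconv.ParseUint(after, 10, 32); u != 0 {
			// An integer number of backoff seconds was provided.
			backoff = time.Duration(u) * time.Second
		} else if at, _ := http.ParseTime(after); !at.Before(now) {
			// An HTTP formatted future date-time was provided.
			backoff = at.Sub(now)
		}

		// Cap at baseBackoff * maxRetries (10s), as the client does.
		if max := baseBackoff * maxRetries; backoff > max {
			backoff = max
		}

		fmt.Printf("Retry-After %q -> backoff %s\n", after, backoff)
	}
}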
internal/httpclient/delivery.go (new file, 262 lines)

@@ -0,0 +1,262 @@
// GoToSocial
// Copyright (C) GoToSocial Authors admin@gotosocial.org
// SPDX-License-Identifier: AGPL-3.0-or-later
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

package httpclient

import (
	"context"
	"slices"
	"time"

	"codeberg.org/gruf/go-runners"
	"github.com/superseriousbusiness/gotosocial/internal/log"
	"github.com/superseriousbusiness/gotosocial/internal/queue"
)

type DeliveryWorkerPool struct {
	client  *Client
	queue   *queue.StructQueue[queue.HTTPRequest]
	workers []DeliveryWorker
}

// Init ...
func (p *DeliveryWorkerPool) Init(
	client *Client,
	queue *queue.StructQueue[queue.HTTPRequest],
	workers int,
) {
	p.client = client
	p.queue = queue
	p.workers = make([]DeliveryWorker, workers)
	for i := range p.workers {
		p.workers[i] = NewDeliveryWorker(
			p.client,
			p.queue,
		)
	}
}

// Start ...
func (p *DeliveryWorkerPool) Start() bool {
	if len(p.workers) == 0 {
		return false
	}
	ok := true
	for i := range p.workers {
		ok = p.workers[i].Start() && ok
	}
	return ok
}

// Stop ...
func (p *DeliveryWorkerPool) Stop() bool {
	if len(p.workers) == 0 {
		return false
	}
	ok := true
	for i := range p.workers {
		ok = p.workers[i].Stop() && ok
	}
	return ok
}

type DeliveryWorker struct {
	client  *Client
	queue   *queue.StructQueue[queue.HTTPRequest]
	backlog []*delivery
	service runners.Service
}

// NewDeliveryWorker returns a new DeliveryWorker that feeds from queue, using given HTTP client.
func NewDeliveryWorker(client *Client, queue *queue.StructQueue[queue.HTTPRequest]) DeliveryWorker {
	return DeliveryWorker{
		client:  client,
		queue:   queue,
		backlog: make([]*delivery, 0, 256),
	}
}

// Start ...
func (w *DeliveryWorker) Start() bool {
	return w.service.Run(w.process)
}

// Stop ...
func (w *DeliveryWorker) Stop() bool {
	return w.service.Stop()
}

// process is the main delivery worker processing routine.
func (w *DeliveryWorker) process(ctx context.Context) {
	if w.client == nil || w.queue == nil {
		panic("nil delivery worker fields")
	}

loop:
	for {
		// Get next delivery.
		dlv, ok := w.next(ctx)
		if !ok {
			return
		}

		// Check whether backoff required.
		if d := dlv.BackOff(); d != 0 {

			// Start backoff sleep timer.
			backoff, cncl := sleepch(d)

			select {
			case <-ctx.Done():
				// Main ctx
				// cancelled.
				cncl()

			case <-w.queue.Wait():
				// A new message was
				// queued, re-add this
				// to backlog + retry.
				w.pushBacklog(dlv)
				cncl()
				continue loop

			case <-backoff:
				// successful
				// backoff!
			}
		}

		// Attempt outoing delivery of request.
		_, retry, err := w.client.do(&dlv.request)
		if err == nil || !retry {
			continue loop
		}

		if dlv.attempts > maxRetries {
			// Drop deliveries once
			// we reach max retries.
			continue loop
		}

		// Determine next delivery attempt.
		dlv.next = time.Now().Add(dlv.BackOff())

		// Push to backlog.
		w.pushBacklog(dlv)
	}
}

// next gets the next available delivery, blocking until available if necessary.
func (w *DeliveryWorker) next(ctx context.Context) (*delivery, bool) {
	// Try pop next queued.
	msg, ok := w.queue.Pop()

	if !ok {
		// Check the backlog.
		if len(w.backlog) > 0 {

			// Sort by 'next' time.
			sortDeliveries(w.backlog)

			// Pop next delivery.
			dlv := w.popBacklog()

			return dlv, true
		}

		// Backlog is empty, we MUST
		// block until next enqueued.
		msg, ok = w.queue.PopCtx(ctx)
		if !ok {
			return nil, false
		}
	}

	// Wrap msg in delivery type.
	return wrapMsg(ctx, msg), true
}

// popBacklog pops next available from the backlog.
func (w *DeliveryWorker) popBacklog() *delivery {
	if len(w.backlog) == 0 {
		return nil
	}

	// Pop from backlog.
	dlv := w.backlog[0]

	// Shift backlog down by one.
	copy(w.backlog, w.backlog[1:])
	w.backlog = w.backlog[:len(w.backlog)-1]

	return dlv
}

// pushBacklog pushes the given delivery to backlog.
func (w *DeliveryWorker) pushBacklog(dlv *delivery) {
	w.backlog = append(w.backlog, dlv)
}

// delivery wraps request{}
// to cache logging fields.
type delivery struct {

	// cached log
	// entry fields.
	log log.Entry

	// next attempt time.
	next time.Time

	// embedded
	// request.
	request
}

// BackOff returns backoff duration to sleep for, calculated
// from the .next attempt field subtracted from current time.
func (d *delivery) BackOff() time.Duration {
	if d.next.IsZero() {
		return 0
	}
	return time.Now().Sub(d.next)
}

// wrapMsg wraps a received queued HTTP request message in our delivery type.
func wrapMsg(ctx context.Context, msg queue.HTTPRequest) *delivery {
	dlv := new(delivery)
	dlv.request = wrapRequest(msg.Request)
	dlv.log = requestLog(dlv.req)
	dlv.req = dlv.req.WithContext(ctx)
	return dlv
}

// sortDeliveries sorts deliveries according
// to when is the first requiring re-attempt.
func sortDeliveries(d []*delivery) {
	slices.SortFunc(d, func(a, b *delivery) int {
		const k = +1
		switch {
		case a.next.Before(b.next):
			return +k
		case b.next.Before(a.next):
			return -k
		default:
			return 0
		}
	})
}
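The processing loop above interleaves fresh queue pops with a backlog of failed deliveries ordered by their next-attempt time. Below is a minimal standalone sketch, not the commit's code, of that ordering and of the pop-from-front behaviour of popBacklog(); the names and durations are made up, and the comparator here is written to place the earliest next-attempt first per slices.SortFunc's convention (a negative return sorts a before b).

package main

import (
	"fmt"
	"slices"
	"time"
)

// entry mirrors only the 'next attempt' bookkeeping of the delivery type above.
type entry struct {
	name string
	next time.Time
}

func main() {
	now := time.Now()

	backlog := []*entry{
		{"third", now.Add(3 * time.Second)},
		{"first", now.Add(1 * time.Second)},
		{"second", now.Add(2 * time.Second)},
	}

	// Order the backlog so the earliest next-attempt time comes first.
	slices.SortFunc(backlog, func(a, b *entry) int {
		switch {
		case a.next.Before(b.next):
			return -1
		case b.next.Before(a.next):
			return +1
		default:
			return 0
		}
	})

	// Pop from the front, as popBacklog() does: take index 0
	// and shift the remainder down by one.
	for len(backlog) > 0 {
		dlv := backlog[0]
		copy(backlog, backlog[1:])
		backlog = backlog[:len(backlog)-1]
		fmt.Println(dlv.name)
	}
	// Prints: first, second, third.
}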
internal/httpclient/request.go (new file, 72 lines)

@@ -0,0 +1,72 @@
// GoToSocial
// Copyright (C) GoToSocial Authors admin@gotosocial.org
// SPDX-License-Identifier: AGPL-3.0-or-later
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

package httpclient

import (
	"net/http"
	"time"

	"github.com/superseriousbusiness/gotosocial/internal/log"
)

const (
	// max no. attempts.
	maxRetries = 5

	// starting backoff duration.
	baseBackoff = 2 * time.Second
)

// request wraps an HTTP request
// to add our own retry / backoff.
type request struct {

	// underlying request.
	req *http.Request

	// current backoff dur.
	backoff time.Duration

	// delivery attempts.
	attempts int
}

// wrapRequest wraps an http.Request{} in our own request{} type.
func wrapRequest(req *http.Request) request {
	var r request
	r.req = req
	return r
}

// requestLog returns a prepared log entry with fields for http.Request{}.
func requestLog(r *http.Request) log.Entry {
	return log.WithContext(r.Context()).
		WithField("method", r.Method).
		WithField("url", r.URL.String())
}

// BackOff returns the currently set backoff duration,
// setting a default according to no. attempts if needed.
func (r *request) BackOff() time.Duration {
	if r.backoff <= 0 {
		// No backoff dur found, set our predefined
		// backoff according to a multiplier of 2^n.
		r.backoff = baseBackoff * 1 << (r.attempts + 1)
	}
	return r.backoff
}
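As a quick reference (not part of the commit), the formula in BackOff() with the constants above produces a default schedule of roughly 4s, 8s, 16s, 32s and 64s for attempts 0 through 4:

package main

import (
	"fmt"
	"time"
)

func main() {
	const baseBackoff = 2 * time.Second

	// Same formula as request.BackOff() above:
	// baseBackoff * 1 << (attempts + 1).
	for attempts := 0; attempts < 5; attempts++ {
		fmt.Printf("attempt %d: backoff %s\n", attempts, baseBackoff*1<<(attempts+1))
	}
}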
internal/httpclient/util.go (new file, 9 lines)

@@ -0,0 +1,9 @@
package httpclient

import "time"

// sleepch returns a blocking sleep channel and cancel function.
func sleepch(d time.Duration) (<-chan time.Time, func() bool) {
	t := time.NewTimer(d)
	return t.C, t.Stop
}
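For illustration (not from the commit), this standalone snippet shows how a sleepch() timer is typically consumed in a select against a context, as the retry loop and the delivery worker above do; the durations are arbitrary.

package main

import (
	"context"
	"fmt"
	"time"
)

// sleepch is copied from util.go above.
func sleepch(d time.Duration) (<-chan time.Time, func() bool) {
	t := time.NewTimer(d)
	return t.C, t.Stop
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()

	// Start the backoff timer channel.
	backoff, cncl := sleepch(time.Second)

	select {
	case <-ctx.Done():
		// Stop the timer early if the context wins the race.
		cncl()
		fmt.Println("context done before backoff elapsed")

	case <-backoff:
		fmt.Println("backoff elapsed")
	}
}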
@@ -38,7 +38,7 @@ func ValidateRequest(r *http.Request) error {
		return fmt.Errorf("%w: empty url host", ErrInvalidRequest)
	case r.URL.Scheme != "http" && r.URL.Scheme != "https":
		return fmt.Errorf("%w: unsupported protocol %q", ErrInvalidRequest, r.URL.Scheme)
-	case strings.IndexFunc(r.Method, func(r rune) bool { return !httpguts.IsTokenRune(r) }) != -1:
+	case strings.IndexFunc(r.Method, isNotTokenRune) != -1:
		return fmt.Errorf("%w: invalid method %q", ErrInvalidRequest, r.Method)
	}

@@ -60,3 +60,8 @@ func ValidateRequest(r *http.Request) error {

	return nil
}
+
+// isNotTokenRune wraps IsTokenRune to inverse result.
+func isNotTokenRune(r rune) bool {
+	return !httpguts.IsTokenRune(r)
+}
internal/queue/messages.go (new file, 68 lines)

@@ -0,0 +1,68 @@
// GoToSocial
// Copyright (C) GoToSocial Authors admin@gotosocial.org
// SPDX-License-Identifier: AGPL-3.0-or-later
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

package queue

import (
	"net/http"
)

// TODO: add indexable queues for
// fedi / client api workers
// type ClientAPIMsg struct {
// 	// ...
// 	APObjectType string
// 	// ...
// 	APActivityType string
// 	// ...
// 	GTSID string
// 	// ...
// 	GTSModel any
// 	// ...
// 	Origin *gtsmodel.Account
// 	// ...
// 	Target *gtsmodel.Account
// }
//
// type FediAPIMsg struct {
// 	// ...
// 	APObjectType string
// 	// ...
// 	APActivityType string
// 	// ...
// 	APObjectID *url.URL
// 	// ...
// 	APObjectModel any
// 	// ...
// 	GTSModel any
// 	// ...
// 	Requesting *gtsmodel.Account
// 	// ...
// 	Receiving *gtsmodel.Account
// }

type HTTPRequest struct {

	// ObjectID ...
	ObjectID string

	// Request ...
	Request *http.Request

	// Signer ...
	Signer func(*http.Request) error
}
internal/queue/queues.go (new file, 46 lines)

@@ -0,0 +1,46 @@
// GoToSocial
// Copyright (C) GoToSocial Authors admin@gotosocial.org
// SPDX-License-Identifier: AGPL-3.0-or-later
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

package queue

import (
	"codeberg.org/gruf/go-structr"
	"github.com/superseriousbusiness/gotosocial/internal/log"
)

// Queues ...
type Queues struct {
	// HTTPRequest ...
	HTTPRequest StructQueue[*HTTPRequest]
}

// Init will re(initialize) queues. NOTE: the queue
// MUST NOT be in use anywhere, this is not thread-safe.
func (q *Queues) Init() {
	log.Infof(nil, "init: %p", q)

	q.initHTTPRequest()
}

func (q *Queues) initHTTPRequest() {
	q.HTTPRequest.Init(structr.QueueConfig[*HTTPRequest]{
		Indices: []structr.IndexConfig{
			{Fields: "ObjectID", Multiple: true},
			{Fields: "Request.URL.Host", Multiple: true},
		},
	})
}
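As a usage sketch, a hypothetical caller elsewhere in the gotosocial module might enqueue a delivery like this; it uses only the HTTPRequest fields above, the Queues field added to State further below, and StructQueue.Push from wrappers.go below. The function and its arguments are illustrative, not part of the commit.

package example // hypothetical caller inside the gotosocial module

import (
	"net/http"

	"github.com/superseriousbusiness/gotosocial/internal/queue"
	"github.com/superseriousbusiness/gotosocial/internal/state"
)

// enqueueDelivery queues req for delivery by the httpclient delivery workers.
func enqueueDelivery(st *state.State, objectID string, req *http.Request, sign func(*http.Request) error) {
	st.Queues.HTTPRequest.Push(&queue.HTTPRequest{
		ObjectID: objectID, // e.g. the ID of the object being delivered
		Request:  req,
		Signer:   sign,
	})
}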
internal/queue/wrappers.go (new file, 115 lines)

@@ -0,0 +1,115 @@
// GoToSocial
// Copyright (C) GoToSocial Authors admin@gotosocial.org
// SPDX-License-Identifier: AGPL-3.0-or-later
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

package queue

import (
	"context"
	"sync/atomic"

	"codeberg.org/gruf/go-structr"
)

// StructQueue ...
type StructQueue[StructType any] struct {
	queue structr.Queue[StructType]
	index map[string]*structr.Index
	wait  atomic.Pointer[chan struct{}]
}

// Init initializes queue with structr.QueueConfig{}.
func (q *StructQueue[T]) Init(config structr.QueueConfig[T]) {
	q.index = make(map[string]*structr.Index, len(config.Indices))
	q.queue = structr.Queue[T]{}
	q.queue.Init(config)
	for _, cfg := range config.Indices {
		q.index[cfg.Fields] = q.queue.Index(cfg.Fields)
	}
}

// Pop: see structr.Queue{}.PopFront().
func (q *StructQueue[T]) Pop() (value T, ok bool) {
	return q.queue.PopFront()
}

// PopCtx wraps structr.Queue{}.PopFront() to add sleep until value is available.
func (q *StructQueue[T]) PopCtx(ctx context.Context) (value T, ok bool) {
	for {
		// Try pop from front of queue.
		value, ok = q.queue.PopFront()
		if ok {
			return
		}

		select {
		// Context canceled.
		case <-ctx.Done():
			return

		// Waiter released.
		case <-q.Wait():
		}
	}
}

// Push wraps structr.Queue{}.PushBack() to add sleeping pop goroutine awakening.
func (q *StructQueue[T]) Push(values ...T) {
	q.queue.PushBack(values...)
	q.broadcast()
}

// MoveBack ...
func (q *StructQueue[T]) MoveBack(index string, key ...any) {
	i := q.index[index]
	q.queue.MoveBack(i, i.Key(key...))
}

// Len: see structr.Queue{}.Len().
func (q *StructQueue[T]) Len() int {
	return q.queue.Len()
}

// Wait safely returns current (read-only) wait channel.
func (q *StructQueue[T]) Wait() <-chan struct{} {
	var ch chan struct{}

	for {
		// Get channel ptr.
		ptr := q.wait.Load()
		if ptr != nil {
			return *ptr
		}

		if ch == nil {
			// Allocate new channel.
			ch = make(chan struct{})
		}

		// Try set the new wait channel ptr.
		if q.wait.CompareAndSwap(ptr, &ch) {
			return ch
		}
	}
}

// broadcast safely closes wait channel if
// currently set, releasing waiting goroutines.
func (q *StructQueue[T]) broadcast() {
	if ptr := q.wait.Swap(nil); ptr != nil {
		close(*ptr)
	}
}
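The Wait()/broadcast() pair above is what lets PopCtx() sleep until Push() enqueues something: waiting goroutines share one channel, and the producer closes and clears it to wake them all at once. A standalone reproduction of just that mechanism (hypothetical waiter type, not the commit's code):

package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// waiter mirrors the Wait()/broadcast() pattern of StructQueue above.
type waiter struct {
	wait atomic.Pointer[chan struct{}]
}

// Wait returns the shared channel, lazily allocating one if unset.
func (w *waiter) Wait() <-chan struct{} {
	var ch chan struct{}
	for {
		ptr := w.wait.Load()
		if ptr != nil {
			return *ptr
		}
		if ch == nil {
			ch = make(chan struct{})
		}
		if w.wait.CompareAndSwap(ptr, &ch) {
			return ch
		}
	}
}

// broadcast closes and clears the shared channel, releasing all waiters.
func (w *waiter) broadcast() {
	if ptr := w.wait.Swap(nil); ptr != nil {
		close(*ptr)
	}
}

func main() {
	var w waiter

	ch := w.Wait() // register interest before the producer broadcasts

	go func() {
		time.Sleep(10 * time.Millisecond)
		w.broadcast() // e.g. called from Push() after enqueuing
	}()

	<-ch
	fmt.Println("woken by broadcast")
}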
@@ -21,6 +21,7 @@ import (
	"codeberg.org/gruf/go-mutexes"
	"github.com/superseriousbusiness/gotosocial/internal/cache"
	"github.com/superseriousbusiness/gotosocial/internal/db"
+	"github.com/superseriousbusiness/gotosocial/internal/queue"
	"github.com/superseriousbusiness/gotosocial/internal/storage"
	"github.com/superseriousbusiness/gotosocial/internal/timeline"
	"github.com/superseriousbusiness/gotosocial/internal/workers"

@@ -36,6 +37,9 @@ type State struct {
	// Caches provides access to this state's collection of caches.
	Caches cache.Caches

+	// Queues provides access to this state's collection of queues.
+	Queues queue.Queues
+
	// Timelines provides access to this state's collection of timelines.
	Timelines timeline.Timelines
@@ -23,7 +23,9 @@ import (
	"runtime"

	"codeberg.org/gruf/go-runners"
+	"github.com/superseriousbusiness/gotosocial/internal/httpclient"
	"github.com/superseriousbusiness/gotosocial/internal/messages"
+	"github.com/superseriousbusiness/gotosocial/internal/queue"
	"github.com/superseriousbusiness/gotosocial/internal/scheduler"
)

@@ -31,6 +33,9 @@ type Workers struct {
	// Main task scheduler instance.
	Scheduler scheduler.Scheduler

+	// Delivery ...
+	Delivery httpclient.DeliveryWorkerPool
+
	// ClientAPI provides a worker pool that handles both
	// incoming client actions, and our own side-effects.
	ClientAPI runners.WorkerPool

@@ -42,8 +47,9 @@ type Workers struct {
	// Enqueue functions for clientAPI / federator worker pools,
	// these are pointers to Processor{}.Enqueue___() msg functions.
	// This prevents dependency cycling as Processor depends on Workers.
-	EnqueueClientAPI func(context.Context, ...messages.FromClientAPI)
-	EnqueueFediAPI   func(context.Context, ...messages.FromFediAPI)
+	EnqueueHTTPClient func(context.Context, ...queue.HTTPRequest)
+	EnqueueClientAPI  func(context.Context, ...messages.FromClientAPI)
+	EnqueueFediAPI    func(context.Context, ...messages.FromFediAPI)

	// Blocking processing functions for clientAPI / federator.
	// These are pointers to Processor{}.Process___() msg functions.

@@ -72,6 +78,8 @@ func (w *Workers) Start() {

	tryUntil("starting scheduler", 5, w.Scheduler.Start)

+	tryUntil("start http client workerpool", 5, w.Delivery.Start)
+
	tryUntil("starting client API workerpool", 5, func() bool {
		return w.ClientAPI.Start(4*maxprocs, 400*maxprocs)
	})

@@ -88,6 +96,7 @@ func (w *Workers) Start() {
// Stop will stop all of the contained worker pools (and global scheduler).
func (w *Workers) Stop() {
	tryUntil("stopping scheduler", 5, w.Scheduler.Stop)
+	tryUntil("stopping http client workerpool", 5, w.Delivery.Stop)
	tryUntil("stopping client API workerpool", 5, w.ClientAPI.Stop)
	tryUntil("stopping federator workerpool", 5, w.Federator.Stop)
	tryUntil("stopping media workerpool", 5, w.Media.Stop)