hook up queue deletion logic

This commit is contained in:
kim 2024-04-03 16:54:22 +01:00
parent 8aede54741
commit 8fdd5b18aa
9 changed files with 84 additions and 59 deletions

View file

@ -128,7 +128,7 @@ var Start action.GTSAction = func(ctx context.Context) error {
// Initialize the queues.
state.Queues.Init()
state.Workers.HTTPClient.Init(client, &state.Queues.HTTPRequest, runtime.GOMAXPROCS(0))
state.Workers.APDelivery.Init(client, &state.Queues.APRequests, runtime.GOMAXPROCS(0))
// Initialize workers.
state.Workers.Start()

View file

@ -51,7 +51,7 @@ func (f *federatingDB) Delete(ctx context.Context, id *url.URL) error {
// in a delete we only get the URI, we can't know if we have a status or a profile or something else,
// so we have to try a few different things...
if s, err := f.state.DB.GetStatusByURI(ctx, id.String()); err == nil && requestingAcct.ID == s.AccountID {
l.Debugf("uri is for STATUS with id: %s", s.ID)
l.Debugf("deleting status: %s", s.ID)
f.state.Workers.EnqueueFediAPI(ctx, messages.FromFediAPI{
APObjectType: ap.ObjectNote,
APActivityType: ap.ActivityDelete,
@ -61,7 +61,7 @@ func (f *federatingDB) Delete(ctx context.Context, id *url.URL) error {
}
if a, err := f.state.DB.GetAccountByURI(ctx, id.String()); err == nil && requestingAcct.ID == a.ID {
l.Debugf("uri is for ACCOUNT with id %s", a.ID)
l.Debugf("deleting account: %s", a.ID)
f.state.Workers.EnqueueFediAPI(ctx, messages.FromFediAPI{
APObjectType: ap.ObjectProfile,
APActivityType: ap.ActivityDelete,

View file

@ -27,33 +27,31 @@ import (
"github.com/superseriousbusiness/gotosocial/internal/queue"
)
// DeliveryWorkerPool ...
type DeliveryWorkerPool struct {
client *Client
queue *queue.StructQueue[*queue.HTTPRequest]
workers []DeliveryWorker
// APDeliveryWorkerPool ...
type APDeliveryWorkerPool struct {
workers []APDeliveryWorker
}
// Init will initialize the DeliveryWorker{} pool
// with given http client, request queue to pull
// from and number of delivery workers to spawn.
func (p *DeliveryWorkerPool) Init(
func (p *APDeliveryWorkerPool) Init(
client *Client,
queue *queue.StructQueue[*queue.HTTPRequest],
queue *queue.StructQueue[*queue.APRequest],
workers int,
) {
p.workers = make([]DeliveryWorker, workers)
p.workers = make([]APDeliveryWorker, workers)
for i := range p.workers {
p.workers[i] = NewDeliveryWorker(
p.client,
p.queue,
p.workers[i] = NewAPDeliveryWorker(
client,
queue,
)
}
}
// Start will attempt to start all of the contained DeliveryWorker{}s.
// NOTE: this is not safe to call concurrently with .Init().
func (p *DeliveryWorkerPool) Start() bool {
func (p *APDeliveryWorkerPool) Start() bool {
if len(p.workers) == 0 {
return false
}
@ -66,7 +64,7 @@ func (p *DeliveryWorkerPool) Start() bool {
// Stop will attempt to stop all of the contained DeliveryWorker{}s.
// NOTE: this is not safe to call concurrently with .Init().
func (p *DeliveryWorkerPool) Stop() bool {
func (p *APDeliveryWorkerPool) Stop() bool {
if len(p.workers) == 0 {
return false
}
@ -77,17 +75,17 @@ func (p *DeliveryWorkerPool) Stop() bool {
return ok
}
// DeliveryWorker ...
type DeliveryWorker struct {
// APDeliveryWorker ...
type APDeliveryWorker struct {
client *Client
queue *queue.StructQueue[*queue.HTTPRequest]
queue *queue.StructQueue[*queue.APRequest]
backlog []*delivery
service runners.Service
}
// NewDeliveryWorker returns a new DeliveryWorker that feeds from queue, using given HTTP client.
func NewDeliveryWorker(client *Client, queue *queue.StructQueue[*queue.HTTPRequest]) DeliveryWorker {
return DeliveryWorker{
// NewAPDeliveryWorker returns a new APDeliveryWorker that feeds from queue, using given HTTP client.
func NewAPDeliveryWorker(client *Client, queue *queue.StructQueue[*queue.APRequest]) APDeliveryWorker {
return APDeliveryWorker{
client: client,
queue: queue,
backlog: make([]*delivery, 0, 256),
@ -95,17 +93,17 @@ func NewDeliveryWorker(client *Client, queue *queue.StructQueue[*queue.HTTPReque
}
// Start will attempt to start the DeliveryWorker{}.
func (w *DeliveryWorker) Start() bool {
func (w *APDeliveryWorker) Start() bool {
return w.service.Run(w.process)
}
// Stop will attempt to stop the DeliveryWorker{}.
func (w *DeliveryWorker) Stop() bool {
func (w *APDeliveryWorker) Stop() bool {
return w.service.Stop()
}
// process is the main delivery worker processing routine.
func (w *DeliveryWorker) process(ctx context.Context) {
func (w *APDeliveryWorker) process(ctx context.Context) {
if w.client == nil || w.queue == nil {
panic("nil delivery worker fields")
}
@ -165,7 +163,7 @@ loop:
}
// next gets the next available delivery, blocking until available if necessary.
func (w *DeliveryWorker) next(ctx context.Context) (*delivery, bool) {
func (w *APDeliveryWorker) next(ctx context.Context) (*delivery, bool) {
// Try pop next queued.
msg, ok := w.queue.Pop()
@ -195,7 +193,7 @@ func (w *DeliveryWorker) next(ctx context.Context) (*delivery, bool) {
}
// popBacklog pops next available from the backlog.
func (w *DeliveryWorker) popBacklog() *delivery {
func (w *APDeliveryWorker) popBacklog() *delivery {
if len(w.backlog) == 0 {
return nil
}
@ -211,7 +209,7 @@ func (w *DeliveryWorker) popBacklog() *delivery {
}
// pushBacklog pushes the given delivery to backlog.
func (w *DeliveryWorker) pushBacklog(dlv *delivery) {
func (w *APDeliveryWorker) pushBacklog(dlv *delivery) {
w.backlog = append(w.backlog, dlv)
}
@ -240,8 +238,8 @@ func (d *delivery) BackOff() time.Duration {
return time.Now().Sub(d.next)
}
// wrapMsg wraps a received queued HTTP request message in our delivery type.
func wrapMsg(ctx context.Context, msg *queue.HTTPRequest) *delivery {
// wrapMsg wraps a received queued AP request message in our delivery type.
func wrapMsg(ctx context.Context, msg *queue.APRequest) *delivery {
dlv := new(delivery)
dlv.request = wrapRequest(msg.Request)
dlv.log = requestLog(dlv.req)

View file

@ -75,6 +75,11 @@ func (f *federate) DeleteAccount(ctx context.Context, account *gtsmodel.Account)
return nil
}
// Drop any queued outgoing AP requests to / from account,
// (this stops any queued likes, boosts, creates etc).
f.state.Queues.APRequests.Delete("ActorID", account.URI)
f.state.Queues.APRequests.Delete("ObjectID", account.URI)
// Parse relevant URI(s).
outboxIRI, err := parseURI(account.OutboxURI)
if err != nil {
@ -222,6 +227,10 @@ func (f *federate) DeleteStatus(ctx context.Context, status *gtsmodel.Status) er
return nil
}
// Drop any queued outgoing http requests for status,
// (this stops any queued likes, boosts, creates etc).
f.state.Queues.APRequests.Delete("ObjectID", status.URI)
// Ensure the status model is fully populated.
if err := f.state.DB.PopulateStatus(ctx, status); err != nil {
return gtserror.Newf("error populating status: %w", err)

View file

@ -55,7 +55,10 @@ import (
// Receiving *gtsmodel.Account
// }
type HTTPRequest struct {
type APRequest struct {
// ActorID ...
ActorID string
// ObjectID ...
ObjectID string

View file

@ -24,8 +24,8 @@ import (
// Queues ...
type Queues struct {
// HTTPRequest ...
HTTPRequest StructQueue[*HTTPRequest]
// APRequests ...
APRequests StructQueue[*APRequest]
}
// Init will re(initialize) queues. NOTE: the queue
@ -37,7 +37,7 @@ func (q *Queues) Init() {
}
func (q *Queues) initHTTPRequest() {
q.HTTPRequest.Init(structr.QueueConfig[*HTTPRequest]{
q.APRequests.Init(structr.QueueConfig[*APRequest]{
Indices: []structr.IndexConfig{
{Fields: "ObjectID", Multiple: true},
{Fields: "Request.URL.Host", Multiple: true},

View file

@ -72,10 +72,10 @@ func (q *StructQueue[T]) Push(values ...T) {
q.broadcast()
}
// MoveBack ...
func (q *StructQueue[T]) MoveBack(index string, key ...any) {
// Delete removes all queued entries under index with key.
func (q *StructQueue[T]) Delete(index string, key ...any) {
i := q.index[index]
q.queue.MoveBack(i, i.Key(key...))
q.queue.Pop(i, i.Key(key...))
}
// Len: see structr.Queue{}.Len().

View file

@ -34,7 +34,7 @@ import (
func (t *transport) BatchDeliver(ctx context.Context, obj map[string]interface{}, recipients []*url.URL) error {
var (
// accumulated prepared reqs.
reqs []*queue.HTTPRequest
reqs []*queue.APRequest
// accumulated preparation errs.
errs gtserror.MultiError
@ -50,8 +50,9 @@ func (t *transport) BatchDeliver(ctx context.Context, obj map[string]interface{}
return gtserror.Newf("error marshaling json: %w", err)
}
// Extract object ID.
id := getObjectID(obj)
// Extract object IDs.
objID := getObjectID(obj)
actID := getActorID(obj)
for _, to := range recipients {
// Skip delivery to recipient if it is "us".
@ -59,8 +60,8 @@ func (t *transport) BatchDeliver(ctx context.Context, obj map[string]interface{}
continue
}
// Prepare new http client request.
req, err := t.prepare(ctx, id, b, to)
// Prepare new outgoing http client request.
req, err := t.prepare(ctx, objID, actID, b, to)
if err != nil {
errs.Append(err)
continue
@ -71,7 +72,7 @@ func (t *transport) BatchDeliver(ctx context.Context, obj map[string]interface{}
}
// Push the request list to HTTP client worker queue.
t.controller.state.Queues.HTTPRequest.Push(reqs...)
t.controller.state.Queues.APRequests.Push(reqs...)
// Return combined err.
return errs.Combine()
@ -89,17 +90,19 @@ func (t *transport) Deliver(ctx context.Context, obj map[string]interface{}, to
return gtserror.Newf("error marshaling json: %w", err)
}
// Extract object ID.
id := getObjectID(obj)
// Prepare new http client request.
req, err := t.prepare(ctx, id, b, to)
// Prepare http client request.
req, err := t.prepare(ctx,
getObjectID(obj),
getActorID(obj),
b,
to,
)
if err != nil {
return err
}
// Push the request to HTTP client worker queue.
t.controller.state.Queues.HTTPRequest.Push(req)
t.controller.state.Queues.APRequests.Push(req)
return nil
}
@ -110,10 +113,11 @@ func (t *transport) Deliver(ctx context.Context, obj map[string]interface{}, to
func (t *transport) prepare(
ctx context.Context,
objectID string,
actorID string,
data []byte,
to *url.URL,
) (
*queue.HTTPRequest,
*queue.APRequest,
error,
) {
url := to.String()
@ -137,7 +141,7 @@ func (t *transport) prepare(
req.Header.Add("Content-Type", string(apiutil.AppActivityLDJSON))
req.Header.Add("Accept-Charset", "utf-8")
return &queue.HTTPRequest{
return &queue.APRequest{
ObjectID: objectID,
Request: req,
}, nil
@ -155,3 +159,16 @@ func getObjectID(obj map[string]interface{}) string {
return ""
}
}
// getActorID extracts an actor ID from 'serialized' ActivityPub object map.
func getActorID(obj map[string]interface{}) string {
switch t := obj["actor"].(type) {
case string:
return t
case map[string]interface{}:
id, _ := t["id"].(string)
return id
default:
return ""
}
}

View file

@ -25,7 +25,6 @@ import (
"codeberg.org/gruf/go-runners"
"github.com/superseriousbusiness/gotosocial/internal/httpclient"
"github.com/superseriousbusiness/gotosocial/internal/messages"
"github.com/superseriousbusiness/gotosocial/internal/queue"
"github.com/superseriousbusiness/gotosocial/internal/scheduler"
)
@ -33,8 +32,8 @@ type Workers struct {
// Main task scheduler instance.
Scheduler scheduler.Scheduler
// HTTPClient ...
HTTPClient httpclient.DeliveryWorkerPool
// APDelivery ...
APDelivery httpclient.APDeliveryWorkerPool
// ClientAPI provides a worker pool that handles both
// incoming client actions, and our own side-effects.
@ -47,9 +46,8 @@ type Workers struct {
// Enqueue functions for clientAPI / federator worker pools,
// these are pointers to Processor{}.Enqueue___() msg functions.
// This prevents dependency cycling as Processor depends on Workers.
EnqueueHTTPClient func(context.Context, ...queue.HTTPRequest)
EnqueueClientAPI func(context.Context, ...messages.FromClientAPI)
EnqueueFediAPI func(context.Context, ...messages.FromFediAPI)
EnqueueClientAPI func(context.Context, ...messages.FromClientAPI)
EnqueueFediAPI func(context.Context, ...messages.FromFediAPI)
// Blocking processing functions for clientAPI / federator.
// These are pointers to Processor{}.Process___() msg functions.
@ -78,7 +76,7 @@ func (w *Workers) Start() {
tryUntil("starting scheduler", 5, w.Scheduler.Start)
tryUntil("start http client workerpool", 5, w.HTTPClient.Start)
tryUntil("start ap delivery workerpool", 5, w.APDelivery.Start)
tryUntil("starting client API workerpool", 5, func() bool {
return w.ClientAPI.Start(4*maxprocs, 400*maxprocs)
@ -96,7 +94,7 @@ func (w *Workers) Start() {
// Stop will stop all of the contained worker pools (and global scheduler).
func (w *Workers) Stop() {
tryUntil("stopping scheduler", 5, w.Scheduler.Stop)
tryUntil("stopping http client workerpool", 5, w.HTTPClient.Stop)
tryUntil("stopping ap delivery workerpool", 5, w.APDelivery.Stop)
tryUntil("stopping client API workerpool", 5, w.ClientAPI.Stop)
tryUntil("stopping federator workerpool", 5, w.Federator.Stop)
tryUntil("stopping media workerpool", 5, w.Media.Stop)