From 4729c92dadef4f166065755de6b65ce111516b42 Mon Sep 17 00:00:00 2001 From: kim Date: Wed, 24 Apr 2024 12:32:53 +0100 Subject: [PATCH] refactor federatingDB.Delete(), drop queued messages when deleting account / status --- internal/federation/federatingdb/delete.go | 161 +++++++++++++++++---- internal/messages/messages.go | 50 ++++--- internal/processing/workers/federate.go | 30 ++-- 3 files changed, 184 insertions(+), 57 deletions(-) diff --git a/internal/federation/federatingdb/delete.go b/internal/federation/federatingdb/delete.go index b1a955504..8bfaa325a 100644 --- a/internal/federation/federatingdb/delete.go +++ b/internal/federation/federatingdb/delete.go @@ -19,10 +19,13 @@ package federatingdb import ( "context" + "errors" "net/url" - "codeberg.org/gruf/go-kv" "github.com/superseriousbusiness/gotosocial/internal/ap" + "github.com/superseriousbusiness/gotosocial/internal/db" + "github.com/superseriousbusiness/gotosocial/internal/gtserror" + "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" "github.com/superseriousbusiness/gotosocial/internal/log" "github.com/superseriousbusiness/gotosocial/internal/messages" ) @@ -34,43 +37,149 @@ import ( // // The library makes this call only after acquiring a lock first. func (f *federatingDB) Delete(ctx context.Context, id *url.URL) error { - l := log.WithContext(ctx). - WithFields(kv.Fields{ - {"id", id}, - }...) - l.Debug("entering Delete") - activityContext := getActivityContext(ctx) if activityContext.internal { return nil // Already processed. } - requestingAcct := activityContext.requestingAcct - receivingAcct := activityContext.receivingAcct + // Extract receiving / requesting accounts. + requesting := activityContext.requestingAcct + receiving := activityContext.receivingAcct - // in a delete we only get the URI, we can't know if we have a status or a profile or something else, - // so we have to try a few different things... - if s, err := f.state.DB.GetStatusByURI(ctx, id.String()); err == nil && requestingAcct.ID == s.AccountID { - l.Debugf("deleting status: %s", s.ID) - f.state.Workers.Federator.Queue.Push(&messages.FromFediAPI{ - APObjectType: ap.ObjectNote, - APActivityType: ap.ActivityDelete, - GTSModel: s, - Receiving: receivingAcct, - Requesting: requestingAcct, - }) + // Serialize ID URI. + uriStr := id.String() + + var ( + ok bool + err error + ) + + // Attempt to delete account. + ok, err = f.deleteAccount(ctx, + requesting, + receiving, + uriStr, + ) + if err != nil || ok { // handles success + return err } - if a, err := f.state.DB.GetAccountByURI(ctx, id.String()); err == nil && requestingAcct.ID == a.ID { - l.Debugf("deleting account: %s", a.ID) + // Attempt to delete status. + ok, err = f.deleteStatus(ctx, + requesting, + receiving, + uriStr, + ) + if err != nil || ok { // handles success + return err + } + + // Log at warning level, as lots of these could indicate federation + // issues between remote and this instance, or help with debugging. + log.Warnf(ctx, "received delete for unknown target: %s", uriStr) + return nil +} + +func (f *federatingDB) deleteAccount( + ctx context.Context, + requesting *gtsmodel.Account, + receiving *gtsmodel.Account, + uri string, // target account +) ( + bool, // success? + error, // any error +) { + account, err := f.state.DB.GetAccountByURI(ctx, uri) + if err != nil && !errors.Is(err, db.ErrNoEntries) { + return false, gtserror.Newf("error getting account: %w", err) + } + + if account != nil { + if account.ID != requesting.ID { + const text = "signing account does not match delete target" + return false, gtserror.NewErrorForbidden(err, text) + } + + log.Debugf(ctx, "deleting account: %s", account.URI) + + // Drop any outgoing queued AP requests to / from / targeting + // this account, (stops queued likes, boosts, creates etc). + f.state.Workers.Delivery.Queue.Delete("ObjectID", account.URI) + f.state.Workers.Delivery.Queue.Delete("TargetID", account.URI) + + // Drop any incoming queued client messages to / from this + // account, (stops processing of local origin data for acccount). + f.state.Workers.Client.Queue.Delete("Target.ID", account.ID) + f.state.Workers.Client.Queue.Delete("TargetURI", account.URI) + + // Drop any incoming queued federator messages to this account, + // (stops processing of remote origin data targeting this account). + f.state.Workers.Federator.Queue.Delete("Requesting.ID", account.ID) + f.state.Workers.Federator.Queue.Delete("TargetURI", account.URI) + + // Only AFTER we have finished purging queues do we enqueue, + // otherwise we risk purging our own delete message from queue! f.state.Workers.Federator.Queue.Push(&messages.FromFediAPI{ APObjectType: ap.ObjectProfile, APActivityType: ap.ActivityDelete, - GTSModel: a, - Receiving: receivingAcct, - Requesting: requestingAcct, + GTSModel: account, + Receiving: receiving, + Requesting: requesting, }) + + return true, nil } - return nil + return false, nil +} + +func (f *federatingDB) deleteStatus( + ctx context.Context, + requesting *gtsmodel.Account, + receiving *gtsmodel.Account, + uri string, // target status +) ( + bool, // success? + error, // any error +) { + status, err := f.state.DB.GetStatusByURI(ctx, uri) + if err != nil && !errors.Is(err, db.ErrNoEntries) { + return false, gtserror.Newf("error getting status: %w", err) + } + + if status != nil { + if status.AccountID != requesting.ID { + const text = "signing account does not match delete target owner" + return false, gtserror.NewErrorForbidden(err, text) + } + + log.Debugf(ctx, "deleting status: %s", status.URI) + + // Drop any outgoing queued AP requests about / targeting + // this status, (stops queued likes, boosts, creates etc). + f.state.Workers.Delivery.Queue.Delete("ObjectID", status.URI) + f.state.Workers.Delivery.Queue.Delete("TargetID", status.URI) + + // Drop any incoming queued client messages about / targeting + // status, (stops processing of local origin data for status). + f.state.Workers.Client.Queue.Delete("TargetURI", status.URI) + + // Drop any incoming queued federator messages targeting status, + // (stops processing of remote origin data targeting this status). + f.state.Workers.Federator.Queue.Delete("TargetURI", status.URI) + + // Only AFTER we have finished purging queues do we enqueue, + // otherwise we risk purging our own delete message from queue! + f.state.Workers.Federator.Queue.Push(&messages.FromFediAPI{ + APObjectType: ap.ObjectNote, + APActivityType: ap.ActivityDelete, + GTSModel: status, + Receiving: receiving, + Requesting: requesting, + }) + + return true, nil + } + + return false, nil } diff --git a/internal/messages/messages.go b/internal/messages/messages.go index ebf6faa0f..c5488d586 100644 --- a/internal/messages/messages.go +++ b/internal/messages/messages.go @@ -34,17 +34,32 @@ type FromClientAPI struct { // APActivityType ... APActivityType string - // Optional GTS model of - // the Activity or Object. + // Optional GTS database model + // of the Activity / Object. GTSModel interface{} - // Origin ... + // Targeted object URI. + TargetURI string + + // Origin is the account that + // this message originated from. Origin *gtsmodel.Account - // Target ... + // Target is the account that + // this message is targeting. Target *gtsmodel.Account } +// ClientMsgIndices defines queue indices this +// message type should be accessible / stored under. +func ClientMsgIndices() []structr.IndexConfig { + return []structr.IndexConfig{ + {Fields: "TargetURI", Multiple: true}, + {Fields: "Origin.ID", Multiple: true}, + {Fields: "Target.ID", Multiple: true}, + } +} + // FromFediAPI wraps a message that // travels from the federating API into the processor. type FromFediAPI struct { @@ -55,17 +70,18 @@ type FromFediAPI struct { // APActivityType ... APActivityType string - // APIRI ... - APIRI *url.URL - - // Optional AP model of the Object of the - // Activity. Likely Accountable or Statusable. + // Optional ActivityPub ID (IRI) + // and / or model of Activity / Object. + APIRI *url.URL APObject interface{} - // Optional GTS model of - // the Activity or Object. + // Optional GTS database model + // of the Activity / Object. GTSModel interface{} + // Targeted object URI. + TargetURI string + // Remote account that posted // this Activity to the inbox. Requesting *gtsmodel.Account @@ -75,18 +91,12 @@ type FromFediAPI struct { Receiving *gtsmodel.Account } -// ClientMsgIndices ... -func ClientMsgIndices() []structr.IndexConfig { - return []structr.IndexConfig{ - {Fields: "Origin.ID", Multiple: true}, - {Fields: "Target.ID", Multiple: true}, - } -} - -// FederatorMsgIndices ... +// FederatorMsgIndices defines queue indices this +// message type should be accessible / stored under. func FederatorMsgIndices() []structr.IndexConfig { return []structr.IndexConfig{ {Fields: "APIRI", Multiple: true}, + {Fields: "TargetURI", Multiple: true}, {Fields: "Requesting.ID", Multiple: true}, {Fields: "Receiving.ID", Multiple: true}, } diff --git a/internal/processing/workers/federate.go b/internal/processing/workers/federate.go index 33dcd3780..d560ee1e3 100644 --- a/internal/processing/workers/federate.go +++ b/internal/processing/workers/federate.go @@ -96,16 +96,22 @@ func (f *federate) DeleteAccount(ctx context.Context, account *gtsmodel.Account) return err } - // Drop any queued outgoing AP requests to / from account, - // and drop any client /federator API messages for account. - // (this stops any queued likes, boosts, creates etc). + // Drop any outgoing queued AP requests to / from / targeting + // this account, (stops queued likes, boosts, creates etc). f.state.Workers.Delivery.Queue.Delete("ActorID", account.URI) f.state.Workers.Delivery.Queue.Delete("ObjectID", account.URI) f.state.Workers.Delivery.Queue.Delete("TargetID", account.URI) + + // Drop any incoming queued client messages to / from this + // account, (stops processing of local origin data for acccount). f.state.Workers.Client.Queue.Delete("Origin.ID", account.ID) f.state.Workers.Client.Queue.Delete("Target.ID", account.ID) - f.state.Workers.Federator.Queue.Delete("APIRI", actorIRI) + f.state.Workers.Client.Queue.Delete("TargetURI", account.URI) + + // Drop any incoming queued federator messages to this account, + // (stops processing of remote origin data targeting this account). f.state.Workers.Federator.Queue.Delete("Receiving.ID", account.ID) + f.state.Workers.Federator.Queue.Delete("TargetURI", account.URI) // Create a new delete. // todo: tc.AccountToASDelete @@ -239,16 +245,18 @@ func (f *federate) DeleteStatus(ctx context.Context, status *gtsmodel.Status) er return err } - // Drop any queued outgoing http requests for status, - // and drop any client /federator API messages for status. - // (this stops any queued likes, boosts, creates etc). + // Drop any outgoing queued AP requests about / targeting + // this status, (stops queued likes, boosts, creates etc). f.state.Workers.Delivery.Queue.Delete("ObjectID", status.URI) f.state.Workers.Delivery.Queue.Delete("TargetID", status.URI) - f.state.Workers.Client.Queue.Delete("Origin.ID", account.ID) - f.state.Workers.Client.Queue.Delete("Target.ID", account.ID) - f.state.Workers.Federator.Queue.Delete("APIRI", actorIRI) - f.state.Workers.Federator.Queue.Delete("Receiving.ID", account.ID) + // Drop any incoming queued client messages about / targeting + // status, (stops processing of local origin data for status). + f.state.Workers.Client.Queue.Delete("TargetURI", status.URI) + + // Drop any incoming queued federator messages targeting status, + // (stops processing of remote origin data targeting this status). + f.state.Workers.Federator.Queue.Delete("TargetURI", status.URI) // Ensure the status model is fully populated. if err := f.state.DB.PopulateStatus(ctx, status); err != nil {