From 4ee436e98a3351d9568c4a018bd2de34c218e9a6 Mon Sep 17 00:00:00 2001 From: tobi <31960611+tsmethurst@users.noreply.github.com> Date: Tue, 14 Nov 2023 15:57:25 +0100 Subject: [PATCH] [bugfix] process account delete side effects in serial, not in parallel (#2360) * [bugfix] process account delete side effects in serial, not in parallel * StartWorkers / StartNoopWorkers for tests * undo testrig trace logging * log errors instead of immediately returning --- .../api/activitypub/emoji/emojiget_test.go | 2 +- internal/api/activitypub/users/user_test.go | 4 +- internal/api/auth/auth_test.go | 2 +- internal/api/client/accounts/account_test.go | 2 +- internal/api/client/admin/admin_test.go | 2 +- .../api/client/bookmarks/bookmarks_test.go | 2 +- .../api/client/favourites/favourites_test.go | 2 +- .../followrequests/followrequest_test.go | 2 +- internal/api/client/instance/instance_test.go | 2 +- internal/api/client/lists/lists_test.go | 2 +- internal/api/client/media/mediacreate_test.go | 2 +- internal/api/client/media/mediaupdate_test.go | 2 +- internal/api/client/polls/polls_test.go | 2 +- internal/api/client/reports/reports_test.go | 2 +- internal/api/client/search/search_test.go | 2 +- internal/api/client/statuses/status_test.go | 2 +- .../api/client/streaming/streaming_test.go | 2 +- internal/api/client/user/user_test.go | 2 +- internal/api/fileserver/fileserver_test.go | 4 +- .../api/wellknown/webfinger/webfinger_test.go | 2 +- .../wellknown/webfinger/webfingerget_test.go | 4 +- internal/cleaner/cleaner_test.go | 2 +- internal/cleaner/media_test.go | 2 +- internal/federation/dereferencing/account.go | 8 +- .../dereferencing/dereferencer_test.go | 2 +- .../federatingdb/federatingdb_test.go | 2 +- internal/federation/federator_test.go | 2 +- internal/media/media_test.go | 2 +- internal/processing/account/account_test.go | 2 +- internal/processing/account/delete.go | 162 ++++++++++++------ internal/processing/admin/admin_test.go | 4 +- internal/processing/polls/poll_test.go | 2 +- internal/processing/processor_test.go | 2 +- internal/processing/status/status_test.go | 2 +- internal/processing/workers/workers_test.go | 3 +- internal/timeline/timeline_test.go | 2 +- internal/transport/transport_test.go | 2 +- internal/typeutils/converter_test.go | 7 +- testrig/mediahandler.go | 2 +- testrig/processor.go | 6 +- testrig/util.go | 19 +- 41 files changed, 181 insertions(+), 102 deletions(-) diff --git a/internal/api/activitypub/emoji/emojiget_test.go b/internal/api/activitypub/emoji/emojiget_test.go index 993149784..57c412928 100644 --- a/internal/api/activitypub/emoji/emojiget_test.go +++ b/internal/api/activitypub/emoji/emojiget_test.go @@ -66,7 +66,7 @@ func (suite *EmojiGetTestSuite) SetupSuite() { func (suite *EmojiGetTestSuite) SetupTest() { suite.state.Caches.Init() - testrig.StartWorkers(&suite.state) + testrig.StartNoopWorkers(&suite.state) testrig.InitTestConfig() testrig.InitTestLog() diff --git a/internal/api/activitypub/users/user_test.go b/internal/api/activitypub/users/user_test.go index 611b9033a..a52c351ac 100644 --- a/internal/api/activitypub/users/user_test.go +++ b/internal/api/activitypub/users/user_test.go @@ -78,7 +78,6 @@ func (suite *UserStandardTestSuite) SetupSuite() { func (suite *UserStandardTestSuite) SetupTest() { suite.state.Caches.Init() - testrig.StartWorkers(&suite.state) testrig.InitTestConfig() testrig.InitTestLog() @@ -98,7 +97,10 @@ func (suite *UserStandardTestSuite) SetupTest() { suite.mediaManager = testrig.NewTestMediaManager(&suite.state) suite.federator = testrig.NewTestFederator(&suite.state, testrig.NewTestTransportController(&suite.state, testrig.NewMockHTTPClient(nil, "../../../../testrig/media")), suite.mediaManager) suite.emailSender = testrig.NewEmailSender("../../../../web/template/", nil) + suite.processor = testrig.NewTestProcessor(&suite.state, suite.federator, suite.emailSender, suite.mediaManager) + testrig.StartWorkers(&suite.state, suite.processor.Workers()) + suite.userModule = users.New(suite.processor) testrig.StandardDBSetup(suite.db, suite.testAccounts) testrig.StandardStorageSetup(suite.storage, "../../../../testrig/media") diff --git a/internal/api/auth/auth_test.go b/internal/api/auth/auth_test.go index ddf75970e..d77b1a3d4 100644 --- a/internal/api/auth/auth_test.go +++ b/internal/api/auth/auth_test.go @@ -93,7 +93,7 @@ func (suite *AuthStandardTestSuite) SetupTest() { suite.authModule = auth.New(suite.db, suite.processor, suite.idp) testrig.StandardDBSetup(suite.db, suite.testAccounts) - testrig.StartWorkers(&suite.state) + testrig.StartNoopWorkers(&suite.state) } func (suite *AuthStandardTestSuite) TearDownTest() { diff --git a/internal/api/client/accounts/account_test.go b/internal/api/client/accounts/account_test.go index ed4d48027..e247ba21d 100644 --- a/internal/api/client/accounts/account_test.go +++ b/internal/api/client/accounts/account_test.go @@ -78,7 +78,7 @@ func (suite *AccountStandardTestSuite) SetupSuite() { func (suite *AccountStandardTestSuite) SetupTest() { suite.state.Caches.Init() - testrig.StartWorkers(&suite.state) + testrig.StartNoopWorkers(&suite.state) testrig.InitTestConfig() testrig.InitTestLog() diff --git a/internal/api/client/admin/admin_test.go b/internal/api/client/admin/admin_test.go index 53b7b3552..13c6d13e9 100644 --- a/internal/api/client/admin/admin_test.go +++ b/internal/api/client/admin/admin_test.go @@ -84,7 +84,7 @@ func (suite *AdminStandardTestSuite) SetupSuite() { func (suite *AdminStandardTestSuite) SetupTest() { suite.state.Caches.Init() - testrig.StartWorkers(&suite.state) + testrig.StartNoopWorkers(&suite.state) testrig.InitTestConfig() testrig.InitTestLog() diff --git a/internal/api/client/bookmarks/bookmarks_test.go b/internal/api/client/bookmarks/bookmarks_test.go index b10cf914c..b689ca24c 100644 --- a/internal/api/client/bookmarks/bookmarks_test.go +++ b/internal/api/client/bookmarks/bookmarks_test.go @@ -88,7 +88,7 @@ func (suite *BookmarkTestSuite) SetupSuite() { func (suite *BookmarkTestSuite) SetupTest() { suite.state.Caches.Init() - testrig.StartWorkers(&suite.state) + testrig.StartNoopWorkers(&suite.state) testrig.InitTestConfig() testrig.InitTestLog() diff --git a/internal/api/client/favourites/favourites_test.go b/internal/api/client/favourites/favourites_test.go index c9776225f..2826b2c4e 100644 --- a/internal/api/client/favourites/favourites_test.go +++ b/internal/api/client/favourites/favourites_test.go @@ -72,7 +72,7 @@ func (suite *FavouritesStandardTestSuite) SetupSuite() { func (suite *FavouritesStandardTestSuite) SetupTest() { suite.state.Caches.Init() - testrig.StartWorkers(&suite.state) + testrig.StartNoopWorkers(&suite.state) testrig.InitTestConfig() testrig.InitTestLog() diff --git a/internal/api/client/followrequests/followrequest_test.go b/internal/api/client/followrequests/followrequest_test.go index 5268d259e..3ef8951b9 100644 --- a/internal/api/client/followrequests/followrequest_test.go +++ b/internal/api/client/followrequests/followrequest_test.go @@ -75,7 +75,7 @@ func (suite *FollowRequestStandardTestSuite) SetupSuite() { func (suite *FollowRequestStandardTestSuite) SetupTest() { suite.state.Caches.Init() - testrig.StartWorkers(&suite.state) + testrig.StartNoopWorkers(&suite.state) testrig.InitTestConfig() testrig.InitTestLog() diff --git a/internal/api/client/instance/instance_test.go b/internal/api/client/instance/instance_test.go index 0189d87bf..42bd9eeb3 100644 --- a/internal/api/client/instance/instance_test.go +++ b/internal/api/client/instance/instance_test.go @@ -77,7 +77,7 @@ func (suite *InstanceStandardTestSuite) SetupSuite() { func (suite *InstanceStandardTestSuite) SetupTest() { suite.state.Caches.Init() - testrig.StartWorkers(&suite.state) + testrig.StartNoopWorkers(&suite.state) testrig.InitTestConfig() testrig.InitTestLog() diff --git a/internal/api/client/lists/lists_test.go b/internal/api/client/lists/lists_test.go index ac6f79f11..e7e56183e 100644 --- a/internal/api/client/lists/lists_test.go +++ b/internal/api/client/lists/lists_test.go @@ -76,7 +76,7 @@ func (suite *ListsStandardTestSuite) SetupSuite() { func (suite *ListsStandardTestSuite) SetupTest() { suite.state.Caches.Init() suite.state.Caches.Start() - testrig.StartWorkers(&suite.state) + testrig.StartNoopWorkers(&suite.state) testrig.InitTestConfig() testrig.InitTestLog() diff --git a/internal/api/client/media/mediacreate_test.go b/internal/api/client/media/mediacreate_test.go index 928d70379..a2cf9417a 100644 --- a/internal/api/client/media/mediacreate_test.go +++ b/internal/api/client/media/mediacreate_test.go @@ -80,7 +80,7 @@ type MediaCreateTestSuite struct { func (suite *MediaCreateTestSuite) SetupSuite() { suite.state.Caches.Init() - testrig.StartWorkers(&suite.state) + testrig.StartNoopWorkers(&suite.state) // setup standard items testrig.InitTestConfig() diff --git a/internal/api/client/media/mediaupdate_test.go b/internal/api/client/media/mediaupdate_test.go index a0ff1b96d..abc26247c 100644 --- a/internal/api/client/media/mediaupdate_test.go +++ b/internal/api/client/media/mediaupdate_test.go @@ -76,7 +76,7 @@ type MediaUpdateTestSuite struct { */ func (suite *MediaUpdateTestSuite) SetupSuite() { - testrig.StartWorkers(&suite.state) + testrig.StartNoopWorkers(&suite.state) // setup standard items testrig.InitTestConfig() diff --git a/internal/api/client/polls/polls_test.go b/internal/api/client/polls/polls_test.go index 5baa29158..acf972e2a 100644 --- a/internal/api/client/polls/polls_test.go +++ b/internal/api/client/polls/polls_test.go @@ -69,7 +69,7 @@ func (suite *PollsStandardTestSuite) SetupSuite() { func (suite *PollsStandardTestSuite) SetupTest() { suite.state.Caches.Init() - testrig.StartWorkers(&suite.state) + testrig.StartNoopWorkers(&suite.state) testrig.InitTestConfig() testrig.InitTestLog() diff --git a/internal/api/client/reports/reports_test.go b/internal/api/client/reports/reports_test.go index 95833da20..3e21ef1ba 100644 --- a/internal/api/client/reports/reports_test.go +++ b/internal/api/client/reports/reports_test.go @@ -69,7 +69,7 @@ func (suite *ReportsStandardTestSuite) SetupSuite() { func (suite *ReportsStandardTestSuite) SetupTest() { suite.state.Caches.Init() - testrig.StartWorkers(&suite.state) + testrig.StartNoopWorkers(&suite.state) testrig.InitTestConfig() testrig.InitTestLog() diff --git a/internal/api/client/search/search_test.go b/internal/api/client/search/search_test.go index 7b9155a83..653858a1c 100644 --- a/internal/api/client/search/search_test.go +++ b/internal/api/client/search/search_test.go @@ -73,7 +73,7 @@ func (suite *SearchStandardTestSuite) SetupSuite() { func (suite *SearchStandardTestSuite) SetupTest() { suite.state.Caches.Init() - testrig.StartWorkers(&suite.state) + testrig.StartNoopWorkers(&suite.state) testrig.InitTestConfig() testrig.InitTestLog() diff --git a/internal/api/client/statuses/status_test.go b/internal/api/client/statuses/status_test.go index 96f7a633d..921afb3b5 100644 --- a/internal/api/client/statuses/status_test.go +++ b/internal/api/client/statuses/status_test.go @@ -72,7 +72,7 @@ func (suite *StatusStandardTestSuite) SetupSuite() { func (suite *StatusStandardTestSuite) SetupTest() { suite.state.Caches.Init() - testrig.StartWorkers(&suite.state) + testrig.StartNoopWorkers(&suite.state) testrig.InitTestConfig() testrig.InitTestLog() diff --git a/internal/api/client/streaming/streaming_test.go b/internal/api/client/streaming/streaming_test.go index 1bd0bea78..30574080e 100644 --- a/internal/api/client/streaming/streaming_test.go +++ b/internal/api/client/streaming/streaming_test.go @@ -84,7 +84,7 @@ func (suite *StreamingTestSuite) SetupSuite() { func (suite *StreamingTestSuite) SetupTest() { suite.state.Caches.Init() - testrig.StartWorkers(&suite.state) + testrig.StartNoopWorkers(&suite.state) testrig.InitTestConfig() testrig.InitTestLog() diff --git a/internal/api/client/user/user_test.go b/internal/api/client/user/user_test.go index ccf538ac2..7cad3975d 100644 --- a/internal/api/client/user/user_test.go +++ b/internal/api/client/user/user_test.go @@ -57,7 +57,7 @@ type UserStandardTestSuite struct { func (suite *UserStandardTestSuite) SetupTest() { suite.state.Caches.Init() - testrig.StartWorkers(&suite.state) + testrig.StartNoopWorkers(&suite.state) testrig.InitTestConfig() testrig.InitTestLog() diff --git a/internal/api/fileserver/fileserver_test.go b/internal/api/fileserver/fileserver_test.go index 9f6cb578f..120ebacb5 100644 --- a/internal/api/fileserver/fileserver_test.go +++ b/internal/api/fileserver/fileserver_test.go @@ -65,7 +65,7 @@ type FileserverTestSuite struct { */ func (suite *FileserverTestSuite) SetupSuite() { - testrig.StartWorkers(&suite.state) + testrig.StartNoopWorkers(&suite.state) testrig.InitTestConfig() testrig.InitTestLog() @@ -96,7 +96,7 @@ func (suite *FileserverTestSuite) SetupSuite() { func (suite *FileserverTestSuite) SetupTest() { suite.state.Caches.Init() - testrig.StartWorkers(&suite.state) + testrig.StartNoopWorkers(&suite.state) testrig.StandardDBSetup(suite.db, nil) testrig.StandardStorageSetup(suite.storage, "../../../testrig/media") diff --git a/internal/api/wellknown/webfinger/webfinger_test.go b/internal/api/wellknown/webfinger/webfinger_test.go index e0da90db4..4c4aba205 100644 --- a/internal/api/wellknown/webfinger/webfinger_test.go +++ b/internal/api/wellknown/webfinger/webfinger_test.go @@ -72,7 +72,7 @@ func (suite *WebfingerStandardTestSuite) SetupSuite() { func (suite *WebfingerStandardTestSuite) SetupTest() { suite.state.Caches.Init() - testrig.StartWorkers(&suite.state) + testrig.StartNoopWorkers(&suite.state) testrig.InitTestLog() testrig.InitTestConfig() diff --git a/internal/api/wellknown/webfinger/webfingerget_test.go b/internal/api/wellknown/webfinger/webfingerget_test.go index 0fefe028c..c9bd088be 100644 --- a/internal/api/wellknown/webfinger/webfingerget_test.go +++ b/internal/api/wellknown/webfinger/webfingerget_test.go @@ -84,10 +84,10 @@ func (suite *WebfingerGetTestSuite) funkifyAccountDomain(host string, accountDom config.SetHost(host) config.SetAccountDomain(accountDomain) testrig.StopWorkers(&suite.state) - testrig.StartWorkers(&suite.state) + testrig.StartNoopWorkers(&suite.state) suite.processor = processing.NewProcessor(cleaner.New(&suite.state), suite.tc, suite.federator, testrig.NewTestOauthServer(suite.db), testrig.NewTestMediaManager(&suite.state), &suite.state, suite.emailSender) suite.webfingerModule = webfinger.New(suite.processor) - testrig.StartWorkers(&suite.state) + testrig.StartNoopWorkers(&suite.state) // Generate a new account for the // tester, which uses the new host. diff --git a/internal/cleaner/cleaner_test.go b/internal/cleaner/cleaner_test.go index 4524e9609..42006611b 100644 --- a/internal/cleaner/cleaner_test.go +++ b/internal/cleaner/cleaner_test.go @@ -58,7 +58,7 @@ func (suite *CleanerTestSuite) SetupTest() { suite.state.Storage = testrig.NewInMemoryStorage() // Initialize test cleaner instance. - testrig.StartWorkers(&suite.state) + testrig.StartNoopWorkers(&suite.state) suite.cleaner = cleaner.New(&suite.state) // Allocate new test model emojis. diff --git a/internal/cleaner/media_test.go b/internal/cleaner/media_test.go index c1226bcb5..a2fdb0858 100644 --- a/internal/cleaner/media_test.go +++ b/internal/cleaner/media_test.go @@ -62,7 +62,7 @@ func (suite *MediaTestSuite) SetupTest() { testrig.InitTestLog() suite.state.Caches.Init() - testrig.StartWorkers(&suite.state) + testrig.StartNoopWorkers(&suite.state) suite.db = testrig.NewTestDB(&suite.state) suite.storage = testrig.NewInMemoryStorage() diff --git a/internal/federation/dereferencing/account.go b/internal/federation/dereferencing/account.go index d551c3f0b..19d53895a 100644 --- a/internal/federation/dereferencing/account.go +++ b/internal/federation/dereferencing/account.go @@ -317,11 +317,9 @@ func (d *Dereferencer) RefreshAccountAsync(ctx context.Context, requestUser stri if apubAcc != nil { // This account was updated, enqueue re-dereference featured posts. - d.state.Workers.Federator.MustEnqueueCtx(ctx, func(ctx context.Context) { - if err := d.dereferenceAccountFeatured(ctx, requestUser, latest); err != nil { - log.Errorf(ctx, "error fetching account featured collection: %v", err) - } - }) + if err := d.dereferenceAccountFeatured(ctx, requestUser, latest); err != nil { + log.Errorf(ctx, "error fetching account featured collection: %v", err) + } } }) } diff --git a/internal/federation/dereferencing/dereferencer_test.go b/internal/federation/dereferencing/dereferencer_test.go index 1fefc1db9..517479a50 100644 --- a/internal/federation/dereferencing/dereferencer_test.go +++ b/internal/federation/dereferencing/dereferencer_test.go @@ -60,7 +60,7 @@ func (suite *DereferencerStandardTestSuite) SetupTest() { suite.testEmojis = testrig.NewTestEmojis() suite.state.Caches.Init() - testrig.StartWorkers(&suite.state) + testrig.StartNoopWorkers(&suite.state) suite.db = testrig.NewTestDB(&suite.state) diff --git a/internal/federation/federatingdb/federatingdb_test.go b/internal/federation/federatingdb/federatingdb_test.go index 5c1428ee1..54c724057 100644 --- a/internal/federation/federatingdb/federatingdb_test.go +++ b/internal/federation/federatingdb/federatingdb_test.go @@ -67,7 +67,7 @@ func (suite *FederatingDBTestSuite) SetupTest() { testrig.InitTestLog() suite.state.Caches.Init() - testrig.StartWorkers(&suite.state) + testrig.StartNoopWorkers(&suite.state) suite.fromFederator = make(chan messages.FromFediAPI, 10) suite.state.Workers.EnqueueFediAPI = func(ctx context.Context, msgs ...messages.FromFediAPI) { diff --git a/internal/federation/federator_test.go b/internal/federation/federator_test.go index bfc25fc74..172675bd2 100644 --- a/internal/federation/federator_test.go +++ b/internal/federation/federator_test.go @@ -56,7 +56,7 @@ func (suite *FederatorStandardTestSuite) SetupSuite() { func (suite *FederatorStandardTestSuite) SetupTest() { suite.state.Caches.Init() - testrig.StartWorkers(&suite.state) + testrig.StartNoopWorkers(&suite.state) testrig.InitTestConfig() testrig.InitTestLog() diff --git a/internal/media/media_test.go b/internal/media/media_test.go index 99c2da566..a719116d1 100644 --- a/internal/media/media_test.go +++ b/internal/media/media_test.go @@ -48,7 +48,7 @@ func (suite *MediaStandardTestSuite) SetupTest() { testrig.InitTestLog() suite.state.Caches.Init() - testrig.StartWorkers(&suite.state) + testrig.StartNoopWorkers(&suite.state) suite.db = testrig.NewTestDB(&suite.state) suite.storage = testrig.NewInMemoryStorage() diff --git a/internal/processing/account/account_test.go b/internal/processing/account/account_test.go index 7f259815a..d43b4c937 100644 --- a/internal/processing/account/account_test.go +++ b/internal/processing/account/account_test.go @@ -81,7 +81,7 @@ func (suite *AccountStandardTestSuite) SetupSuite() { func (suite *AccountStandardTestSuite) SetupTest() { suite.state.Caches.Init() - testrig.StartWorkers(&suite.state) + testrig.StartNoopWorkers(&suite.state) testrig.InitTestConfig() testrig.InitTestLog() diff --git a/internal/processing/account/delete.go b/internal/processing/account/delete.go index bf1ea2f44..bd320571f 100644 --- a/internal/processing/account/delete.go +++ b/internal/processing/account/delete.go @@ -27,6 +27,7 @@ import ( "github.com/google/uuid" "github.com/superseriousbusiness/gotosocial/internal/ap" "github.com/superseriousbusiness/gotosocial/internal/db" + "github.com/superseriousbusiness/gotosocial/internal/gtscontext" "github.com/superseriousbusiness/gotosocial/internal/gtserror" "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" "github.com/superseriousbusiness/gotosocial/internal/log" @@ -39,38 +40,45 @@ const deleteSelectLimit = 50 // Delete deletes an account, and all of that account's statuses, media, follows, notifications, etc etc etc. // The origin passed here should be either the ID of the account doing the delete (can be itself), or the ID of a domain block. -func (p *Processor) Delete(ctx context.Context, account *gtsmodel.Account, origin string) gtserror.WithCode { +func (p *Processor) Delete( + ctx context.Context, + account *gtsmodel.Account, + origin string, +) gtserror.WithCode { l := log.WithContext(ctx).WithFields(kv.Fields{ {"username", account.Username}, {"domain", account.Domain}, }...) l.Trace("beginning account delete process") + // Delete statuses *before* follows to ensure correct addressing + // of any outgoing fedi messages generated by deleting statuses. + if err := p.deleteAccountStatuses(ctx, account); err != nil { + l.Errorf("continuing after error during account delete: %v", err) + } + if err := p.deleteAccountFollows(ctx, account); err != nil { - return gtserror.NewErrorInternalError(err) + l.Errorf("continuing after error during account delete: %v", err) } if err := p.deleteAccountBlocks(ctx, account); err != nil { - return gtserror.NewErrorInternalError(err) - } - - if err := p.deleteAccountStatuses(ctx, account); err != nil { - return gtserror.NewErrorInternalError(err) + l.Errorf("continuing after error during account delete: %v", err) } if err := p.deleteAccountNotifications(ctx, account); err != nil { - return gtserror.NewErrorInternalError(err) + l.Errorf("continuing after error during account delete: %v", err) } if err := p.deleteAccountPeripheral(ctx, account); err != nil { - return gtserror.NewErrorInternalError(err) + l.Errorf("continuing after error during account delete: %v", err) } if account.IsLocal() { - // we tokens, applications and clients for account as one of the last - // stages during deletion, as other database models rely on these. + // We delete tokens, applications and clients for + // account as one of the last stages during deletion, + // as other database models rely on these. if err := p.deleteUserAndTokensForAccount(ctx, account); err != nil { - return gtserror.NewErrorInternalError(err) + l.Errorf("continuing after error during account delete: %v", err) } } @@ -83,7 +91,7 @@ func (p *Processor) Delete(ctx context.Context, account *gtsmodel.Account, origi return gtserror.NewErrorInternalError(err) } - l.Info("account deleted") + l.Info("account delete process complete") return nil } @@ -189,7 +197,7 @@ func (p *Processor) deleteAccountFollows(ctx context.Context, account *gtsmodel. // To avoid checking if account is local over + over // inside the subsequent loops, just generate static // side effects function once now. - unfollowSideEffects = p.unfollowSideEffectsFunc(account) + unfollowSideEffects = p.unfollowSideEffectsFunc(account.IsLocal()) ) // Delete follows originating from this account. @@ -240,31 +248,56 @@ func (p *Processor) deleteAccountFollows(ctx context.Context, account *gtsmodel. } } - // Process accreted messages asynchronously. - p.state.Workers.EnqueueClientAPI(ctx, msgs...) + // Process accreted messages in serial. + for _, msg := range msgs { + if err := p.state.Workers.ProcessFromClientAPI(ctx, msg); err != nil { + log.Errorf( + ctx, + "error processing %s of %s during Delete of account %s: %v", + msg.APActivityType, msg.APObjectType, account.ID, err, + ) + } + } return nil } -func (p *Processor) unfollowSideEffectsFunc(deletedAccount *gtsmodel.Account) func(ctx context.Context, account *gtsmodel.Account, follow *gtsmodel.Follow) *messages.FromClientAPI { - if !deletedAccount.IsLocal() { +func (p *Processor) unfollowSideEffectsFunc(local bool) func( + ctx context.Context, + account *gtsmodel.Account, + follow *gtsmodel.Follow, +) *messages.FromClientAPI { + if !local { // Don't try to process side effects // for accounts that aren't local. - return func(ctx context.Context, account *gtsmodel.Account, follow *gtsmodel.Follow) *messages.FromClientAPI { - return nil // noop + return func( + _ context.Context, + _ *gtsmodel.Account, + _ *gtsmodel.Follow, + ) *messages.FromClientAPI { + // noop + return nil } } - return func(ctx context.Context, account *gtsmodel.Account, follow *gtsmodel.Follow) *messages.FromClientAPI { + return func( + ctx context.Context, + account *gtsmodel.Account, + follow *gtsmodel.Follow, + ) *messages.FromClientAPI { if follow.TargetAccount == nil { // TargetAccount seems to have gone; // race condition? db corruption? - log.WithContext(ctx).WithField("follow", follow).Warn("follow had no TargetAccount, likely race condition") + log. + WithContext(ctx). + WithField("follow", follow). + Warn("follow had no TargetAccount, likely race condition") return nil } if follow.TargetAccount.IsLocal() { - // No side effects for local unfollows. + // No side effects + // for local unfollows. return nil } @@ -288,8 +321,11 @@ func (p *Processor) deleteAccountBlocks(ctx context.Context, account *gtsmodel.A // deleteAccountStatuses iterates through all statuses owned by // the given account, passing each discovered status (and boosts -// thereof) to the processor workers for further async processing. -func (p *Processor) deleteAccountStatuses(ctx context.Context, account *gtsmodel.Account) error { +// thereof) to the processor workers for further processing. +func (p *Processor) deleteAccountStatuses( + ctx context.Context, + account *gtsmodel.Account, +) error { // We'll select statuses 50 at a time so we don't wreck the db, // and pass them through to the client api worker to handle. // @@ -331,42 +367,43 @@ statusLoop: maxID = statuses[len(statuses)-1].ID for _, status := range statuses { - status.Account = account // ensure account is set - - // Pass the status delete through the client api worker for processing. - msgs = append(msgs, messages.FromClientAPI{ - APObjectType: ap.ObjectNote, - APActivityType: ap.ActivityDelete, - GTSModel: status, - OriginAccount: account, - TargetAccount: account, - }) + // Ensure account is set. + status.Account = account // Look for any boosts of this status in DB. - boosts, err := p.state.DB.GetStatusBoosts(ctx, status.ID) + // + // We put these in the msgs slice first so + // that they're handled first, before the + // parent status that's being boosted. + // + // Use a barebones context and just select the + // origin account separately. The rest will be + // populated later anyway, and we don't want to + // stop now because we couldn't get something. + boosts, err := p.state.DB.GetStatusBoosts( + gtscontext.SetBarebones(ctx), + status.ID, + ) if err != nil && !errors.Is(err, db.ErrNoEntries) { - return gtserror.Newf("error fetching status reblogs for %s: %w", status.ID, err) + return gtserror.Newf("error fetching status boosts for %s: %w", status.ID, err) } + // Prepare to Undo each boost. for _, boost := range boosts { - if boost.Account == nil { - // Fetch the relevant account for this status boost. - boostAcc, err := p.state.DB.GetAccountByID(ctx, boost.AccountID) - if err != nil { - if errors.Is(err, db.ErrNoEntries) { - // We don't have an account for this boost - // for some reason, so just skip processing. - log.WithContext(ctx).WithField("boost", boost).Warnf("no account found with id %s for boost %s", boost.AccountID, boost.ID) - continue - } - return gtserror.Newf("error fetching boosted status account for %s: %w", boost.AccountID, err) - } + boost.Account, err = p.state.DB.GetAccountByID( + gtscontext.SetBarebones(ctx), + boost.AccountID, + ) - // Set account model - boost.Account = boostAcc + if err != nil { + log.Warnf( + ctx, + "db error getting owner %s of status boost %s: %v", + boost.AccountID, boost.ID, err, + ) + continue } - // Pass the boost delete through the client api worker for processing. msgs = append(msgs, messages.FromClientAPI{ APObjectType: ap.ActivityAnnounce, APActivityType: ap.ActivityUndo, @@ -375,11 +412,28 @@ statusLoop: TargetAccount: account, }) } + + // Now prepare to Delete status. + msgs = append(msgs, messages.FromClientAPI{ + APObjectType: ap.ObjectNote, + APActivityType: ap.ActivityDelete, + GTSModel: status, + OriginAccount: account, + TargetAccount: account, + }) } } - // Batch process all accreted messages. - p.state.Workers.EnqueueClientAPI(ctx, msgs...) + // Process accreted messages in serial. + for _, msg := range msgs { + if err := p.state.Workers.ProcessFromClientAPI(ctx, msg); err != nil { + log.Errorf( + ctx, + "error processing %s of %s during Delete of account %s: %v", + msg.APActivityType, msg.APObjectType, account.ID, err, + ) + } + } return nil } diff --git a/internal/processing/admin/admin_test.go b/internal/processing/admin/admin_test.go index 614735ee1..367924664 100644 --- a/internal/processing/admin/admin_test.go +++ b/internal/processing/admin/admin_test.go @@ -80,7 +80,7 @@ func (suite *AdminStandardTestSuite) SetupSuite() { func (suite *AdminStandardTestSuite) SetupTest() { suite.state.Caches.Init() - testrig.StartWorkers(&suite.state) + testrig.InitTestConfig() testrig.InitTestLog() @@ -115,7 +115,7 @@ func (suite *AdminStandardTestSuite) SetupTest() { suite.emailSender, ) - suite.state.Workers.ProcessFromClientAPI = suite.processor.Workers().ProcessFromClientAPI + testrig.StartWorkers(&suite.state, suite.processor.Workers()) suite.adminProcessor = suite.processor.Admin() testrig.StandardDBSetup(suite.db, nil) diff --git a/internal/processing/polls/poll_test.go b/internal/processing/polls/poll_test.go index 15a1938a8..59e9aeb60 100644 --- a/internal/processing/polls/poll_test.go +++ b/internal/processing/polls/poll_test.go @@ -50,7 +50,7 @@ func (suite *PollTestSuite) SetupTest() { testrig.InitTestConfig() testrig.InitTestLog() suite.state.Caches.Init() - testrig.StartWorkers(&suite.state) + testrig.StartNoopWorkers(&suite.state) testrig.NewTestDB(&suite.state) converter := typeutils.NewConverter(&suite.state) controller := testrig.NewTestTransportController(&suite.state, nil) diff --git a/internal/processing/processor_test.go b/internal/processing/processor_test.go index 2e0baae96..148ec42ed 100644 --- a/internal/processing/processor_test.go +++ b/internal/processing/processor_test.go @@ -95,7 +95,7 @@ func (suite *ProcessingStandardTestSuite) SetupSuite() { func (suite *ProcessingStandardTestSuite) SetupTest() { suite.state.Caches.Init() - testrig.StartWorkers(&suite.state) + testrig.StartNoopWorkers(&suite.state) testrig.InitTestConfig() testrig.InitTestLog() diff --git a/internal/processing/status/status_test.go b/internal/processing/status/status_test.go index dd9ad00f8..2a47a205e 100644 --- a/internal/processing/status/status_test.go +++ b/internal/processing/status/status_test.go @@ -74,7 +74,7 @@ func (suite *StatusStandardTestSuite) SetupSuite() { func (suite *StatusStandardTestSuite) SetupTest() { suite.state.Caches.Init() - testrig.StartWorkers(&suite.state) + testrig.StartNoopWorkers(&suite.state) testrig.InitTestConfig() testrig.InitTestLog() diff --git a/internal/processing/workers/workers_test.go b/internal/processing/workers/workers_test.go index c97e9eeb8..cbdd10f9a 100644 --- a/internal/processing/workers/workers_test.go +++ b/internal/processing/workers/workers_test.go @@ -97,7 +97,6 @@ func (suite *WorkersTestSuite) SetupSuite() { func (suite *WorkersTestSuite) SetupTest() { suite.state.Caches.Init() - testrig.StartWorkers(&suite.state) testrig.InitTestConfig() testrig.InitTestLog() @@ -126,6 +125,8 @@ func (suite *WorkersTestSuite) SetupTest() { suite.emailSender = testrig.NewEmailSender("../../../web/template/", nil) suite.processor = processing.NewProcessor(cleaner.New(&suite.state), suite.typeconverter, suite.federator, suite.oauthServer, suite.mediaManager, &suite.state, suite.emailSender) + testrig.StartWorkers(&suite.state, suite.processor.Workers()) + suite.state.Workers.EnqueueClientAPI = suite.processor.Workers().EnqueueClientAPI suite.state.Workers.EnqueueFediAPI = suite.processor.Workers().EnqueueFediAPI diff --git a/internal/timeline/timeline_test.go b/internal/timeline/timeline_test.go index f678a6292..d142ed1ff 100644 --- a/internal/timeline/timeline_test.go +++ b/internal/timeline/timeline_test.go @@ -48,7 +48,7 @@ func (suite *TimelineStandardTestSuite) SetupTest() { suite.state = new(state.State) suite.state.Caches.Init() - testrig.StartWorkers(suite.state) + testrig.StartNoopWorkers(suite.state) testrig.InitTestConfig() testrig.InitTestLog() diff --git a/internal/transport/transport_test.go b/internal/transport/transport_test.go index c58c2a055..b68f9695a 100644 --- a/internal/transport/transport_test.go +++ b/internal/transport/transport_test.go @@ -67,7 +67,7 @@ func (suite *TransportTestSuite) SetupSuite() { func (suite *TransportTestSuite) SetupTest() { suite.state.Caches.Init() - testrig.StartWorkers(&suite.state) + testrig.StartNoopWorkers(&suite.state) testrig.InitTestConfig() testrig.InitTestLog() diff --git a/internal/typeutils/converter_test.go b/internal/typeutils/converter_test.go index 74b213431..122b98d4c 100644 --- a/internal/typeutils/converter_test.go +++ b/internal/typeutils/converter_test.go @@ -515,7 +515,6 @@ func (suite *TypeUtilsTestSuite) TearDownTest() { // GetProcessor is a utility function that instantiates a processor. // Useful when a test in the test suite needs to change some state. func (suite *TypeUtilsTestSuite) GetProcessor() *processing.Processor { - testrig.StartWorkers(&suite.state) testrig.StartTimelines( &suite.state, visibility.NewFilter(&suite.state), @@ -527,5 +526,9 @@ func (suite *TypeUtilsTestSuite) GetProcessor() *processing.Processor { mediaManager := testrig.NewTestMediaManager(&suite.state) federator := testrig.NewTestFederator(&suite.state, transportController, mediaManager) emailSender := testrig.NewEmailSender("../../web/template/", nil) - return testrig.NewTestProcessor(&suite.state, federator, emailSender, mediaManager) + + processor := testrig.NewTestProcessor(&suite.state, federator, emailSender, mediaManager) + testrig.StartWorkers(&suite.state, processor.Workers()) + + return processor } diff --git a/testrig/mediahandler.go b/testrig/mediahandler.go index d35dca97e..430e87eb0 100644 --- a/testrig/mediahandler.go +++ b/testrig/mediahandler.go @@ -24,6 +24,6 @@ import ( // NewTestMediaManager returns a media handler with the default test config, and the given db and storage. func NewTestMediaManager(state *state.State) *media.Manager { - StartWorkers(state) // ensure started + StartNoopWorkers(state) // ensure started return media.NewManager(state) } diff --git a/testrig/processor.go b/testrig/processor.go index 137934c5e..e8a871422 100644 --- a/testrig/processor.go +++ b/testrig/processor.go @@ -27,10 +27,14 @@ import ( "github.com/superseriousbusiness/gotosocial/internal/typeutils" ) -// NewTestProcessor returns a Processor suitable for testing purposes +// NewTestProcessor returns a Processor suitable for testing purposes. +// The passed in state will have its worker functions set appropriately, +// but the state will not be initialized. func NewTestProcessor(state *state.State, federator *federation.Federator, emailSender email.Sender, mediaManager *media.Manager) *processing.Processor { p := processing.NewProcessor(cleaner.New(state), typeutils.NewConverter(state), federator, NewTestOauthServer(state.DB), mediaManager, state, emailSender) state.Workers.EnqueueClientAPI = p.Workers().EnqueueClientAPI state.Workers.EnqueueFediAPI = p.Workers().EnqueueFediAPI + state.Workers.ProcessFromClientAPI = p.Workers().ProcessFromClientAPI + state.Workers.ProcessFromFediAPI = p.Workers().ProcessFromFediAPI return p } diff --git a/testrig/util.go b/testrig/util.go index c2360dc9e..9e5987065 100644 --- a/testrig/util.go +++ b/testrig/util.go @@ -29,13 +29,16 @@ import ( "github.com/superseriousbusiness/gotosocial/internal/messages" tlprocessor "github.com/superseriousbusiness/gotosocial/internal/processing/timeline" + wprocessor "github.com/superseriousbusiness/gotosocial/internal/processing/workers" "github.com/superseriousbusiness/gotosocial/internal/state" "github.com/superseriousbusiness/gotosocial/internal/timeline" "github.com/superseriousbusiness/gotosocial/internal/typeutils" "github.com/superseriousbusiness/gotosocial/internal/visibility" ) -func StartWorkers(state *state.State) { +// Starts workers on the provided state using noop processing functions. +// Useful when you *don't* want to trigger side effects in a test. +func StartNoopWorkers(state *state.State) { state.Workers.EnqueueClientAPI = func(context.Context, ...messages.FromClientAPI) {} state.Workers.EnqueueFediAPI = func(context.Context, ...messages.FromFediAPI) {} state.Workers.ProcessFromClientAPI = func(context.Context, messages.FromClientAPI) error { return nil } @@ -47,6 +50,20 @@ func StartWorkers(state *state.State) { _ = state.Workers.Media.Start(1, 10) } +// Starts workers on the provided state using processing functions from the given +// workers processor. Useful when you *do* want to trigger side effects in a test. +func StartWorkers(state *state.State, wProcessor *wprocessor.Processor) { + state.Workers.EnqueueClientAPI = wProcessor.EnqueueClientAPI + state.Workers.EnqueueFediAPI = wProcessor.EnqueueFediAPI + state.Workers.ProcessFromClientAPI = wProcessor.ProcessFromClientAPI + state.Workers.ProcessFromFediAPI = wProcessor.ProcessFromFediAPI + + _ = state.Workers.Scheduler.Start() + _ = state.Workers.ClientAPI.Start(1, 10) + _ = state.Workers.Federator.Start(1, 10) + _ = state.Workers.Media.Start(1, 10) +} + func StopWorkers(state *state.State) { _ = state.Workers.Scheduler.Stop() _ = state.Workers.ClientAPI.Stop()