From 3b7faac604000297b74baf8f922c79c6b387217d Mon Sep 17 00:00:00 2001 From: kim <89579420+NyaaaWhatsUpDoc@users.noreply.github.com> Date: Thu, 6 Jun 2024 08:50:14 +0000 Subject: [PATCH] [bugfix] concurrent map writes in dereferencer media processing maps (#2964) * removes the avatar / header deref maps as we now have per-uri status / account locks, adds retries on data-races, adds separate emoji map mutex * work with a copy of account / status for each retry loop * revert to old data race behaviour, it gets too complicated otherwise --------- Co-authored-by: tobi --- internal/federation/dereferencing/account.go | 97 +++++-------------- .../federation/dereferencing/dereferencer.go | 21 ++-- internal/federation/dereferencing/emoji.go | 69 +++++++------ internal/federation/dereferencing/status.go | 4 +- 4 files changed, 78 insertions(+), 113 deletions(-) diff --git a/internal/federation/dereferencing/account.go b/internal/federation/dereferencing/account.go index 94df9538a..bd97b91ed 100644 --- a/internal/federation/dereferencing/account.go +++ b/internal/federation/dereferencing/account.go @@ -496,7 +496,7 @@ func (d *Dereferencer) enrichAccount( account.Username, account.Domain, err, ) - case err == nil && account.Domain != accDomain: + case account.Domain != accDomain: // After webfinger, we now have correct account domain from which we can do a final DB check. alreadyAcc, err := d.state.DB.GetAccountByUsernameDomain(ctx, account.Username, accDomain) if err != nil && !errors.Is(err, db.ErrNoEntries) { @@ -518,7 +518,7 @@ func (d *Dereferencer) enrichAccount( // or the stub account we were passed. fallthrough - case err == nil: + default: // Update account with latest info. account.URI = accURI.String() account.Domain = accDomain @@ -531,19 +531,14 @@ func (d *Dereferencer) enrichAccount( // must parse from account. uri, err = url.Parse(account.URI) if err != nil { - return nil, nil, gtserror.Newf( - "invalid uri %q: %w", - account.URI, gtserror.SetUnretrievable(err), - ) + err := gtserror.Newf("invalid uri %q: %w", account.URI, err) + return nil, nil, gtserror.SetUnretrievable(err) } // Check URI scheme ahead of time for more useful errs. if uri.Scheme != "http" && uri.Scheme != "https" { - err := errors.New("account URI scheme must be http or https") - return nil, nil, gtserror.Newf( - "invalid uri %q: %w", - account.URI, gtserror.SetUnretrievable(err), - ) + err := gtserror.Newf("invalid uri %q: scheme must be http(s)", account.URI) + return nil, nil, gtserror.SetUnretrievable(err) } } @@ -634,7 +629,7 @@ func (d *Dereferencer) enrichAccount( if err != nil { // ASRepresentationToAccount will set Malformed on the // returned error, so we don't need to do it here. - err = gtserror.Newf("error converting accountable to gts model for account %s: %w", uri, err) + err = gtserror.Newf("error converting %s to gts model: %w", uri, err) return nil, nil, err } @@ -798,39 +793,16 @@ func (d *Dereferencer) fetchRemoteAccountAvatar(ctx context.Context, tsport tran return gtserror.Newf("error parsing url %s: %w", latestAcc.AvatarRemoteURL, err) } - // Acquire lock for derefs map. - unlock := d.state.FedLocks.Lock(latestAcc.AvatarRemoteURL) - unlock = util.DoOnce(unlock) - defer unlock() - - // Look for an existing dereference in progress. - processing, ok := d.derefAvatars[latestAcc.AvatarRemoteURL] - - if !ok { - // Set the media data function to dereference avatar from URI. - data := func(ctx context.Context) (io.ReadCloser, int64, error) { - return tsport.DereferenceMedia(ctx, avatarURI) - } - - // Create new media processing request from the media manager instance. - processing = d.mediaManager.PreProcessMedia(data, latestAcc.ID, &media.AdditionalMediaInfo{ - Avatar: func() *bool { v := true; return &v }(), - RemoteURL: &latestAcc.AvatarRemoteURL, - }) - - // Store media in map to mark as processing. - d.derefAvatars[latestAcc.AvatarRemoteURL] = processing - - defer func() { - // On exit safely remove media from map. - unlock := d.state.FedLocks.Lock(latestAcc.AvatarRemoteURL) - delete(d.derefAvatars, latestAcc.AvatarRemoteURL) - unlock() - }() + // Set the media data function to dereference avatar from URI. + data := func(ctx context.Context) (io.ReadCloser, int64, error) { + return tsport.DereferenceMedia(ctx, avatarURI) } - // Unlock map. - unlock() + // Create new media processing request from the media manager instance. + processing := d.mediaManager.PreProcessMedia(data, latestAcc.ID, &media.AdditionalMediaInfo{ + Avatar: func() *bool { v := true; return &v }(), + RemoteURL: &latestAcc.AvatarRemoteURL, + }) // Start media attachment loading (blocking call). if _, err := processing.LoadAttachment(ctx); err != nil { @@ -884,39 +856,16 @@ func (d *Dereferencer) fetchRemoteAccountHeader(ctx context.Context, tsport tran return gtserror.Newf("error parsing url %s: %w", latestAcc.HeaderRemoteURL, err) } - // Acquire lock for derefs map. - unlock := d.state.FedLocks.Lock(latestAcc.HeaderRemoteURL) - unlock = util.DoOnce(unlock) - defer unlock() - - // Look for an existing dereference in progress. - processing, ok := d.derefHeaders[latestAcc.HeaderRemoteURL] - - if !ok { - // Set the media data function to dereference avatar from URI. - data := func(ctx context.Context) (io.ReadCloser, int64, error) { - return tsport.DereferenceMedia(ctx, headerURI) - } - - // Create new media processing request from the media manager instance. - processing = d.mediaManager.PreProcessMedia(data, latestAcc.ID, &media.AdditionalMediaInfo{ - Header: func() *bool { v := true; return &v }(), - RemoteURL: &latestAcc.HeaderRemoteURL, - }) - - // Store media in map to mark as processing. - d.derefHeaders[latestAcc.HeaderRemoteURL] = processing - - defer func() { - // On exit safely remove media from map. - unlock := d.state.FedLocks.Lock(latestAcc.HeaderRemoteURL) - delete(d.derefHeaders, latestAcc.HeaderRemoteURL) - unlock() - }() + // Set the media data function to dereference avatar from URI. + data := func(ctx context.Context) (io.ReadCloser, int64, error) { + return tsport.DereferenceMedia(ctx, headerURI) } - // Unlock map. - unlock() + // Create new media processing request from the media manager instance. + processing := d.mediaManager.PreProcessMedia(data, latestAcc.ID, &media.AdditionalMediaInfo{ + Header: func() *bool { v := true; return &v }(), + RemoteURL: &latestAcc.HeaderRemoteURL, + }) // Start media attachment loading (blocking call). if _, err := processing.LoadAttachment(ctx); err != nil { diff --git a/internal/federation/dereferencing/dereferencer.go b/internal/federation/dereferencing/dereferencer.go index 3fa199345..f7f4d975e 100644 --- a/internal/federation/dereferencing/dereferencer.go +++ b/internal/federation/dereferencing/dereferencer.go @@ -85,11 +85,22 @@ type Dereferencer struct { mediaManager *media.Manager visibility *visibility.Filter - // all protected by State{}.FedLocks. - derefAvatars map[string]*media.ProcessingMedia - derefHeaders map[string]*media.ProcessingMedia - derefEmojis map[string]*media.ProcessingEmoji + // in-progress dereferencing emoji. we already perform + // locks per-status and per-account so we don't need + // processing maps for other media which won't often + // end up being repeated. worst case we run into an + // db.ErrAlreadyExists error which then gets handled + // appropriately by enrich{Account,Status}Safely(). + derefEmojis map[string]*media.ProcessingEmoji + derefEmojisMu sync.Mutex + // handshakes marks current in-progress handshakes + // occurring, useful to prevent a deadlock between + // gotosocial instances attempting to dereference + // accounts for the first time. when a handshake is + // currently ongoing we know not to block waiting + // on certain data and instead return an in-progress + // form of the data as we currently see it. handshakes map[string][]*url.URL handshakesMu sync.Mutex } @@ -108,8 +119,6 @@ func NewDereferencer( transportController: transportController, mediaManager: mediaManager, visibility: visFilter, - derefAvatars: make(map[string]*media.ProcessingMedia), - derefHeaders: make(map[string]*media.ProcessingMedia), derefEmojis: make(map[string]*media.ProcessingEmoji), handshakes: make(map[string][]*url.URL), } diff --git a/internal/federation/dereferencing/emoji.go b/internal/federation/dereferencing/emoji.go index 009191780..e81737d04 100644 --- a/internal/federation/dereferencing/emoji.go +++ b/internal/federation/dereferencing/emoji.go @@ -24,6 +24,7 @@ import ( "net/url" "github.com/superseriousbusiness/gotosocial/internal/db" + "github.com/superseriousbusiness/gotosocial/internal/gtserror" "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" "github.com/superseriousbusiness/gotosocial/internal/id" "github.com/superseriousbusiness/gotosocial/internal/log" @@ -31,11 +32,8 @@ import ( "github.com/superseriousbusiness/gotosocial/internal/util" ) -func (d *Dereferencer) GetRemoteEmoji(ctx context.Context, requestingUsername string, remoteURL string, shortcode string, domain string, id string, emojiURI string, ai *media.AdditionalEmojiInfo, refresh bool) (*media.ProcessingEmoji, error) { - var ( - shortcodeDomain = shortcode + "@" + domain - processingEmoji *media.ProcessingEmoji - ) +func (d *Dereferencer) GetRemoteEmoji(ctx context.Context, requestUser string, remoteURL string, shortcode string, domain string, id string, emojiURI string, ai *media.AdditionalEmojiInfo, refresh bool) (*media.ProcessingEmoji, error) { + var shortcodeDomain = shortcode + "@" + domain // Ensure we have been passed a valid URL. derefURI, err := url.Parse(remoteURL) @@ -43,52 +41,61 @@ func (d *Dereferencer) GetRemoteEmoji(ctx context.Context, requestingUsername st return nil, fmt.Errorf("GetRemoteEmoji: error parsing url for emoji %s: %s", shortcodeDomain, err) } - // Acquire lock for derefs map. - unlock := d.state.FedLocks.Lock(remoteURL) + // Acquire derefs lock. + d.derefEmojisMu.Lock() + + // Ensure unlock only done once. + unlock := d.derefEmojisMu.Unlock unlock = util.DoOnce(unlock) defer unlock() - // first check if we're already processing this emoji - if alreadyProcessing, ok := d.derefEmojis[shortcodeDomain]; ok { - // we're already on it, no worries - processingEmoji = alreadyProcessing - } else { - // not processing it yet, let's start - t, err := d.transportController.NewTransportForUsername(ctx, requestingUsername) + // Look for an existing dereference in progress. + processing, ok := d.derefEmojis[shortcodeDomain] + + if !ok { + // Fetch a transport for current request user in order to perform request. + tsport, err := d.transportController.NewTransportForUsername(ctx, requestUser) if err != nil { - return nil, fmt.Errorf("GetRemoteEmoji: error creating transport to fetch emoji %s: %s", shortcodeDomain, err) + return nil, gtserror.Newf("couldn't create transport: %w", err) } - dataFunc := func(innerCtx context.Context) (io.ReadCloser, int64, error) { - return t.DereferenceMedia(innerCtx, derefURI) + // Set the media data function to dereference emoji from URI. + data := func(ctx context.Context) (io.ReadCloser, int64, error) { + return tsport.DereferenceMedia(ctx, derefURI) } - newProcessing, err := d.mediaManager.PreProcessEmoji(ctx, dataFunc, shortcode, id, emojiURI, ai, refresh) + // Create new emoji processing request from the media manager. + processing, err = d.mediaManager.PreProcessEmoji(ctx, data, + shortcode, + id, + emojiURI, + ai, + refresh, + ) if err != nil { - return nil, fmt.Errorf("GetRemoteEmoji: error processing emoji %s: %s", shortcodeDomain, err) + return nil, gtserror.Newf("error preprocessing emoji %s: %s", shortcodeDomain, err) } - // store it in our map to indicate it's in process - d.derefEmojis[shortcodeDomain] = newProcessing - processingEmoji = newProcessing + // Store media in map to mark as processing. + d.derefEmojis[shortcodeDomain] = processing + + defer func() { + // On exit safely remove emoji from map. + d.derefEmojisMu.Lock() + delete(d.derefEmojis, shortcodeDomain) + d.derefEmojisMu.Unlock() + }() } // Unlock map. unlock() - defer func() { - // On exit safely remove emoji from map. - unlock := d.state.FedLocks.Lock(remoteURL) - delete(d.derefEmojis, shortcodeDomain) - unlock() - }() - // Start emoji attachment loading (blocking call). - if _, err := processingEmoji.LoadEmoji(ctx); err != nil { + if _, err := processing.LoadEmoji(ctx); err != nil { return nil, err } - return processingEmoji, nil + return processing, nil } func (d *Dereferencer) populateEmojis(ctx context.Context, rawEmojis []*gtsmodel.Emoji, requestingUsername string) ([]*gtsmodel.Emoji, error) { diff --git a/internal/federation/dereferencing/status.go b/internal/federation/dereferencing/status.go index bd50a08fd..69627adc2 100644 --- a/internal/federation/dereferencing/status.go +++ b/internal/federation/dereferencing/status.go @@ -285,7 +285,7 @@ func (d *Dereferencer) enrichStatusSafely( requestUser string, uri *url.URL, status *gtsmodel.Status, - apubStatus ap.Statusable, + statusable ap.Statusable, ) (*gtsmodel.Status, ap.Statusable, bool, error) { uriStr := status.URI @@ -313,7 +313,7 @@ func (d *Dereferencer) enrichStatusSafely( requestUser, uri, status, - apubStatus, + statusable, ) if gtserror.StatusCode(err) >= 400 {