[bugfix] concurrent map writes in dereferencer media processing maps (#2964)

* removes the avatar / header deref maps as we now have per-uri status / account locks, adds retries on data-races, adds separate emoji map mutex

* work with a copy of account / status for each retry loop

* revert to old data race behaviour, it gets too complicated otherwise

---------

Co-authored-by: tobi <tobi.smethurst@protonmail.com>
This commit is contained in:
kim 2024-06-06 08:50:14 +00:00 committed by GitHub
parent 9caf29bed2
commit 3b7faac604
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 78 additions and 113 deletions

View file

@ -496,7 +496,7 @@ func (d *Dereferencer) enrichAccount(
account.Username, account.Domain, err, account.Username, account.Domain, err,
) )
case err == nil && account.Domain != accDomain: case account.Domain != accDomain:
// After webfinger, we now have correct account domain from which we can do a final DB check. // After webfinger, we now have correct account domain from which we can do a final DB check.
alreadyAcc, err := d.state.DB.GetAccountByUsernameDomain(ctx, account.Username, accDomain) alreadyAcc, err := d.state.DB.GetAccountByUsernameDomain(ctx, account.Username, accDomain)
if err != nil && !errors.Is(err, db.ErrNoEntries) { if err != nil && !errors.Is(err, db.ErrNoEntries) {
@ -518,7 +518,7 @@ func (d *Dereferencer) enrichAccount(
// or the stub account we were passed. // or the stub account we were passed.
fallthrough fallthrough
case err == nil: default:
// Update account with latest info. // Update account with latest info.
account.URI = accURI.String() account.URI = accURI.String()
account.Domain = accDomain account.Domain = accDomain
@ -531,19 +531,14 @@ func (d *Dereferencer) enrichAccount(
// must parse from account. // must parse from account.
uri, err = url.Parse(account.URI) uri, err = url.Parse(account.URI)
if err != nil { if err != nil {
return nil, nil, gtserror.Newf( err := gtserror.Newf("invalid uri %q: %w", account.URI, err)
"invalid uri %q: %w", return nil, nil, gtserror.SetUnretrievable(err)
account.URI, gtserror.SetUnretrievable(err),
)
} }
// Check URI scheme ahead of time for more useful errs. // Check URI scheme ahead of time for more useful errs.
if uri.Scheme != "http" && uri.Scheme != "https" { if uri.Scheme != "http" && uri.Scheme != "https" {
err := errors.New("account URI scheme must be http or https") err := gtserror.Newf("invalid uri %q: scheme must be http(s)", account.URI)
return nil, nil, gtserror.Newf( return nil, nil, gtserror.SetUnretrievable(err)
"invalid uri %q: %w",
account.URI, gtserror.SetUnretrievable(err),
)
} }
} }
@ -634,7 +629,7 @@ func (d *Dereferencer) enrichAccount(
if err != nil { if err != nil {
// ASRepresentationToAccount will set Malformed on the // ASRepresentationToAccount will set Malformed on the
// returned error, so we don't need to do it here. // returned error, so we don't need to do it here.
err = gtserror.Newf("error converting accountable to gts model for account %s: %w", uri, err) err = gtserror.Newf("error converting %s to gts model: %w", uri, err)
return nil, nil, err return nil, nil, err
} }
@ -798,39 +793,16 @@ func (d *Dereferencer) fetchRemoteAccountAvatar(ctx context.Context, tsport tran
return gtserror.Newf("error parsing url %s: %w", latestAcc.AvatarRemoteURL, err) return gtserror.Newf("error parsing url %s: %w", latestAcc.AvatarRemoteURL, err)
} }
// Acquire lock for derefs map. // Set the media data function to dereference avatar from URI.
unlock := d.state.FedLocks.Lock(latestAcc.AvatarRemoteURL) data := func(ctx context.Context) (io.ReadCloser, int64, error) {
unlock = util.DoOnce(unlock) return tsport.DereferenceMedia(ctx, avatarURI)
defer unlock()
// Look for an existing dereference in progress.
processing, ok := d.derefAvatars[latestAcc.AvatarRemoteURL]
if !ok {
// Set the media data function to dereference avatar from URI.
data := func(ctx context.Context) (io.ReadCloser, int64, error) {
return tsport.DereferenceMedia(ctx, avatarURI)
}
// Create new media processing request from the media manager instance.
processing = d.mediaManager.PreProcessMedia(data, latestAcc.ID, &media.AdditionalMediaInfo{
Avatar: func() *bool { v := true; return &v }(),
RemoteURL: &latestAcc.AvatarRemoteURL,
})
// Store media in map to mark as processing.
d.derefAvatars[latestAcc.AvatarRemoteURL] = processing
defer func() {
// On exit safely remove media from map.
unlock := d.state.FedLocks.Lock(latestAcc.AvatarRemoteURL)
delete(d.derefAvatars, latestAcc.AvatarRemoteURL)
unlock()
}()
} }
// Unlock map. // Create new media processing request from the media manager instance.
unlock() processing := d.mediaManager.PreProcessMedia(data, latestAcc.ID, &media.AdditionalMediaInfo{
Avatar: func() *bool { v := true; return &v }(),
RemoteURL: &latestAcc.AvatarRemoteURL,
})
// Start media attachment loading (blocking call). // Start media attachment loading (blocking call).
if _, err := processing.LoadAttachment(ctx); err != nil { if _, err := processing.LoadAttachment(ctx); err != nil {
@ -884,39 +856,16 @@ func (d *Dereferencer) fetchRemoteAccountHeader(ctx context.Context, tsport tran
return gtserror.Newf("error parsing url %s: %w", latestAcc.HeaderRemoteURL, err) return gtserror.Newf("error parsing url %s: %w", latestAcc.HeaderRemoteURL, err)
} }
// Acquire lock for derefs map. // Set the media data function to dereference avatar from URI.
unlock := d.state.FedLocks.Lock(latestAcc.HeaderRemoteURL) data := func(ctx context.Context) (io.ReadCloser, int64, error) {
unlock = util.DoOnce(unlock) return tsport.DereferenceMedia(ctx, headerURI)
defer unlock()
// Look for an existing dereference in progress.
processing, ok := d.derefHeaders[latestAcc.HeaderRemoteURL]
if !ok {
// Set the media data function to dereference avatar from URI.
data := func(ctx context.Context) (io.ReadCloser, int64, error) {
return tsport.DereferenceMedia(ctx, headerURI)
}
// Create new media processing request from the media manager instance.
processing = d.mediaManager.PreProcessMedia(data, latestAcc.ID, &media.AdditionalMediaInfo{
Header: func() *bool { v := true; return &v }(),
RemoteURL: &latestAcc.HeaderRemoteURL,
})
// Store media in map to mark as processing.
d.derefHeaders[latestAcc.HeaderRemoteURL] = processing
defer func() {
// On exit safely remove media from map.
unlock := d.state.FedLocks.Lock(latestAcc.HeaderRemoteURL)
delete(d.derefHeaders, latestAcc.HeaderRemoteURL)
unlock()
}()
} }
// Unlock map. // Create new media processing request from the media manager instance.
unlock() processing := d.mediaManager.PreProcessMedia(data, latestAcc.ID, &media.AdditionalMediaInfo{
Header: func() *bool { v := true; return &v }(),
RemoteURL: &latestAcc.HeaderRemoteURL,
})
// Start media attachment loading (blocking call). // Start media attachment loading (blocking call).
if _, err := processing.LoadAttachment(ctx); err != nil { if _, err := processing.LoadAttachment(ctx); err != nil {

View file

@ -85,11 +85,22 @@ type Dereferencer struct {
mediaManager *media.Manager mediaManager *media.Manager
visibility *visibility.Filter visibility *visibility.Filter
// all protected by State{}.FedLocks. // in-progress dereferencing emoji. we already perform
derefAvatars map[string]*media.ProcessingMedia // locks per-status and per-account so we don't need
derefHeaders map[string]*media.ProcessingMedia // processing maps for other media which won't often
derefEmojis map[string]*media.ProcessingEmoji // end up being repeated. worst case we run into an
// db.ErrAlreadyExists error which then gets handled
// appropriately by enrich{Account,Status}Safely().
derefEmojis map[string]*media.ProcessingEmoji
derefEmojisMu sync.Mutex
// handshakes marks current in-progress handshakes
// occurring, useful to prevent a deadlock between
// gotosocial instances attempting to dereference
// accounts for the first time. when a handshake is
// currently ongoing we know not to block waiting
// on certain data and instead return an in-progress
// form of the data as we currently see it.
handshakes map[string][]*url.URL handshakes map[string][]*url.URL
handshakesMu sync.Mutex handshakesMu sync.Mutex
} }
@ -108,8 +119,6 @@ func NewDereferencer(
transportController: transportController, transportController: transportController,
mediaManager: mediaManager, mediaManager: mediaManager,
visibility: visFilter, visibility: visFilter,
derefAvatars: make(map[string]*media.ProcessingMedia),
derefHeaders: make(map[string]*media.ProcessingMedia),
derefEmojis: make(map[string]*media.ProcessingEmoji), derefEmojis: make(map[string]*media.ProcessingEmoji),
handshakes: make(map[string][]*url.URL), handshakes: make(map[string][]*url.URL),
} }

View file

@ -24,6 +24,7 @@ import (
"net/url" "net/url"
"github.com/superseriousbusiness/gotosocial/internal/db" "github.com/superseriousbusiness/gotosocial/internal/db"
"github.com/superseriousbusiness/gotosocial/internal/gtserror"
"github.com/superseriousbusiness/gotosocial/internal/gtsmodel" "github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
"github.com/superseriousbusiness/gotosocial/internal/id" "github.com/superseriousbusiness/gotosocial/internal/id"
"github.com/superseriousbusiness/gotosocial/internal/log" "github.com/superseriousbusiness/gotosocial/internal/log"
@ -31,11 +32,8 @@ import (
"github.com/superseriousbusiness/gotosocial/internal/util" "github.com/superseriousbusiness/gotosocial/internal/util"
) )
func (d *Dereferencer) GetRemoteEmoji(ctx context.Context, requestingUsername string, remoteURL string, shortcode string, domain string, id string, emojiURI string, ai *media.AdditionalEmojiInfo, refresh bool) (*media.ProcessingEmoji, error) { func (d *Dereferencer) GetRemoteEmoji(ctx context.Context, requestUser string, remoteURL string, shortcode string, domain string, id string, emojiURI string, ai *media.AdditionalEmojiInfo, refresh bool) (*media.ProcessingEmoji, error) {
var ( var shortcodeDomain = shortcode + "@" + domain
shortcodeDomain = shortcode + "@" + domain
processingEmoji *media.ProcessingEmoji
)
// Ensure we have been passed a valid URL. // Ensure we have been passed a valid URL.
derefURI, err := url.Parse(remoteURL) derefURI, err := url.Parse(remoteURL)
@ -43,52 +41,61 @@ func (d *Dereferencer) GetRemoteEmoji(ctx context.Context, requestingUsername st
return nil, fmt.Errorf("GetRemoteEmoji: error parsing url for emoji %s: %s", shortcodeDomain, err) return nil, fmt.Errorf("GetRemoteEmoji: error parsing url for emoji %s: %s", shortcodeDomain, err)
} }
// Acquire lock for derefs map. // Acquire derefs lock.
unlock := d.state.FedLocks.Lock(remoteURL) d.derefEmojisMu.Lock()
// Ensure unlock only done once.
unlock := d.derefEmojisMu.Unlock
unlock = util.DoOnce(unlock) unlock = util.DoOnce(unlock)
defer unlock() defer unlock()
// first check if we're already processing this emoji // Look for an existing dereference in progress.
if alreadyProcessing, ok := d.derefEmojis[shortcodeDomain]; ok { processing, ok := d.derefEmojis[shortcodeDomain]
// we're already on it, no worries
processingEmoji = alreadyProcessing if !ok {
} else { // Fetch a transport for current request user in order to perform request.
// not processing it yet, let's start tsport, err := d.transportController.NewTransportForUsername(ctx, requestUser)
t, err := d.transportController.NewTransportForUsername(ctx, requestingUsername)
if err != nil { if err != nil {
return nil, fmt.Errorf("GetRemoteEmoji: error creating transport to fetch emoji %s: %s", shortcodeDomain, err) return nil, gtserror.Newf("couldn't create transport: %w", err)
} }
dataFunc := func(innerCtx context.Context) (io.ReadCloser, int64, error) { // Set the media data function to dereference emoji from URI.
return t.DereferenceMedia(innerCtx, derefURI) data := func(ctx context.Context) (io.ReadCloser, int64, error) {
return tsport.DereferenceMedia(ctx, derefURI)
} }
newProcessing, err := d.mediaManager.PreProcessEmoji(ctx, dataFunc, shortcode, id, emojiURI, ai, refresh) // Create new emoji processing request from the media manager.
processing, err = d.mediaManager.PreProcessEmoji(ctx, data,
shortcode,
id,
emojiURI,
ai,
refresh,
)
if err != nil { if err != nil {
return nil, fmt.Errorf("GetRemoteEmoji: error processing emoji %s: %s", shortcodeDomain, err) return nil, gtserror.Newf("error preprocessing emoji %s: %s", shortcodeDomain, err)
} }
// store it in our map to indicate it's in process // Store media in map to mark as processing.
d.derefEmojis[shortcodeDomain] = newProcessing d.derefEmojis[shortcodeDomain] = processing
processingEmoji = newProcessing
defer func() {
// On exit safely remove emoji from map.
d.derefEmojisMu.Lock()
delete(d.derefEmojis, shortcodeDomain)
d.derefEmojisMu.Unlock()
}()
} }
// Unlock map. // Unlock map.
unlock() unlock()
defer func() {
// On exit safely remove emoji from map.
unlock := d.state.FedLocks.Lock(remoteURL)
delete(d.derefEmojis, shortcodeDomain)
unlock()
}()
// Start emoji attachment loading (blocking call). // Start emoji attachment loading (blocking call).
if _, err := processingEmoji.LoadEmoji(ctx); err != nil { if _, err := processing.LoadEmoji(ctx); err != nil {
return nil, err return nil, err
} }
return processingEmoji, nil return processing, nil
} }
func (d *Dereferencer) populateEmojis(ctx context.Context, rawEmojis []*gtsmodel.Emoji, requestingUsername string) ([]*gtsmodel.Emoji, error) { func (d *Dereferencer) populateEmojis(ctx context.Context, rawEmojis []*gtsmodel.Emoji, requestingUsername string) ([]*gtsmodel.Emoji, error) {

View file

@ -285,7 +285,7 @@ func (d *Dereferencer) enrichStatusSafely(
requestUser string, requestUser string,
uri *url.URL, uri *url.URL,
status *gtsmodel.Status, status *gtsmodel.Status,
apubStatus ap.Statusable, statusable ap.Statusable,
) (*gtsmodel.Status, ap.Statusable, bool, error) { ) (*gtsmodel.Status, ap.Statusable, bool, error) {
uriStr := status.URI uriStr := status.URI
@ -313,7 +313,7 @@ func (d *Dereferencer) enrichStatusSafely(
requestUser, requestUser,
uri, uri,
status, status,
apubStatus, statusable,
) )
if gtserror.StatusCode(err) >= 400 { if gtserror.StatusCode(err) >= 400 {