From 8c0141d103cb70fdbe74f1d5a936860707da973f Mon Sep 17 00:00:00 2001 From: tsmethurst Date: Tue, 8 Feb 2022 13:38:44 +0100 Subject: [PATCH] store and retrieve processState atomically --- internal/media/processingemoji.go | 23 +++++++++---------- internal/media/processingmedia.go | 37 +++++++++++++++++-------------- internal/media/types.go | 2 +- 3 files changed, 32 insertions(+), 30 deletions(-) diff --git a/internal/media/processingemoji.go b/internal/media/processingemoji.go index 292712427..741854b9b 100644 --- a/internal/media/processingemoji.go +++ b/internal/media/processingemoji.go @@ -25,6 +25,7 @@ import ( "io" "strings" "sync" + "sync/atomic" "time" "codeberg.org/gruf/go-store/kv" @@ -53,9 +54,7 @@ type ProcessingEmoji struct { /* below fields represent the processing state of the static of the emoji */ - - staticState processState - fullSizeState processState + staticState int32 /* below pointers to database and storage are maintained so that @@ -104,17 +103,18 @@ func (p *ProcessingEmoji) LoadEmoji(ctx context.Context) (*gtsmodel.Emoji, error // Finished returns true if processing has finished for both the thumbnail // and full fized version of this piece of media. func (p *ProcessingEmoji) Finished() bool { - return p.staticState == complete && p.fullSizeState == complete + return atomic.LoadInt32(&p.staticState) == int32(complete) } func (p *ProcessingEmoji) loadStatic(ctx context.Context) error { - switch p.staticState { + staticState := atomic.LoadInt32(&p.staticState) + switch processState(staticState) { case received: // stream the original file out of storage... stored, err := p.storage.GetStream(p.emoji.ImagePath) if err != nil { p.err = fmt.Errorf("loadStatic: error fetching file from storage: %s", err) - p.staticState = errored + atomic.StoreInt32(&p.staticState, int32(errored)) return p.err } @@ -122,27 +122,27 @@ func (p *ProcessingEmoji) loadStatic(ctx context.Context) error { static, err := deriveStaticEmoji(stored, p.emoji.ImageContentType) if err != nil { p.err = fmt.Errorf("loadStatic: error deriving static: %s", err) - p.staticState = errored + atomic.StoreInt32(&p.staticState, int32(errored)) return p.err } if err := stored.Close(); err != nil { p.err = fmt.Errorf("loadStatic: error closing stored full size: %s", err) - p.staticState = errored + atomic.StoreInt32(&p.staticState, int32(errored)) return p.err } // put the static in storage if err := p.storage.Put(p.emoji.ImageStaticPath, static.small); err != nil { p.err = fmt.Errorf("loadStatic: error storing static: %s", err) - p.staticState = errored + atomic.StoreInt32(&p.staticState, int32(errored)) return p.err } p.emoji.ImageStaticFileSize = len(static.small) // we're done processing the static version of the emoji! - p.staticState = complete + atomic.StoreInt32(&p.staticState, int32(complete)) fallthrough case complete: return nil @@ -281,8 +281,7 @@ func (m *manager) preProcessEmoji(ctx context.Context, data DataFunc, shortcode instanceAccountID: instanceAccount.ID, emoji: emoji, data: data, - staticState: received, - fullSizeState: received, + staticState: int32(received), database: m.db, storage: m.storage, } diff --git a/internal/media/processingmedia.go b/internal/media/processingmedia.go index 0bbe35aee..0f47ee4e6 100644 --- a/internal/media/processingmedia.go +++ b/internal/media/processingmedia.go @@ -25,6 +25,7 @@ import ( "io" "strings" "sync" + "sync/atomic" "time" "codeberg.org/gruf/go-store/kv" @@ -49,8 +50,8 @@ type ProcessingMedia struct { data DataFunc read bool // bool indicating that data function has been triggered already - thumbstate processState // the processing state of the media thumbnail - fullSizeState processState // the processing state of the full-sized media + thumbState int32 // the processing state of the media thumbnail + fullSizeState int32 // the processing state of the full-sized media /* below pointers to database and storage are maintained so that @@ -103,11 +104,12 @@ func (p *ProcessingMedia) LoadAttachment(ctx context.Context) (*gtsmodel.MediaAt // Finished returns true if processing has finished for both the thumbnail // and full fized version of this piece of media. func (p *ProcessingMedia) Finished() bool { - return p.thumbstate == complete && p.fullSizeState == complete + return atomic.LoadInt32(&p.thumbState) == int32(complete) && atomic.LoadInt32(&p.fullSizeState) == int32(complete) } func (p *ProcessingMedia) loadThumb(ctx context.Context) error { - switch p.thumbstate { + thumbState := atomic.LoadInt32(&p.thumbState) + switch processState(thumbState) { case received: // we haven't processed a thumbnail for this media yet so do it now @@ -122,7 +124,7 @@ func (p *ProcessingMedia) loadThumb(ctx context.Context) error { stored, err := p.storage.GetStream(p.attachment.File.Path) if err != nil { p.err = fmt.Errorf("loadThumb: error fetching file from storage: %s", err) - p.thumbstate = errored + atomic.StoreInt32(&p.thumbState, int32(errored)) return p.err } @@ -130,20 +132,20 @@ func (p *ProcessingMedia) loadThumb(ctx context.Context) error { thumb, err := deriveThumbnail(stored, p.attachment.File.ContentType, createBlurhash) if err != nil { p.err = fmt.Errorf("loadThumb: error deriving thumbnail: %s", err) - p.thumbstate = errored + atomic.StoreInt32(&p.thumbState, int32(errored)) return p.err } if err := stored.Close(); err != nil { p.err = fmt.Errorf("loadThumb: error closing stored full size: %s", err) - p.thumbstate = errored + atomic.StoreInt32(&p.thumbState, int32(errored)) return p.err } // put the thumbnail in storage if err := p.storage.Put(p.attachment.Thumbnail.Path, thumb.small); err != nil { p.err = fmt.Errorf("loadThumb: error storing thumbnail: %s", err) - p.thumbstate = errored + atomic.StoreInt32(&p.thumbState, int32(errored)) return p.err } @@ -160,7 +162,7 @@ func (p *ProcessingMedia) loadThumb(ctx context.Context) error { p.attachment.Thumbnail.FileSize = len(thumb.small) // we're done processing the thumbnail! - p.thumbstate = complete + atomic.StoreInt32(&p.thumbState, int32(complete)) fallthrough case complete: return nil @@ -168,11 +170,12 @@ func (p *ProcessingMedia) loadThumb(ctx context.Context) error { return p.err } - return fmt.Errorf("loadThumb: thumbnail processing status %d unknown", p.thumbstate) + return fmt.Errorf("loadThumb: thumbnail processing status %d unknown", p.thumbState) } func (p *ProcessingMedia) loadFullSize(ctx context.Context) error { - switch p.fullSizeState { + fullSizeState := atomic.LoadInt32(&p.fullSizeState) + switch processState(fullSizeState) { case received: var err error var decoded *imageMeta @@ -181,7 +184,7 @@ func (p *ProcessingMedia) loadFullSize(ctx context.Context) error { stored, err := p.storage.GetStream(p.attachment.File.Path) if err != nil { p.err = fmt.Errorf("loadFullSize: error fetching file from storage: %s", err) - p.fullSizeState = errored + atomic.StoreInt32(&p.fullSizeState, int32(errored)) return p.err } @@ -198,13 +201,13 @@ func (p *ProcessingMedia) loadFullSize(ctx context.Context) error { if err != nil { p.err = err - p.fullSizeState = errored + atomic.StoreInt32(&p.fullSizeState, int32(errored)) return p.err } if err := stored.Close(); err != nil { p.err = fmt.Errorf("loadFullSize: error closing stored full size: %s", err) - p.thumbstate = errored + atomic.StoreInt32(&p.fullSizeState, int32(errored)) return p.err } @@ -219,7 +222,7 @@ func (p *ProcessingMedia) loadFullSize(ctx context.Context) error { p.attachment.Processing = gtsmodel.ProcessingStatusProcessed // we're done processing the full-size image - p.fullSizeState = complete + atomic.StoreInt32(&p.fullSizeState, int32(complete)) fallthrough case complete: return nil @@ -400,8 +403,8 @@ func (m *manager) preProcessMedia(ctx context.Context, data DataFunc, accountID processingMedia := &ProcessingMedia{ attachment: attachment, data: data, - thumbstate: received, - fullSizeState: received, + thumbState: int32(received), + fullSizeState: int32(received), database: m.db, storage: m.storage, } diff --git a/internal/media/types.go b/internal/media/types.go index b9c79d464..a6b38b467 100644 --- a/internal/media/types.go +++ b/internal/media/types.go @@ -45,7 +45,7 @@ const ( mimeImagePng = mimeImage + "/" + mimePng ) -type processState int +type processState int32 const ( received processState = iota // processing order has been received but not done yet