mirror of
https://github.com/superseriousbusiness/gotosocial.git
synced 2024-11-06 00:49:35 +00:00
store and retrieve processState atomically
This commit is contained in:
parent
dba9ad4348
commit
8c0141d103
3 changed files with 32 additions and 30 deletions
|
@ -25,6 +25,7 @@ import (
|
||||||
"io"
|
"io"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"codeberg.org/gruf/go-store/kv"
|
"codeberg.org/gruf/go-store/kv"
|
||||||
|
@ -53,9 +54,7 @@ type ProcessingEmoji struct {
|
||||||
/*
|
/*
|
||||||
below fields represent the processing state of the static of the emoji
|
below fields represent the processing state of the static of the emoji
|
||||||
*/
|
*/
|
||||||
|
staticState int32
|
||||||
staticState processState
|
|
||||||
fullSizeState processState
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
below pointers to database and storage are maintained so that
|
below pointers to database and storage are maintained so that
|
||||||
|
@ -104,17 +103,18 @@ func (p *ProcessingEmoji) LoadEmoji(ctx context.Context) (*gtsmodel.Emoji, error
|
||||||
// Finished returns true if processing has finished for both the thumbnail
|
// Finished returns true if processing has finished for both the thumbnail
|
||||||
// and full fized version of this piece of media.
|
// and full fized version of this piece of media.
|
||||||
func (p *ProcessingEmoji) Finished() bool {
|
func (p *ProcessingEmoji) Finished() bool {
|
||||||
return p.staticState == complete && p.fullSizeState == complete
|
return atomic.LoadInt32(&p.staticState) == int32(complete)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *ProcessingEmoji) loadStatic(ctx context.Context) error {
|
func (p *ProcessingEmoji) loadStatic(ctx context.Context) error {
|
||||||
switch p.staticState {
|
staticState := atomic.LoadInt32(&p.staticState)
|
||||||
|
switch processState(staticState) {
|
||||||
case received:
|
case received:
|
||||||
// stream the original file out of storage...
|
// stream the original file out of storage...
|
||||||
stored, err := p.storage.GetStream(p.emoji.ImagePath)
|
stored, err := p.storage.GetStream(p.emoji.ImagePath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
p.err = fmt.Errorf("loadStatic: error fetching file from storage: %s", err)
|
p.err = fmt.Errorf("loadStatic: error fetching file from storage: %s", err)
|
||||||
p.staticState = errored
|
atomic.StoreInt32(&p.staticState, int32(errored))
|
||||||
return p.err
|
return p.err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -122,27 +122,27 @@ func (p *ProcessingEmoji) loadStatic(ctx context.Context) error {
|
||||||
static, err := deriveStaticEmoji(stored, p.emoji.ImageContentType)
|
static, err := deriveStaticEmoji(stored, p.emoji.ImageContentType)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
p.err = fmt.Errorf("loadStatic: error deriving static: %s", err)
|
p.err = fmt.Errorf("loadStatic: error deriving static: %s", err)
|
||||||
p.staticState = errored
|
atomic.StoreInt32(&p.staticState, int32(errored))
|
||||||
return p.err
|
return p.err
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := stored.Close(); err != nil {
|
if err := stored.Close(); err != nil {
|
||||||
p.err = fmt.Errorf("loadStatic: error closing stored full size: %s", err)
|
p.err = fmt.Errorf("loadStatic: error closing stored full size: %s", err)
|
||||||
p.staticState = errored
|
atomic.StoreInt32(&p.staticState, int32(errored))
|
||||||
return p.err
|
return p.err
|
||||||
}
|
}
|
||||||
|
|
||||||
// put the static in storage
|
// put the static in storage
|
||||||
if err := p.storage.Put(p.emoji.ImageStaticPath, static.small); err != nil {
|
if err := p.storage.Put(p.emoji.ImageStaticPath, static.small); err != nil {
|
||||||
p.err = fmt.Errorf("loadStatic: error storing static: %s", err)
|
p.err = fmt.Errorf("loadStatic: error storing static: %s", err)
|
||||||
p.staticState = errored
|
atomic.StoreInt32(&p.staticState, int32(errored))
|
||||||
return p.err
|
return p.err
|
||||||
}
|
}
|
||||||
|
|
||||||
p.emoji.ImageStaticFileSize = len(static.small)
|
p.emoji.ImageStaticFileSize = len(static.small)
|
||||||
|
|
||||||
// we're done processing the static version of the emoji!
|
// we're done processing the static version of the emoji!
|
||||||
p.staticState = complete
|
atomic.StoreInt32(&p.staticState, int32(complete))
|
||||||
fallthrough
|
fallthrough
|
||||||
case complete:
|
case complete:
|
||||||
return nil
|
return nil
|
||||||
|
@ -281,8 +281,7 @@ func (m *manager) preProcessEmoji(ctx context.Context, data DataFunc, shortcode
|
||||||
instanceAccountID: instanceAccount.ID,
|
instanceAccountID: instanceAccount.ID,
|
||||||
emoji: emoji,
|
emoji: emoji,
|
||||||
data: data,
|
data: data,
|
||||||
staticState: received,
|
staticState: int32(received),
|
||||||
fullSizeState: received,
|
|
||||||
database: m.db,
|
database: m.db,
|
||||||
storage: m.storage,
|
storage: m.storage,
|
||||||
}
|
}
|
||||||
|
|
|
@ -25,6 +25,7 @@ import (
|
||||||
"io"
|
"io"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"codeberg.org/gruf/go-store/kv"
|
"codeberg.org/gruf/go-store/kv"
|
||||||
|
@ -49,8 +50,8 @@ type ProcessingMedia struct {
|
||||||
data DataFunc
|
data DataFunc
|
||||||
read bool // bool indicating that data function has been triggered already
|
read bool // bool indicating that data function has been triggered already
|
||||||
|
|
||||||
thumbstate processState // the processing state of the media thumbnail
|
thumbState int32 // the processing state of the media thumbnail
|
||||||
fullSizeState processState // the processing state of the full-sized media
|
fullSizeState int32 // the processing state of the full-sized media
|
||||||
|
|
||||||
/*
|
/*
|
||||||
below pointers to database and storage are maintained so that
|
below pointers to database and storage are maintained so that
|
||||||
|
@ -103,11 +104,12 @@ func (p *ProcessingMedia) LoadAttachment(ctx context.Context) (*gtsmodel.MediaAt
|
||||||
// Finished returns true if processing has finished for both the thumbnail
|
// Finished returns true if processing has finished for both the thumbnail
|
||||||
// and full fized version of this piece of media.
|
// and full fized version of this piece of media.
|
||||||
func (p *ProcessingMedia) Finished() bool {
|
func (p *ProcessingMedia) Finished() bool {
|
||||||
return p.thumbstate == complete && p.fullSizeState == complete
|
return atomic.LoadInt32(&p.thumbState) == int32(complete) && atomic.LoadInt32(&p.fullSizeState) == int32(complete)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *ProcessingMedia) loadThumb(ctx context.Context) error {
|
func (p *ProcessingMedia) loadThumb(ctx context.Context) error {
|
||||||
switch p.thumbstate {
|
thumbState := atomic.LoadInt32(&p.thumbState)
|
||||||
|
switch processState(thumbState) {
|
||||||
case received:
|
case received:
|
||||||
// we haven't processed a thumbnail for this media yet so do it now
|
// we haven't processed a thumbnail for this media yet so do it now
|
||||||
|
|
||||||
|
@ -122,7 +124,7 @@ func (p *ProcessingMedia) loadThumb(ctx context.Context) error {
|
||||||
stored, err := p.storage.GetStream(p.attachment.File.Path)
|
stored, err := p.storage.GetStream(p.attachment.File.Path)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
p.err = fmt.Errorf("loadThumb: error fetching file from storage: %s", err)
|
p.err = fmt.Errorf("loadThumb: error fetching file from storage: %s", err)
|
||||||
p.thumbstate = errored
|
atomic.StoreInt32(&p.thumbState, int32(errored))
|
||||||
return p.err
|
return p.err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -130,20 +132,20 @@ func (p *ProcessingMedia) loadThumb(ctx context.Context) error {
|
||||||
thumb, err := deriveThumbnail(stored, p.attachment.File.ContentType, createBlurhash)
|
thumb, err := deriveThumbnail(stored, p.attachment.File.ContentType, createBlurhash)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
p.err = fmt.Errorf("loadThumb: error deriving thumbnail: %s", err)
|
p.err = fmt.Errorf("loadThumb: error deriving thumbnail: %s", err)
|
||||||
p.thumbstate = errored
|
atomic.StoreInt32(&p.thumbState, int32(errored))
|
||||||
return p.err
|
return p.err
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := stored.Close(); err != nil {
|
if err := stored.Close(); err != nil {
|
||||||
p.err = fmt.Errorf("loadThumb: error closing stored full size: %s", err)
|
p.err = fmt.Errorf("loadThumb: error closing stored full size: %s", err)
|
||||||
p.thumbstate = errored
|
atomic.StoreInt32(&p.thumbState, int32(errored))
|
||||||
return p.err
|
return p.err
|
||||||
}
|
}
|
||||||
|
|
||||||
// put the thumbnail in storage
|
// put the thumbnail in storage
|
||||||
if err := p.storage.Put(p.attachment.Thumbnail.Path, thumb.small); err != nil {
|
if err := p.storage.Put(p.attachment.Thumbnail.Path, thumb.small); err != nil {
|
||||||
p.err = fmt.Errorf("loadThumb: error storing thumbnail: %s", err)
|
p.err = fmt.Errorf("loadThumb: error storing thumbnail: %s", err)
|
||||||
p.thumbstate = errored
|
atomic.StoreInt32(&p.thumbState, int32(errored))
|
||||||
return p.err
|
return p.err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -160,7 +162,7 @@ func (p *ProcessingMedia) loadThumb(ctx context.Context) error {
|
||||||
p.attachment.Thumbnail.FileSize = len(thumb.small)
|
p.attachment.Thumbnail.FileSize = len(thumb.small)
|
||||||
|
|
||||||
// we're done processing the thumbnail!
|
// we're done processing the thumbnail!
|
||||||
p.thumbstate = complete
|
atomic.StoreInt32(&p.thumbState, int32(complete))
|
||||||
fallthrough
|
fallthrough
|
||||||
case complete:
|
case complete:
|
||||||
return nil
|
return nil
|
||||||
|
@ -168,11 +170,12 @@ func (p *ProcessingMedia) loadThumb(ctx context.Context) error {
|
||||||
return p.err
|
return p.err
|
||||||
}
|
}
|
||||||
|
|
||||||
return fmt.Errorf("loadThumb: thumbnail processing status %d unknown", p.thumbstate)
|
return fmt.Errorf("loadThumb: thumbnail processing status %d unknown", p.thumbState)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *ProcessingMedia) loadFullSize(ctx context.Context) error {
|
func (p *ProcessingMedia) loadFullSize(ctx context.Context) error {
|
||||||
switch p.fullSizeState {
|
fullSizeState := atomic.LoadInt32(&p.fullSizeState)
|
||||||
|
switch processState(fullSizeState) {
|
||||||
case received:
|
case received:
|
||||||
var err error
|
var err error
|
||||||
var decoded *imageMeta
|
var decoded *imageMeta
|
||||||
|
@ -181,7 +184,7 @@ func (p *ProcessingMedia) loadFullSize(ctx context.Context) error {
|
||||||
stored, err := p.storage.GetStream(p.attachment.File.Path)
|
stored, err := p.storage.GetStream(p.attachment.File.Path)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
p.err = fmt.Errorf("loadFullSize: error fetching file from storage: %s", err)
|
p.err = fmt.Errorf("loadFullSize: error fetching file from storage: %s", err)
|
||||||
p.fullSizeState = errored
|
atomic.StoreInt32(&p.fullSizeState, int32(errored))
|
||||||
return p.err
|
return p.err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -198,13 +201,13 @@ func (p *ProcessingMedia) loadFullSize(ctx context.Context) error {
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
p.err = err
|
p.err = err
|
||||||
p.fullSizeState = errored
|
atomic.StoreInt32(&p.fullSizeState, int32(errored))
|
||||||
return p.err
|
return p.err
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := stored.Close(); err != nil {
|
if err := stored.Close(); err != nil {
|
||||||
p.err = fmt.Errorf("loadFullSize: error closing stored full size: %s", err)
|
p.err = fmt.Errorf("loadFullSize: error closing stored full size: %s", err)
|
||||||
p.thumbstate = errored
|
atomic.StoreInt32(&p.fullSizeState, int32(errored))
|
||||||
return p.err
|
return p.err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -219,7 +222,7 @@ func (p *ProcessingMedia) loadFullSize(ctx context.Context) error {
|
||||||
p.attachment.Processing = gtsmodel.ProcessingStatusProcessed
|
p.attachment.Processing = gtsmodel.ProcessingStatusProcessed
|
||||||
|
|
||||||
// we're done processing the full-size image
|
// we're done processing the full-size image
|
||||||
p.fullSizeState = complete
|
atomic.StoreInt32(&p.fullSizeState, int32(complete))
|
||||||
fallthrough
|
fallthrough
|
||||||
case complete:
|
case complete:
|
||||||
return nil
|
return nil
|
||||||
|
@ -400,8 +403,8 @@ func (m *manager) preProcessMedia(ctx context.Context, data DataFunc, accountID
|
||||||
processingMedia := &ProcessingMedia{
|
processingMedia := &ProcessingMedia{
|
||||||
attachment: attachment,
|
attachment: attachment,
|
||||||
data: data,
|
data: data,
|
||||||
thumbstate: received,
|
thumbState: int32(received),
|
||||||
fullSizeState: received,
|
fullSizeState: int32(received),
|
||||||
database: m.db,
|
database: m.db,
|
||||||
storage: m.storage,
|
storage: m.storage,
|
||||||
}
|
}
|
||||||
|
|
|
@ -45,7 +45,7 @@ const (
|
||||||
mimeImagePng = mimeImage + "/" + mimePng
|
mimeImagePng = mimeImage + "/" + mimePng
|
||||||
)
|
)
|
||||||
|
|
||||||
type processState int
|
type processState int32
|
||||||
|
|
||||||
const (
|
const (
|
||||||
received processState = iota // processing order has been received but not done yet
|
received processState = iota // processing order has been received but not done yet
|
||||||
|
|
Loading…
Reference in a new issue