[feature] Add postDataCallbackFunc to allow cleanup (#408)

This commit is contained in:
tobi 2022-02-22 13:50:33 +01:00 committed by GitHub
parent 15d1e6b3a1
commit 55b83bea7c
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 146 additions and 24 deletions

View file

@ -285,7 +285,7 @@ func (d *deref) fetchRemoteAccountMedia(ctx context.Context, targetAccount *gtsm
} }
avatar := true avatar := true
newProcessing, err := d.mediaManager.ProcessMedia(ctx, data, targetAccount.ID, &media.AdditionalMediaInfo{ newProcessing, err := d.mediaManager.ProcessMedia(ctx, data, nil, targetAccount.ID, &media.AdditionalMediaInfo{
RemoteURL: &targetAccount.AvatarRemoteURL, RemoteURL: &targetAccount.AvatarRemoteURL,
Avatar: &avatar, Avatar: &avatar,
}) })
@ -343,7 +343,7 @@ func (d *deref) fetchRemoteAccountMedia(ctx context.Context, targetAccount *gtsm
} }
header := true header := true
newProcessing, err := d.mediaManager.ProcessMedia(ctx, data, targetAccount.ID, &media.AdditionalMediaInfo{ newProcessing, err := d.mediaManager.ProcessMedia(ctx, data, nil, targetAccount.ID, &media.AdditionalMediaInfo{
RemoteURL: &targetAccount.HeaderRemoteURL, RemoteURL: &targetAccount.HeaderRemoteURL,
Header: &header, Header: &header,
}) })

View file

@ -46,7 +46,7 @@ func (d *deref) GetRemoteMedia(ctx context.Context, requestingUsername string, a
return t.DereferenceMedia(innerCtx, derefURI) return t.DereferenceMedia(innerCtx, derefURI)
} }
processingMedia, err := d.mediaManager.ProcessMedia(ctx, dataFunc, accountID, ai) processingMedia, err := d.mediaManager.ProcessMedia(ctx, dataFunc, nil, accountID, ai)
if err != nil { if err != nil {
return nil, fmt.Errorf("GetRemoteMedia: error processing attachment: %s", err) return nil, fmt.Errorf("GetRemoteMedia: error processing attachment: %s", err)
} }

View file

@ -32,16 +32,35 @@ import (
// Manager provides an interface for managing media: parsing, storing, and retrieving media objects like photos, videos, and gifs. // Manager provides an interface for managing media: parsing, storing, and retrieving media objects like photos, videos, and gifs.
type Manager interface { type Manager interface {
// ProcessMedia begins the process of decoding and storing the given data as an attachment. // ProcessMedia begins the process of decoding and storing the given data as an attachment.
// It will return a pointer to a Media struct upon which further actions can be performed, such as getting // It will return a pointer to a ProcessingMedia struct upon which further actions can be performed, such as getting
// the finished media, thumbnail, attachment, etc. // the finished media, thumbnail, attachment, etc.
// //
// data should be a function that the media manager can call to return raw bytes of a piece of media. // data should be a function that the media manager can call to return a reader containing the media data.
//
// postData will be called after data has been called; it can be used to clean up any remaining resources.
// The provided function can be nil, in which case it will not be executed.
// //
// accountID should be the account that the media belongs to. // accountID should be the account that the media belongs to.
// //
// ai is optional and can be nil. Any additional information about the attachment provided will be put in the database. // ai is optional and can be nil. Any additional information about the attachment provided will be put in the database.
ProcessMedia(ctx context.Context, data DataFunc, accountID string, ai *AdditionalMediaInfo) (*ProcessingMedia, error) ProcessMedia(ctx context.Context, data DataFunc, postData PostDataCallbackFunc, accountID string, ai *AdditionalMediaInfo) (*ProcessingMedia, error)
ProcessEmoji(ctx context.Context, data DataFunc, shortcode string, id string, uri string, ai *AdditionalEmojiInfo) (*ProcessingEmoji, error) // ProcessEmoji begins the process of decoding and storing the given data as an emoji.
// It will return a pointer to a ProcessingEmoji struct upon which further actions can be performed, such as getting
// the finished media, thumbnail, attachment, etc.
//
// data should be a function that the media manager can call to return a reader containing the emoji data.
//
// postData will be called after data has been called; it can be used to clean up any remaining resources.
// The provided function can be nil, in which case it will not be executed.
//
// shortcode should be the emoji shortcode without the ':'s around it.
//
// id is the database ID that should be used to store the emoji.
//
// uri is the ActivityPub URI/ID of the emoji.
//
// ai is optional and can be nil. Any additional information about the emoji provided will be put in the database.
ProcessEmoji(ctx context.Context, data DataFunc, postData PostDataCallbackFunc, shortcode string, id string, uri string, ai *AdditionalEmojiInfo) (*ProcessingEmoji, error)
// NumWorkers returns the total number of workers available to this manager. // NumWorkers returns the total number of workers available to this manager.
NumWorkers() int NumWorkers() int
// QueueSize returns the total capacity of the queue. // QueueSize returns the total capacity of the queue.
@ -101,8 +120,8 @@ func NewManager(database db.DB, storage *kv.KVStore) (Manager, error) {
return m, nil return m, nil
} }
func (m *manager) ProcessMedia(ctx context.Context, data DataFunc, accountID string, ai *AdditionalMediaInfo) (*ProcessingMedia, error) { func (m *manager) ProcessMedia(ctx context.Context, data DataFunc, postData PostDataCallbackFunc, accountID string, ai *AdditionalMediaInfo) (*ProcessingMedia, error) {
processingMedia, err := m.preProcessMedia(ctx, data, accountID, ai) processingMedia, err := m.preProcessMedia(ctx, data, postData, accountID, ai)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -125,8 +144,8 @@ func (m *manager) ProcessMedia(ctx context.Context, data DataFunc, accountID str
return processingMedia, nil return processingMedia, nil
} }
func (m *manager) ProcessEmoji(ctx context.Context, data DataFunc, shortcode string, id string, uri string, ai *AdditionalEmojiInfo) (*ProcessingEmoji, error) { func (m *manager) ProcessEmoji(ctx context.Context, data DataFunc, postData PostDataCallbackFunc, shortcode string, id string, uri string, ai *AdditionalEmojiInfo) (*ProcessingEmoji, error) {
processingEmoji, err := m.preProcessEmoji(ctx, data, shortcode, id, uri, ai) processingEmoji, err := m.preProcessEmoji(ctx, data, postData, shortcode, id, uri, ai)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View file

@ -54,7 +54,7 @@ func (suite *ManagerTestSuite) TestSimpleJpegProcessBlocking() {
accountID := "01FS1X72SK9ZPW0J1QQ68BD264" accountID := "01FS1X72SK9ZPW0J1QQ68BD264"
// process the media with no additional info provided // process the media with no additional info provided
processingMedia, err := suite.manager.ProcessMedia(ctx, data, accountID, nil) processingMedia, err := suite.manager.ProcessMedia(ctx, data, nil, accountID, nil)
suite.NoError(err) suite.NoError(err)
// fetch the attachment id from the processing media // fetch the attachment id from the processing media
attachmentID := processingMedia.AttachmentID() attachmentID := processingMedia.AttachmentID()
@ -111,6 +111,89 @@ func (suite *ManagerTestSuite) TestSimpleJpegProcessBlocking() {
suite.Equal(processedThumbnailBytesExpected, processedThumbnailBytes) suite.Equal(processedThumbnailBytesExpected, processedThumbnailBytes)
} }
func (suite *ManagerTestSuite) TestSimpleJpegProcessBlockingWithCallback() {
ctx := context.Background()
data := func(_ context.Context) (io.Reader, int, error) {
// load bytes from a test image
b, err := os.ReadFile("./test/test-jpeg.jpg")
if err != nil {
panic(err)
}
return bytes.NewBuffer(b), len(b), nil
}
// test the callback function by setting a simple boolean
var calledPostData bool
postData := func(_ context.Context) error {
calledPostData = true
return nil
}
suite.False(calledPostData) // not called yet (obvs)
accountID := "01FS1X72SK9ZPW0J1QQ68BD264"
// process the media with no additional info provided
processingMedia, err := suite.manager.ProcessMedia(ctx, data, postData, accountID, nil)
suite.NoError(err)
// fetch the attachment id from the processing media
attachmentID := processingMedia.AttachmentID()
// do a blocking call to fetch the attachment
attachment, err := processingMedia.LoadAttachment(ctx)
suite.NoError(err)
suite.NotNil(attachment)
// the post data callback should have been called
suite.True(calledPostData)
// make sure it's got the stuff set on it that we expect
// the attachment ID and accountID we expect
suite.Equal(attachmentID, attachment.ID)
suite.Equal(accountID, attachment.AccountID)
// file meta should be correctly derived from the image
suite.EqualValues(gtsmodel.Original{
Width: 1920, Height: 1080, Size: 2073600, Aspect: 1.7777777777777777,
}, attachment.FileMeta.Original)
suite.EqualValues(gtsmodel.Small{
Width: 512, Height: 288, Size: 147456, Aspect: 1.7777777777777777,
}, attachment.FileMeta.Small)
suite.Equal("image/jpeg", attachment.File.ContentType)
suite.Equal("image/jpeg", attachment.Thumbnail.ContentType)
suite.Equal(269739, attachment.File.FileSize)
suite.Equal("LjBzUo#6RQR._NvzRjWF?urqV@a$", attachment.Blurhash)
// now make sure the attachment is in the database
dbAttachment, err := suite.db.GetAttachmentByID(ctx, attachmentID)
suite.NoError(err)
suite.NotNil(dbAttachment)
// make sure the processed file is in storage
processedFullBytes, err := suite.storage.Get(attachment.File.Path)
suite.NoError(err)
suite.NotEmpty(processedFullBytes)
// load the processed bytes from our test folder, to compare
processedFullBytesExpected, err := os.ReadFile("./test/test-jpeg-processed.jpg")
suite.NoError(err)
suite.NotEmpty(processedFullBytesExpected)
// the bytes in storage should be what we expected
suite.Equal(processedFullBytesExpected, processedFullBytes)
// now do the same for the thumbnail and make sure it's what we expected
processedThumbnailBytes, err := suite.storage.Get(attachment.Thumbnail.Path)
suite.NoError(err)
suite.NotEmpty(processedThumbnailBytes)
processedThumbnailBytesExpected, err := os.ReadFile("./test/test-jpeg-thumbnail.jpg")
suite.NoError(err)
suite.NotEmpty(processedThumbnailBytesExpected)
suite.Equal(processedThumbnailBytesExpected, processedThumbnailBytes)
}
func (suite *ManagerTestSuite) TestSimpleJpegProcessAsync() { func (suite *ManagerTestSuite) TestSimpleJpegProcessAsync() {
ctx := context.Background() ctx := context.Background()
@ -126,7 +209,7 @@ func (suite *ManagerTestSuite) TestSimpleJpegProcessAsync() {
accountID := "01FS1X72SK9ZPW0J1QQ68BD264" accountID := "01FS1X72SK9ZPW0J1QQ68BD264"
// process the media with no additional info provided // process the media with no additional info provided
processingMedia, err := suite.manager.ProcessMedia(ctx, data, accountID, nil) processingMedia, err := suite.manager.ProcessMedia(ctx, data, nil, accountID, nil)
suite.NoError(err) suite.NoError(err)
// fetch the attachment id from the processing media // fetch the attachment id from the processing media
attachmentID := processingMedia.AttachmentID() attachmentID := processingMedia.AttachmentID()
@ -210,7 +293,7 @@ func (suite *ManagerTestSuite) TestSimpleJpegQueueSpamming() {
inProcess := []*media.ProcessingMedia{} inProcess := []*media.ProcessingMedia{}
for i := 0; i < spam; i++ { for i := 0; i < spam; i++ {
// process the media with no additional info provided // process the media with no additional info provided
processingMedia, err := suite.manager.ProcessMedia(ctx, data, accountID, nil) processingMedia, err := suite.manager.ProcessMedia(ctx, data, nil, accountID, nil)
suite.NoError(err) suite.NoError(err)
inProcess = append(inProcess, processingMedia) inProcess = append(inProcess, processingMedia)
} }
@ -305,7 +388,7 @@ func (suite *ManagerTestSuite) TestSimpleJpegProcessBlockingWithDiskStorage() {
suite.manager = diskManager suite.manager = diskManager
// process the media with no additional info provided // process the media with no additional info provided
processingMedia, err := diskManager.ProcessMedia(ctx, data, accountID, nil) processingMedia, err := diskManager.ProcessMedia(ctx, data, nil, accountID, nil)
suite.NoError(err) suite.NoError(err)
// fetch the attachment id from the processing media // fetch the attachment id from the processing media
attachmentID := processingMedia.AttachmentID() attachmentID := processingMedia.AttachmentID()

View file

@ -47,9 +47,10 @@ type ProcessingEmoji struct {
emoji will be updated incrementally as media goes through processing emoji will be updated incrementally as media goes through processing
*/ */
emoji *gtsmodel.Emoji emoji *gtsmodel.Emoji
data DataFunc data DataFunc
read bool // bool indicating that data function has been triggered already postData PostDataCallbackFunc
read bool // bool indicating that data function has been triggered already
/* /*
below fields represent the processing state of the static of the emoji below fields represent the processing state of the static of the emoji
@ -212,10 +213,15 @@ func (p *ProcessingEmoji) store(ctx context.Context) error {
} }
p.read = true p.read = true
if p.postData != nil {
return p.postData(ctx)
}
return nil return nil
} }
func (m *manager) preProcessEmoji(ctx context.Context, data DataFunc, shortcode string, id string, uri string, ai *AdditionalEmojiInfo) (*ProcessingEmoji, error) { func (m *manager) preProcessEmoji(ctx context.Context, data DataFunc, postData PostDataCallbackFunc, shortcode string, id string, uri string, ai *AdditionalEmojiInfo) (*ProcessingEmoji, error) {
instanceAccount, err := m.db.GetInstanceAccount(ctx, "") instanceAccount, err := m.db.GetInstanceAccount(ctx, "")
if err != nil { if err != nil {
return nil, fmt.Errorf("preProcessEmoji: error fetching this instance account from the db: %s", err) return nil, fmt.Errorf("preProcessEmoji: error fetching this instance account from the db: %s", err)
@ -281,6 +287,7 @@ func (m *manager) preProcessEmoji(ctx context.Context, data DataFunc, shortcode
instanceAccountID: instanceAccount.ID, instanceAccountID: instanceAccount.ID,
emoji: emoji, emoji: emoji,
data: data, data: data,
postData: postData,
staticState: int32(received), staticState: int32(received),
database: m.db, database: m.db,
storage: m.storage, storage: m.storage,

View file

@ -48,6 +48,7 @@ type ProcessingMedia struct {
attachment *gtsmodel.MediaAttachment attachment *gtsmodel.MediaAttachment
data DataFunc data DataFunc
postData PostDataCallbackFunc
read bool // bool indicating that data function has been triggered already read bool // bool indicating that data function has been triggered already
thumbState int32 // the processing state of the media thumbnail thumbState int32 // the processing state of the media thumbnail
@ -313,10 +314,15 @@ func (p *ProcessingMedia) store(ctx context.Context) error {
} }
p.read = true p.read = true
if p.postData != nil {
return p.postData(ctx)
}
return nil return nil
} }
func (m *manager) preProcessMedia(ctx context.Context, data DataFunc, accountID string, ai *AdditionalMediaInfo) (*ProcessingMedia, error) { func (m *manager) preProcessMedia(ctx context.Context, data DataFunc, postData PostDataCallbackFunc, accountID string, ai *AdditionalMediaInfo) (*ProcessingMedia, error) {
id, err := id.NewRandomULID() id, err := id.NewRandomULID()
if err != nil { if err != nil {
return nil, err return nil, err
@ -403,6 +409,7 @@ func (m *manager) preProcessMedia(ctx context.Context, data DataFunc, accountID
processingMedia := &ProcessingMedia{ processingMedia := &ProcessingMedia{
attachment: attachment, attachment: attachment,
data: data, data: data,
postData: postData,
thumbState: int32(received), thumbState: int32(received),
fullSizeState: int32(received), fullSizeState: int32(received),
database: m.db, database: m.db,

View file

@ -119,3 +119,9 @@ type AdditionalEmojiInfo struct {
// DataFunc represents a function used to retrieve the raw bytes of a piece of media. // DataFunc represents a function used to retrieve the raw bytes of a piece of media.
type DataFunc func(ctx context.Context) (reader io.Reader, fileSize int, err error) type DataFunc func(ctx context.Context) (reader io.Reader, fileSize int, err error)
// PostDataCallbackFunc represents a function executed after the DataFunc has been executed,
// and the returned reader has been read. It can be used to clean up any remaining resources.
//
// This can be set to nil, and will then not be executed.
type PostDataCallbackFunc func(ctx context.Context) error

View file

@ -150,7 +150,7 @@ func (p *processor) UpdateAvatar(ctx context.Context, avatar *multipart.FileHead
Avatar: &isAvatar, Avatar: &isAvatar,
} }
processingMedia, err := p.mediaManager.ProcessMedia(ctx, dataFunc, accountID, ai) processingMedia, err := p.mediaManager.ProcessMedia(ctx, dataFunc, nil, accountID, ai)
if err != nil { if err != nil {
return nil, fmt.Errorf("UpdateAvatar: error processing avatar: %s", err) return nil, fmt.Errorf("UpdateAvatar: error processing avatar: %s", err)
} }
@ -177,7 +177,7 @@ func (p *processor) UpdateHeader(ctx context.Context, header *multipart.FileHead
Header: &isHeader, Header: &isHeader,
} }
processingMedia, err := p.mediaManager.ProcessMedia(ctx, dataFunc, accountID, ai) processingMedia, err := p.mediaManager.ProcessMedia(ctx, dataFunc, nil, accountID, ai)
if err != nil { if err != nil {
return nil, fmt.Errorf("UpdateHeader: error processing header: %s", err) return nil, fmt.Errorf("UpdateHeader: error processing header: %s", err)
} }

View file

@ -49,7 +49,7 @@ func (p *processor) EmojiCreate(ctx context.Context, account *gtsmodel.Account,
emojiURI := uris.GenerateURIForEmoji(emojiID) emojiURI := uris.GenerateURIForEmoji(emojiID)
processingEmoji, err := p.mediaManager.ProcessEmoji(ctx, data, form.Shortcode, emojiID, emojiURI, nil) processingEmoji, err := p.mediaManager.ProcessEmoji(ctx, data, nil, form.Shortcode, emojiID, emojiURI, nil)
if err != nil { if err != nil {
return nil, gtserror.NewErrorInternalError(fmt.Errorf("error processing emoji: %s", err), "error processing emoji") return nil, gtserror.NewErrorInternalError(fmt.Errorf("error processing emoji: %s", err), "error processing emoji")
} }

View file

@ -40,7 +40,7 @@ func (p *processor) Create(ctx context.Context, account *gtsmodel.Account, form
} }
// process the media attachment and load it immediately // process the media attachment and load it immediately
media, err := p.mediaManager.ProcessMedia(ctx, data, account.ID, &media.AdditionalMediaInfo{ media, err := p.mediaManager.ProcessMedia(ctx, data, nil, account.ID, &media.AdditionalMediaInfo{
Description: &form.Description, Description: &form.Description,
FocusX: &focusX, FocusX: &focusX,
FocusY: &focusY, FocusY: &focusY,