From 55b83bea7cad893aabda4ab5e05c248fb66e1e51 Mon Sep 17 00:00:00 2001 From: tobi <31960611+tsmethurst@users.noreply.github.com> Date: Tue, 22 Feb 2022 13:50:33 +0100 Subject: [PATCH] [feature] Add postDataCallbackFunc to allow cleanup (#408) --- internal/federation/dereferencing/account.go | 4 +- internal/federation/dereferencing/media.go | 2 +- internal/media/manager.go | 35 ++++++-- internal/media/manager_test.go | 91 +++++++++++++++++++- internal/media/processingemoji.go | 15 +++- internal/media/processingmedia.go | 9 +- internal/media/types.go | 6 ++ internal/processing/account/update.go | 4 +- internal/processing/admin/emoji.go | 2 +- internal/processing/media/create.go | 2 +- 10 files changed, 146 insertions(+), 24 deletions(-) diff --git a/internal/federation/dereferencing/account.go b/internal/federation/dereferencing/account.go index 02afd9a9c..7d5d80479 100644 --- a/internal/federation/dereferencing/account.go +++ b/internal/federation/dereferencing/account.go @@ -285,7 +285,7 @@ func (d *deref) fetchRemoteAccountMedia(ctx context.Context, targetAccount *gtsm } avatar := true - newProcessing, err := d.mediaManager.ProcessMedia(ctx, data, targetAccount.ID, &media.AdditionalMediaInfo{ + newProcessing, err := d.mediaManager.ProcessMedia(ctx, data, nil, targetAccount.ID, &media.AdditionalMediaInfo{ RemoteURL: &targetAccount.AvatarRemoteURL, Avatar: &avatar, }) @@ -343,7 +343,7 @@ func (d *deref) fetchRemoteAccountMedia(ctx context.Context, targetAccount *gtsm } header := true - newProcessing, err := d.mediaManager.ProcessMedia(ctx, data, targetAccount.ID, &media.AdditionalMediaInfo{ + newProcessing, err := d.mediaManager.ProcessMedia(ctx, data, nil, targetAccount.ID, &media.AdditionalMediaInfo{ RemoteURL: &targetAccount.HeaderRemoteURL, Header: &header, }) diff --git a/internal/federation/dereferencing/media.go b/internal/federation/dereferencing/media.go index 0b19570f2..05fd70589 100644 --- a/internal/federation/dereferencing/media.go +++ b/internal/federation/dereferencing/media.go @@ -46,7 +46,7 @@ func (d *deref) GetRemoteMedia(ctx context.Context, requestingUsername string, a return t.DereferenceMedia(innerCtx, derefURI) } - processingMedia, err := d.mediaManager.ProcessMedia(ctx, dataFunc, accountID, ai) + processingMedia, err := d.mediaManager.ProcessMedia(ctx, dataFunc, nil, accountID, ai) if err != nil { return nil, fmt.Errorf("GetRemoteMedia: error processing attachment: %s", err) } diff --git a/internal/media/manager.go b/internal/media/manager.go index 7f626271a..3901bae00 100644 --- a/internal/media/manager.go +++ b/internal/media/manager.go @@ -32,16 +32,35 @@ import ( // Manager provides an interface for managing media: parsing, storing, and retrieving media objects like photos, videos, and gifs. type Manager interface { // ProcessMedia begins the process of decoding and storing the given data as an attachment. - // It will return a pointer to a Media struct upon which further actions can be performed, such as getting + // It will return a pointer to a ProcessingMedia struct upon which further actions can be performed, such as getting // the finished media, thumbnail, attachment, etc. // - // data should be a function that the media manager can call to return raw bytes of a piece of media. + // data should be a function that the media manager can call to return a reader containing the media data. + // + // postData will be called after data has been called; it can be used to clean up any remaining resources. + // The provided function can be nil, in which case it will not be executed. // // accountID should be the account that the media belongs to. // // ai is optional and can be nil. Any additional information about the attachment provided will be put in the database. - ProcessMedia(ctx context.Context, data DataFunc, accountID string, ai *AdditionalMediaInfo) (*ProcessingMedia, error) - ProcessEmoji(ctx context.Context, data DataFunc, shortcode string, id string, uri string, ai *AdditionalEmojiInfo) (*ProcessingEmoji, error) + ProcessMedia(ctx context.Context, data DataFunc, postData PostDataCallbackFunc, accountID string, ai *AdditionalMediaInfo) (*ProcessingMedia, error) + // ProcessEmoji begins the process of decoding and storing the given data as an emoji. + // It will return a pointer to a ProcessingEmoji struct upon which further actions can be performed, such as getting + // the finished media, thumbnail, attachment, etc. + // + // data should be a function that the media manager can call to return a reader containing the emoji data. + // + // postData will be called after data has been called; it can be used to clean up any remaining resources. + // The provided function can be nil, in which case it will not be executed. + // + // shortcode should be the emoji shortcode without the ':'s around it. + // + // id is the database ID that should be used to store the emoji. + // + // uri is the ActivityPub URI/ID of the emoji. + // + // ai is optional and can be nil. Any additional information about the emoji provided will be put in the database. + ProcessEmoji(ctx context.Context, data DataFunc, postData PostDataCallbackFunc, shortcode string, id string, uri string, ai *AdditionalEmojiInfo) (*ProcessingEmoji, error) // NumWorkers returns the total number of workers available to this manager. NumWorkers() int // QueueSize returns the total capacity of the queue. @@ -101,8 +120,8 @@ func NewManager(database db.DB, storage *kv.KVStore) (Manager, error) { return m, nil } -func (m *manager) ProcessMedia(ctx context.Context, data DataFunc, accountID string, ai *AdditionalMediaInfo) (*ProcessingMedia, error) { - processingMedia, err := m.preProcessMedia(ctx, data, accountID, ai) +func (m *manager) ProcessMedia(ctx context.Context, data DataFunc, postData PostDataCallbackFunc, accountID string, ai *AdditionalMediaInfo) (*ProcessingMedia, error) { + processingMedia, err := m.preProcessMedia(ctx, data, postData, accountID, ai) if err != nil { return nil, err } @@ -125,8 +144,8 @@ func (m *manager) ProcessMedia(ctx context.Context, data DataFunc, accountID str return processingMedia, nil } -func (m *manager) ProcessEmoji(ctx context.Context, data DataFunc, shortcode string, id string, uri string, ai *AdditionalEmojiInfo) (*ProcessingEmoji, error) { - processingEmoji, err := m.preProcessEmoji(ctx, data, shortcode, id, uri, ai) +func (m *manager) ProcessEmoji(ctx context.Context, data DataFunc, postData PostDataCallbackFunc, shortcode string, id string, uri string, ai *AdditionalEmojiInfo) (*ProcessingEmoji, error) { + processingEmoji, err := m.preProcessEmoji(ctx, data, postData, shortcode, id, uri, ai) if err != nil { return nil, err } diff --git a/internal/media/manager_test.go b/internal/media/manager_test.go index 8443f825e..a962c2a44 100644 --- a/internal/media/manager_test.go +++ b/internal/media/manager_test.go @@ -54,7 +54,7 @@ func (suite *ManagerTestSuite) TestSimpleJpegProcessBlocking() { accountID := "01FS1X72SK9ZPW0J1QQ68BD264" // process the media with no additional info provided - processingMedia, err := suite.manager.ProcessMedia(ctx, data, accountID, nil) + processingMedia, err := suite.manager.ProcessMedia(ctx, data, nil, accountID, nil) suite.NoError(err) // fetch the attachment id from the processing media attachmentID := processingMedia.AttachmentID() @@ -111,6 +111,89 @@ func (suite *ManagerTestSuite) TestSimpleJpegProcessBlocking() { suite.Equal(processedThumbnailBytesExpected, processedThumbnailBytes) } +func (suite *ManagerTestSuite) TestSimpleJpegProcessBlockingWithCallback() { + ctx := context.Background() + + data := func(_ context.Context) (io.Reader, int, error) { + // load bytes from a test image + b, err := os.ReadFile("./test/test-jpeg.jpg") + if err != nil { + panic(err) + } + return bytes.NewBuffer(b), len(b), nil + } + + // test the callback function by setting a simple boolean + var calledPostData bool + postData := func(_ context.Context) error { + calledPostData = true + return nil + } + suite.False(calledPostData) // not called yet (obvs) + + accountID := "01FS1X72SK9ZPW0J1QQ68BD264" + + // process the media with no additional info provided + processingMedia, err := suite.manager.ProcessMedia(ctx, data, postData, accountID, nil) + suite.NoError(err) + // fetch the attachment id from the processing media + attachmentID := processingMedia.AttachmentID() + + // do a blocking call to fetch the attachment + attachment, err := processingMedia.LoadAttachment(ctx) + suite.NoError(err) + suite.NotNil(attachment) + + // the post data callback should have been called + suite.True(calledPostData) + + // make sure it's got the stuff set on it that we expect + // the attachment ID and accountID we expect + suite.Equal(attachmentID, attachment.ID) + suite.Equal(accountID, attachment.AccountID) + + // file meta should be correctly derived from the image + suite.EqualValues(gtsmodel.Original{ + Width: 1920, Height: 1080, Size: 2073600, Aspect: 1.7777777777777777, + }, attachment.FileMeta.Original) + suite.EqualValues(gtsmodel.Small{ + Width: 512, Height: 288, Size: 147456, Aspect: 1.7777777777777777, + }, attachment.FileMeta.Small) + suite.Equal("image/jpeg", attachment.File.ContentType) + suite.Equal("image/jpeg", attachment.Thumbnail.ContentType) + suite.Equal(269739, attachment.File.FileSize) + suite.Equal("LjBzUo#6RQR._NvzRjWF?urqV@a$", attachment.Blurhash) + + // now make sure the attachment is in the database + dbAttachment, err := suite.db.GetAttachmentByID(ctx, attachmentID) + suite.NoError(err) + suite.NotNil(dbAttachment) + + // make sure the processed file is in storage + processedFullBytes, err := suite.storage.Get(attachment.File.Path) + suite.NoError(err) + suite.NotEmpty(processedFullBytes) + + // load the processed bytes from our test folder, to compare + processedFullBytesExpected, err := os.ReadFile("./test/test-jpeg-processed.jpg") + suite.NoError(err) + suite.NotEmpty(processedFullBytesExpected) + + // the bytes in storage should be what we expected + suite.Equal(processedFullBytesExpected, processedFullBytes) + + // now do the same for the thumbnail and make sure it's what we expected + processedThumbnailBytes, err := suite.storage.Get(attachment.Thumbnail.Path) + suite.NoError(err) + suite.NotEmpty(processedThumbnailBytes) + + processedThumbnailBytesExpected, err := os.ReadFile("./test/test-jpeg-thumbnail.jpg") + suite.NoError(err) + suite.NotEmpty(processedThumbnailBytesExpected) + + suite.Equal(processedThumbnailBytesExpected, processedThumbnailBytes) +} + func (suite *ManagerTestSuite) TestSimpleJpegProcessAsync() { ctx := context.Background() @@ -126,7 +209,7 @@ func (suite *ManagerTestSuite) TestSimpleJpegProcessAsync() { accountID := "01FS1X72SK9ZPW0J1QQ68BD264" // process the media with no additional info provided - processingMedia, err := suite.manager.ProcessMedia(ctx, data, accountID, nil) + processingMedia, err := suite.manager.ProcessMedia(ctx, data, nil, accountID, nil) suite.NoError(err) // fetch the attachment id from the processing media attachmentID := processingMedia.AttachmentID() @@ -210,7 +293,7 @@ func (suite *ManagerTestSuite) TestSimpleJpegQueueSpamming() { inProcess := []*media.ProcessingMedia{} for i := 0; i < spam; i++ { // process the media with no additional info provided - processingMedia, err := suite.manager.ProcessMedia(ctx, data, accountID, nil) + processingMedia, err := suite.manager.ProcessMedia(ctx, data, nil, accountID, nil) suite.NoError(err) inProcess = append(inProcess, processingMedia) } @@ -305,7 +388,7 @@ func (suite *ManagerTestSuite) TestSimpleJpegProcessBlockingWithDiskStorage() { suite.manager = diskManager // process the media with no additional info provided - processingMedia, err := diskManager.ProcessMedia(ctx, data, accountID, nil) + processingMedia, err := diskManager.ProcessMedia(ctx, data, nil, accountID, nil) suite.NoError(err) // fetch the attachment id from the processing media attachmentID := processingMedia.AttachmentID() diff --git a/internal/media/processingemoji.go b/internal/media/processingemoji.go index 741854b9b..2a3cd1613 100644 --- a/internal/media/processingemoji.go +++ b/internal/media/processingemoji.go @@ -47,9 +47,10 @@ type ProcessingEmoji struct { emoji will be updated incrementally as media goes through processing */ - emoji *gtsmodel.Emoji - data DataFunc - read bool // bool indicating that data function has been triggered already + emoji *gtsmodel.Emoji + data DataFunc + postData PostDataCallbackFunc + read bool // bool indicating that data function has been triggered already /* below fields represent the processing state of the static of the emoji @@ -212,10 +213,15 @@ func (p *ProcessingEmoji) store(ctx context.Context) error { } p.read = true + + if p.postData != nil { + return p.postData(ctx) + } + return nil } -func (m *manager) preProcessEmoji(ctx context.Context, data DataFunc, shortcode string, id string, uri string, ai *AdditionalEmojiInfo) (*ProcessingEmoji, error) { +func (m *manager) preProcessEmoji(ctx context.Context, data DataFunc, postData PostDataCallbackFunc, shortcode string, id string, uri string, ai *AdditionalEmojiInfo) (*ProcessingEmoji, error) { instanceAccount, err := m.db.GetInstanceAccount(ctx, "") if err != nil { return nil, fmt.Errorf("preProcessEmoji: error fetching this instance account from the db: %s", err) @@ -281,6 +287,7 @@ func (m *manager) preProcessEmoji(ctx context.Context, data DataFunc, shortcode instanceAccountID: instanceAccount.ID, emoji: emoji, data: data, + postData: postData, staticState: int32(received), database: m.db, storage: m.storage, diff --git a/internal/media/processingmedia.go b/internal/media/processingmedia.go index 2df5fe584..5d5cfd249 100644 --- a/internal/media/processingmedia.go +++ b/internal/media/processingmedia.go @@ -48,6 +48,7 @@ type ProcessingMedia struct { attachment *gtsmodel.MediaAttachment data DataFunc + postData PostDataCallbackFunc read bool // bool indicating that data function has been triggered already thumbState int32 // the processing state of the media thumbnail @@ -313,10 +314,15 @@ func (p *ProcessingMedia) store(ctx context.Context) error { } p.read = true + + if p.postData != nil { + return p.postData(ctx) + } + return nil } -func (m *manager) preProcessMedia(ctx context.Context, data DataFunc, accountID string, ai *AdditionalMediaInfo) (*ProcessingMedia, error) { +func (m *manager) preProcessMedia(ctx context.Context, data DataFunc, postData PostDataCallbackFunc, accountID string, ai *AdditionalMediaInfo) (*ProcessingMedia, error) { id, err := id.NewRandomULID() if err != nil { return nil, err @@ -403,6 +409,7 @@ func (m *manager) preProcessMedia(ctx context.Context, data DataFunc, accountID processingMedia := &ProcessingMedia{ attachment: attachment, data: data, + postData: postData, thumbState: int32(received), fullSizeState: int32(received), database: m.db, diff --git a/internal/media/types.go b/internal/media/types.go index a6b38b467..cceff216c 100644 --- a/internal/media/types.go +++ b/internal/media/types.go @@ -119,3 +119,9 @@ type AdditionalEmojiInfo struct { // DataFunc represents a function used to retrieve the raw bytes of a piece of media. type DataFunc func(ctx context.Context) (reader io.Reader, fileSize int, err error) + +// PostDataCallbackFunc represents a function executed after the DataFunc has been executed, +// and the returned reader has been read. It can be used to clean up any remaining resources. +// +// This can be set to nil, and will then not be executed. +type PostDataCallbackFunc func(ctx context.Context) error diff --git a/internal/processing/account/update.go b/internal/processing/account/update.go index 4d10f1d0c..a96b44bef 100644 --- a/internal/processing/account/update.go +++ b/internal/processing/account/update.go @@ -150,7 +150,7 @@ func (p *processor) UpdateAvatar(ctx context.Context, avatar *multipart.FileHead Avatar: &isAvatar, } - processingMedia, err := p.mediaManager.ProcessMedia(ctx, dataFunc, accountID, ai) + processingMedia, err := p.mediaManager.ProcessMedia(ctx, dataFunc, nil, accountID, ai) if err != nil { return nil, fmt.Errorf("UpdateAvatar: error processing avatar: %s", err) } @@ -177,7 +177,7 @@ func (p *processor) UpdateHeader(ctx context.Context, header *multipart.FileHead Header: &isHeader, } - processingMedia, err := p.mediaManager.ProcessMedia(ctx, dataFunc, accountID, ai) + processingMedia, err := p.mediaManager.ProcessMedia(ctx, dataFunc, nil, accountID, ai) if err != nil { return nil, fmt.Errorf("UpdateHeader: error processing header: %s", err) } diff --git a/internal/processing/admin/emoji.go b/internal/processing/admin/emoji.go index 6ef78aa65..78920273c 100644 --- a/internal/processing/admin/emoji.go +++ b/internal/processing/admin/emoji.go @@ -49,7 +49,7 @@ func (p *processor) EmojiCreate(ctx context.Context, account *gtsmodel.Account, emojiURI := uris.GenerateURIForEmoji(emojiID) - processingEmoji, err := p.mediaManager.ProcessEmoji(ctx, data, form.Shortcode, emojiID, emojiURI, nil) + processingEmoji, err := p.mediaManager.ProcessEmoji(ctx, data, nil, form.Shortcode, emojiID, emojiURI, nil) if err != nil { return nil, gtserror.NewErrorInternalError(fmt.Errorf("error processing emoji: %s", err), "error processing emoji") } diff --git a/internal/processing/media/create.go b/internal/processing/media/create.go index 4047278eb..1eb9fa512 100644 --- a/internal/processing/media/create.go +++ b/internal/processing/media/create.go @@ -40,7 +40,7 @@ func (p *processor) Create(ctx context.Context, account *gtsmodel.Account, form } // process the media attachment and load it immediately - media, err := p.mediaManager.ProcessMedia(ctx, data, account.ID, &media.AdditionalMediaInfo{ + media, err := p.mediaManager.ProcessMedia(ctx, data, nil, account.ID, &media.AdditionalMediaInfo{ Description: &form.Description, FocusX: &focusX, FocusY: &focusY,