diff --git a/internal/media/image.go b/internal/media/image.go index 26bd5e8b6..4ad68db5a 100644 --- a/internal/media/image.go +++ b/internal/media/image.go @@ -43,21 +43,55 @@ const ( thumbnailMaxHeight = 512 ) -type imageAndMeta struct { - image []byte - width int - height int - size int - aspect float64 - blurhash string +type ImageMeta struct { + image []byte + contentType string + width int + height int + size int + aspect float64 + blurhash string } -func (m *manager) processImage(ctx context.Context, data []byte, contentType string, accountID string) { +func (m *manager) preProcessImage(ctx context.Context, data []byte, contentType string, accountID string) (*Media, error) { + id, err := id.NewRandomULID() + if err != nil { + return nil, err + } + + extension := strings.Split(contentType, "/")[1] + + attachment := >smodel.MediaAttachment{ + ID: id, + UpdatedAt: time.Now(), + URL: uris.GenerateURIForAttachment(accountID, string(TypeAttachment), string(SizeOriginal), id, extension), + Type: gtsmodel.FileTypeImage, + AccountID: accountID, + Processing: 0, + File: gtsmodel.File{ + Path: fmt.Sprintf("%s/%s/%s/%s.%s", accountID, TypeAttachment, SizeOriginal, id, extension), + ContentType: contentType, + UpdatedAt: time.Now(), + }, + Thumbnail: gtsmodel.Thumbnail{ + URL: uris.GenerateURIForAttachment(accountID, string(TypeAttachment), string(SizeSmall), id, mimeJpeg), // all thumbnails are encoded as jpeg, + Path: fmt.Sprintf("%s/%s/%s/%s.%s", accountID, TypeAttachment, SizeSmall, id, mimeJpeg), // all thumbnails are encoded as jpeg, + ContentType: mimeJpeg, + UpdatedAt: time.Now(), + }, + Avatar: false, + Header: false, + } + + media := &Media{ + attachment: attachment, + } + + return media, nil var clean []byte - var err error - var original *imageAndMeta - var small *imageAndMeta + var original *ImageMeta + var small *ImageMeta switch contentType { case mimeImageJpeg, mimeImagePng: @@ -79,82 +113,17 @@ func (m *manager) processImage(ctx context.Context, data []byte, contentType str return nil, err } - small, err = deriveThumbnail(clean, contentType, thumbnailMaxWidth, thumbnailMaxHeight) + small, err = deriveThumbnail(clean, contentType) if err != nil { return nil, fmt.Errorf("error deriving thumbnail: %s", err) } // now put it in storage, take a new id for the name of the file so we don't store any unnecessary info about it - extension := strings.Split(contentType, "/")[1] - attachmentID, err := id.NewRandomULID() - if err != nil { - return nil, err - } - - originalURL := uris.GenerateURIForAttachment(accountID, string(TypeAttachment), string(SizeOriginal), attachmentID, extension) - smallURL := uris.GenerateURIForAttachment(accountID, string(TypeAttachment), string(SizeSmall), attachmentID, "jpeg") // all thumbnails/smalls are encoded as jpeg - - // we store the original... - originalPath := fmt.Sprintf("%s/%s/%s/%s.%s", accountID, TypeAttachment, SizeOriginal, attachmentID, extension) - if err := m.storage.Put(originalPath, original.image); err != nil { - return nil, fmt.Errorf("storage error: %s", err) - } - - // and a thumbnail... - smallPath := fmt.Sprintf("%s/%s/%s/%s.jpeg", accountID, TypeAttachment, SizeSmall, attachmentID) // all thumbnails/smalls are encoded as jpeg - if err := m.storage.Put(smallPath, small.image); err != nil { - return nil, fmt.Errorf("storage error: %s", err) - } - - attachment := >smodel.MediaAttachment{ - ID: attachmentID, - StatusID: "", - URL: originalURL, - RemoteURL: "", - CreatedAt: time.Time{}, - UpdatedAt: time.Time{}, - Type: gtsmodel.FileTypeImage, - FileMeta: gtsmodel.FileMeta{ - Original: gtsmodel.Original{ - Width: original.width, - Height: original.height, - Size: original.size, - Aspect: original.aspect, - }, - Small: gtsmodel.Small{ - Width: small.width, - Height: small.height, - Size: small.size, - Aspect: small.aspect, - }, - }, - AccountID: accountID, - Description: "", - ScheduledStatusID: "", - Blurhash: small.blurhash, - Processing: 2, - File: gtsmodel.File{ - Path: originalPath, - ContentType: contentType, - FileSize: len(original.image), - UpdatedAt: time.Now(), - }, - Thumbnail: gtsmodel.Thumbnail{ - Path: smallPath, - ContentType: mimeJpeg, // all thumbnails/smalls are encoded as jpeg - FileSize: len(small.image), - UpdatedAt: time.Now(), - URL: smallURL, - RemoteURL: "", - }, - Avatar: false, - Header: false, - } return attachment, nil } -func decodeGif(b []byte) (*imageAndMeta, error) { +func decodeGif(b []byte) (*ImageMeta, error) { gif, err := gif.DecodeAll(bytes.NewReader(b)) if err != nil { return nil, err @@ -166,7 +135,7 @@ func decodeGif(b []byte) (*imageAndMeta, error) { size := width * height aspect := float64(width) / float64(height) - return &imageAndMeta{ + return &ImageMeta{ image: b, width: width, height: height, @@ -175,7 +144,7 @@ func decodeGif(b []byte) (*imageAndMeta, error) { }, nil } -func decodeImage(b []byte, contentType string) (*imageAndMeta, error) { +func decodeImage(b []byte, contentType string) (*ImageMeta, error) { var i image.Image var err error @@ -201,7 +170,7 @@ func decodeImage(b []byte, contentType string) (*imageAndMeta, error) { size := width * height aspect := float64(width) / float64(height) - return &imageAndMeta{ + return &ImageMeta{ image: b, width: width, height: height, @@ -210,12 +179,12 @@ func decodeImage(b []byte, contentType string) (*imageAndMeta, error) { }, nil } -// deriveThumbnail returns a byte slice and metadata for a thumbnail of width x and height y, +// deriveThumbnail returns a byte slice and metadata for a thumbnail // of a given jpeg, png, or gif, or an error if something goes wrong. // // Note that the aspect ratio of the image will be retained, // so it will not necessarily be a square, even if x and y are set as the same value. -func deriveThumbnail(b []byte, contentType string, x uint, y uint) (*imageAndMeta, error) { +func deriveThumbnail(b []byte, contentType string) (*ImageMeta, error) { var i image.Image var err error @@ -239,7 +208,7 @@ func deriveThumbnail(b []byte, contentType string, x uint, y uint) (*imageAndMet return nil, fmt.Errorf("content type %s not recognised", contentType) } - thumb := resize.Thumbnail(x, y, i, resize.NearestNeighbor) + thumb := resize.Thumbnail(thumbnailMaxWidth, thumbnailMaxHeight, i, resize.NearestNeighbor) width := thumb.Bounds().Size().X height := thumb.Bounds().Size().Y size := width * height @@ -257,7 +226,7 @@ func deriveThumbnail(b []byte, contentType string, x uint, y uint) (*imageAndMet }); err != nil { return nil, err } - return &imageAndMeta{ + return &ImageMeta{ image: out.Bytes(), width: width, height: height, @@ -268,7 +237,7 @@ func deriveThumbnail(b []byte, contentType string, x uint, y uint) (*imageAndMet } // deriveStaticEmojji takes a given gif or png of an emoji, decodes it, and re-encodes it as a static png. -func deriveStaticEmoji(b []byte, contentType string) (*imageAndMeta, error) { +func deriveStaticEmoji(b []byte, contentType string) (*ImageMeta, error) { var i image.Image var err error @@ -291,7 +260,7 @@ func deriveStaticEmoji(b []byte, contentType string) (*imageAndMeta, error) { if err := png.Encode(out, i); err != nil { return nil, err } - return &imageAndMeta{ + return &ImageMeta{ image: out.Bytes(), }, nil } diff --git a/internal/media/manager.go b/internal/media/manager.go index 16465bb67..54b964564 100644 --- a/internal/media/manager.go +++ b/internal/media/manager.go @@ -25,7 +25,9 @@ import ( "runtime" "strings" + "codeberg.org/gruf/go-runners" "codeberg.org/gruf/go-store/kv" + "github.com/sirupsen/logrus" "github.com/superseriousbusiness/gotosocial/internal/db" ) @@ -37,18 +39,27 @@ type Manager interface { type manager struct { db db.DB storage *kv.KVStore - pool *workerPool + pool runners.WorkerPool } // New returns a media manager with the given db and underlying storage. -func New(database db.DB, storage *kv.KVStore) Manager { +func New(database db.DB, storage *kv.KVStore) (Manager, error) { workers := runtime.NumCPU() / 2 + queue := workers * 10 + pool := runners.NewWorkerPool(workers, queue) - return &manager{ + if start := pool.Start(); !start { + return nil, errors.New("could not start worker pool") + } + logrus.Debugf("started media manager worker pool with %d workers and queue capacity of %d", workers, queue) + + m := &manager{ db: database, storage: storage, - pool: newWorkerPool(workers), + pool: pool, } + + return m, nil } /* @@ -77,9 +88,16 @@ func (m *manager) ProcessMedia(ctx context.Context, data []byte, accountID strin return nil, errors.New("image was of size 0") } - return m.pool.run(func(ctx context.Context, data []byte, contentType string, accountID string) { - m.processImage(ctx, data, contentType, accountID) + media, err := m.preProcessImage(ctx, data, contentType, accountID) + if err != nil { + return nil, err + } + + m.pool.Enqueue(func(innerCtx context.Context) { + }) + + return nil, nil default: return nil, fmt.Errorf("content type %s not (yet) supported", contentType) } diff --git a/internal/media/media.go b/internal/media/media.go index e96c37020..0bd196b27 100644 --- a/internal/media/media.go +++ b/internal/media/media.go @@ -1,7 +1,34 @@ package media -import gtsmodel "github.com/superseriousbusiness/gotosocial/internal/db/bundb/migrations/20211113114307_init" +import ( + "fmt" + "sync" + + "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" +) type Media struct { - Attachment *gtsmodel.MediaAttachment + mu sync.Mutex + attachment *gtsmodel.MediaAttachment + rawData []byte +} + +func (m *Media) Thumb() (*ImageMeta, error) { + m.mu.Lock() + thumb, err := deriveThumbnail(m.rawData, m.attachment.File.ContentType) + if err != nil { + return nil, fmt.Errorf("error deriving thumbnail: %s", err) + } + m.attachment.Blurhash = thumb.blurhash + aaaaaaaaaaaaaaaa +} + +func (m *Media) PreLoad() { + m.mu.Lock() + defer m.mu.Unlock() +} + +func (m *Media) Load() { + m.mu.Lock() + defer m.mu.Unlock() } diff --git a/internal/media/pool.go b/internal/media/pool.go deleted file mode 100644 index 19b31cde3..000000000 --- a/internal/media/pool.go +++ /dev/null @@ -1,65 +0,0 @@ -package media - -import "context" - -func newWorkerPool(workers int) *workerPool { - // make a pool with the given worker capacity - pool := &workerPool{ - workerQueue: make(chan *worker, workers), - } - - // fill the pool with workers - for i := 0; i < workers; i++ { - pool.workerQueue <- &worker{ - // give each worker a reference to the pool so it - // can put itself back in when it's finished - workerQueue: pool.workerQueue, - data: []byte{}, - contentType: "", - accountID: "", - } - } - - return pool -} - -type workerPool struct { - workerQueue chan *worker -} - -func (p *workerPool) run(fn func(ctx context.Context, data []byte, contentType string, accountID string)) (*Media, error) { - - m := &Media{} - - go func() { - // take a worker from the worker pool - worker := <-p.workerQueue - // tell it to work - worker.work(fn) - }() - - return m, nil -} - -type worker struct { - workerQueue chan *worker - data []byte - contentType string - accountID string -} - -func (w *worker) work(fn func(ctx context.Context, data []byte, contentType string, accountID string)) { - // return self to pool when finished - defer w.finish() - // do the work - fn(context.Background(), w.data, w.contentType, w.accountID) -} - -func (w *worker) finish() { - // clear self - w.data = []byte{} - w.contentType = "" - w.accountID = "" - // put self back in the worker pool - w.workerQueue <- w -} diff --git a/internal/media/process.go b/internal/media/process.go deleted file mode 100644 index e921be6bc..000000000 --- a/internal/media/process.go +++ /dev/null @@ -1,5 +0,0 @@ -package media - -import "context" - -type mediaProcessingFunction func(ctx context.Context, data []byte, contentType string, accountID string)