From a898160b0c8591be9677c158fa32d6a78d679152 Mon Sep 17 00:00:00 2001 From: kim <89579420+NyaaaWhatsUpDoc@users.noreply.github.com> Date: Tue, 22 Nov 2022 13:28:55 +0000 Subject: [PATCH] [chore] use kv.KVStore also for S3 storage (#1113) * replace s3 storage implementation to also use kv.KVStore Signed-off-by: kim * pull in latest `go-store` fix Signed-off-by: kim * pull-in go-store v2.0.9 fixes, update s3 put chunk size to 5MiB Signed-off-by: kim Signed-off-by: kim --- go.mod | 2 +- go.sum | 4 +- internal/storage/s3.go | 68 +++---------------- internal/storage/storage.go | 39 ++++++++--- testrig/storage.go | 32 +++++++-- .../gruf/go-store/v2/storage/s3.go | 25 ++++--- vendor/modules.txt | 2 +- 7 files changed, 85 insertions(+), 87 deletions(-) diff --git a/go.mod b/go.mod index 526f56a2f..3d5b058b5 100644 --- a/go.mod +++ b/go.mod @@ -12,7 +12,7 @@ require ( codeberg.org/gruf/go-logger/v2 v2.2.1 codeberg.org/gruf/go-mutexes v1.1.4 codeberg.org/gruf/go-runners v1.3.1 - codeberg.org/gruf/go-store/v2 v2.0.7 + codeberg.org/gruf/go-store/v2 v2.0.9 github.com/buckket/go-blurhash v1.1.0 github.com/coreos/go-oidc/v3 v3.4.0 github.com/cornelk/hashmap v1.0.8 diff --git a/go.sum b/go.sum index e841f2b3f..db666f15f 100644 --- a/go.sum +++ b/go.sum @@ -99,8 +99,8 @@ codeberg.org/gruf/go-runners v1.3.1 h1:d/OQMMMiA6yPaDSbSr0/Jc+lucWmm7AiAZjWffpNK codeberg.org/gruf/go-runners v1.3.1/go.mod h1:rl0EdZNozkRMb21DAtOL5L4oTfmslYQdZgq2RMMc/H4= codeberg.org/gruf/go-sched v1.1.1 h1:YtLSQhpypzuD3HTup5oF7LLWB79gTL4nqW06kH4Vwks= codeberg.org/gruf/go-sched v1.1.1/go.mod h1:SRcdP/5qim+EBT3n3r4aUra1C30yPqV4OJOXuqvgdQM= -codeberg.org/gruf/go-store/v2 v2.0.7 h1:P+0d8jnXdgzxfHLqKjHMV+MAxVJmq056PvzaHRyR8jE= -codeberg.org/gruf/go-store/v2 v2.0.7/go.mod h1:D4r5PV0BXDhxQyATw/03JkwvziZDkVMgzTpElZyWTXI= +codeberg.org/gruf/go-store/v2 v2.0.9 h1:nFurtZCBdZac79aXo3PFPVn2N744B06dHLygVrHf+SA= +codeberg.org/gruf/go-store/v2 v2.0.9/go.mod h1:D4r5PV0BXDhxQyATw/03JkwvziZDkVMgzTpElZyWTXI= dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= diff --git a/internal/storage/s3.go b/internal/storage/s3.go index 1b191f675..1ead7efe9 100644 --- a/internal/storage/s3.go +++ b/internal/storage/s3.go @@ -19,80 +19,32 @@ package storage import ( - "bytes" "context" - "fmt" - "io" "mime" "net/url" "path" "time" - "github.com/minio/minio-go/v7" + "codeberg.org/gruf/go-store/v2/kv" + "codeberg.org/gruf/go-store/v2/storage" ) type S3 struct { - mc *minio.Client - bucket string - proxy bool -} - -func NewS3(mc *minio.Client, bucket string, proxy bool) *S3 { - return &S3{ - mc: mc, - bucket: bucket, - proxy: proxy, - } -} - -func (s *S3) Get(ctx context.Context, key string) ([]byte, error) { - r, err := s.GetStream(ctx, key) - if err != nil { - return nil, err - } - defer r.Close() - b, err := io.ReadAll(r) - if err != nil { - return nil, fmt.Errorf("reading data from s3: %w", err) - } - return b, nil -} - -func (s *S3) GetStream(ctx context.Context, key string) (io.ReadCloser, error) { - o, err := s.mc.GetObject(ctx, s.bucket, key, minio.GetObjectOptions{}) - if err != nil { - err = fmt.Errorf("retrieving object from s3: %w", err) - } - return o, err -} - -func (s *S3) PutStream(ctx context.Context, key string, r io.Reader) error { - if _, err := s.mc.PutObject(ctx, s.bucket, key, r, -1, minio.PutObjectOptions{}); err != nil { - return fmt.Errorf("uploading data stream: %w", err) - } - return nil -} - -func (s *S3) Put(ctx context.Context, key string, value []byte) error { - if _, err := s.mc.PutObject(ctx, s.bucket, key, bytes.NewBuffer(value), -1, minio.PutObjectOptions{}); err != nil { - return fmt.Errorf("uploading data slice: %w", err) - } - return nil -} - -func (s *S3) Delete(ctx context.Context, key string) error { - return s.mc.RemoveObject(ctx, s.bucket, key, minio.RemoveObjectOptions{}) + Proxy bool + Bucket string + Storage *storage.S3Storage + *kv.KVStore } func (s *S3) URL(ctx context.Context, key string) *url.URL { - if s.proxy { + if s.Proxy { return nil } - // it's safe to ignore the error here, as we just fall back to fetching the - // file if the url request fails - url, _ := s.mc.PresignedGetObject(ctx, s.bucket, key, time.Hour, url.Values{ + // it's safe to ignore the error here, as we just fall back to fetching the file if URL request fails + url, _ := s.Storage.Client().PresignedGetObject(ctx, s.Bucket, key, time.Hour, url.Values{ "response-content-type": []string{mime.TypeByExtension(path.Ext(key))}, }) + return url } diff --git a/internal/storage/storage.go b/internal/storage/storage.go index 303283f90..71d4774f7 100644 --- a/internal/storage/storage.go +++ b/internal/storage/storage.go @@ -52,20 +52,38 @@ type Driver interface { func AutoConfig() (Driver, error) { switch config.GetStorageBackend() { case "s3": - mc, err := minio.New(config.GetStorageS3Endpoint(), &minio.Options{ - Creds: credentials.NewStaticV4(config.GetStorageS3AccessKey(), config.GetStorageS3SecretKey(), ""), - Secure: config.GetStorageS3UseSSL(), + endpoint := config.GetStorageS3Endpoint() + access := config.GetStorageS3AccessKey() + secret := config.GetStorageS3SecretKey() + secure := config.GetStorageS3UseSSL() + bucket := config.GetStorageS3BucketName() + proxy := config.GetStorageS3Proxy() + + s3, err := storage.OpenS3(endpoint, bucket, &storage.S3Config{ + CoreOpts: minio.Options{ + Creds: credentials.NewStaticV4(access, secret, ""), + Secure: secure, + }, + GetOpts: minio.GetObjectOptions{}, + PutOpts: minio.PutObjectOptions{}, + PutChunkSize: 5 * 1024 * 1024, // 5MiB + StatOpts: minio.StatObjectOptions{}, + RemoveOpts: minio.RemoveObjectOptions{}, + ListSize: 200, }) if err != nil { - return nil, fmt.Errorf("creating minio client: %w", err) + return nil, fmt.Errorf("error opening s3 storage: %w", err) } - return NewS3( - mc, - config.GetStorageS3BucketName(), - config.GetStorageS3Proxy(), - ), nil + + return &S3{ + Proxy: proxy, + Bucket: bucket, + Storage: s3, + KVStore: kv.New(s3), + }, nil case "local": basePath := config.GetStorageLocalBasePath() + disk, err := storage.OpenDisk(basePath, &storage.DiskConfig{ // Put the store lockfile in the storage dir itself. // Normally this would not be safe, since we could end up @@ -75,8 +93,9 @@ func AutoConfig() (Driver, error) { LockFile: path.Join(basePath, "store.lock"), }) if err != nil { - return nil, fmt.Errorf("error openingdisk storage: %v", err) + return nil, fmt.Errorf("error opening disk storage: %w", err) } + return &Local{kv.New(disk)}, nil } return nil, fmt.Errorf("invalid storage backend %s", config.GetStorageBackend()) diff --git a/testrig/storage.go b/testrig/storage.go index 657c81161..e29c82532 100644 --- a/testrig/storage.go +++ b/testrig/storage.go @@ -28,6 +28,7 @@ import ( "codeberg.org/gruf/go-store/v2/storage" "github.com/minio/minio-go/v7" "github.com/minio/minio-go/v7/pkg/credentials" + "github.com/superseriousbusiness/gotosocial/internal/config" gtsstorage "github.com/superseriousbusiness/gotosocial/internal/storage" ) @@ -41,14 +42,35 @@ func NewInMemoryStorage() *gtsstorage.Local { } func NewS3Storage() gtsstorage.Driver { - mc, err := minio.New(os.Getenv("GTS_STORAGE_S3_ENDPOINT"), &minio.Options{ - Creds: credentials.NewStaticV4(os.Getenv("GTS_STORAGE_S3_ACCESS_KEY"), os.Getenv("GTS_STORAGE_S3_SECRET_KEY"), ""), - Secure: false, + endpoint := config.GetStorageS3Endpoint() + access := config.GetStorageS3AccessKey() + secret := config.GetStorageS3SecretKey() + secure := config.GetStorageS3UseSSL() + bucket := config.GetStorageS3BucketName() + proxy := config.GetStorageS3Proxy() + + s3, err := storage.OpenS3(endpoint, bucket, &storage.S3Config{ + CoreOpts: minio.Options{ + Creds: credentials.NewStaticV4(access, secret, ""), + Secure: secure, + }, + GetOpts: minio.GetObjectOptions{}, + PutOpts: minio.PutObjectOptions{}, + PutChunkSize: 5 * 1024 * 1024, // 2MiB + StatOpts: minio.StatObjectOptions{}, + RemoveOpts: minio.RemoveObjectOptions{}, + ListSize: 200, }) if err != nil { - panic(err) + panic(fmt.Errorf("error opening s3 storage: %w", err)) + } + + return >sstorage.S3{ + Proxy: proxy, + Bucket: bucket, + Storage: s3, + KVStore: kv.New(s3), } - return gtsstorage.NewS3(mc, os.Getenv("GTS_STORAGE_S3_BUCKET"), false) } // StandardStorageSetup populates the storage with standard test entries from the given directory. diff --git a/vendor/codeberg.org/gruf/go-store/v2/storage/s3.go b/vendor/codeberg.org/gruf/go-store/v2/storage/s3.go index baf2a1b3c..f8011114f 100644 --- a/vendor/codeberg.org/gruf/go-store/v2/storage/s3.go +++ b/vendor/codeberg.org/gruf/go-store/v2/storage/s3.go @@ -50,14 +50,17 @@ type S3Config struct { // getS3Config returns a valid S3Config for supplied ptr. func getS3Config(cfg *S3Config) S3Config { + const minChunkSz = 5 * 1024 * 1024 + // If nil, use default if cfg == nil { cfg = DefaultS3Config } - // Assume 0 chunk size == use default - if cfg.PutChunkSize <= 0 { - cfg.PutChunkSize = 4 * 1024 * 1024 + // Ensure a minimum compatible chunk size + if cfg.PutChunkSize <= minChunkSz { + // See: https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html + cfg.PutChunkSize = minChunkSz } // Assume 0 list size == use default @@ -67,11 +70,13 @@ func getS3Config(cfg *S3Config) S3Config { // Return owned config copy return S3Config{ - CoreOpts: cfg.CoreOpts, - GetOpts: cfg.GetOpts, - PutOpts: cfg.PutOpts, - StatOpts: cfg.StatOpts, - RemoveOpts: cfg.RemoveOpts, + CoreOpts: cfg.CoreOpts, + GetOpts: cfg.GetOpts, + PutOpts: cfg.PutOpts, + PutChunkSize: cfg.PutChunkSize, + ListSize: cfg.ListSize, + StatOpts: cfg.StatOpts, + RemoveOpts: cfg.RemoveOpts, } } @@ -198,7 +203,7 @@ func (st *S3Storage) WriteStream(ctx context.Context, key string, r io.Reader) e } var ( - count int + count = 1 parts []minio.CompletePart chunk = make([]byte, st.config.PutChunkSize) rdr = bytes.NewReader(nil) @@ -243,7 +248,7 @@ loop: uploadID, count, rdr, - st.config.PutChunkSize, + int64(n), "", "", nil, diff --git a/vendor/modules.txt b/vendor/modules.txt index d6cdcf7f2..f2e306b0d 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -58,7 +58,7 @@ codeberg.org/gruf/go-runners # codeberg.org/gruf/go-sched v1.1.1 ## explicit; go 1.19 codeberg.org/gruf/go-sched -# codeberg.org/gruf/go-store/v2 v2.0.7 +# codeberg.org/gruf/go-store/v2 v2.0.9 ## explicit; go 1.19 codeberg.org/gruf/go-store/v2/kv codeberg.org/gruf/go-store/v2/storage