package storage

import (
	"bytes"
	"context"
	"io"
	"sync/atomic"

	"codeberg.org/gruf/go-store/v2/util"
	"github.com/minio/minio-go/v7"
)

// DefaultS3Config is the default S3Storage configuration.
var DefaultS3Config = &S3Config{
	CoreOpts:     minio.Options{},
	GetOpts:      minio.GetObjectOptions{},
	PutOpts:      minio.PutObjectOptions{},
	PutChunkOpts: minio.PutObjectPartOptions{},
	PutChunkSize: 5 * 1024 * 1024, // 5MiB, the S3 minimum multipart part size
	StatOpts:     minio.StatObjectOptions{},
	RemoveOpts:   minio.RemoveObjectOptions{},
	ListSize:     200,
}

// S3Config defines options to be used when opening an S3Storage,
// mostly options for the underlying S3 client library.
type S3Config struct {
	// CoreOpts are S3 client options passed during initialization.
	CoreOpts minio.Options

	// GetOpts are S3 client options passed during .Read___() calls.
	GetOpts minio.GetObjectOptions

	// PutOpts are S3 client options passed during .Write___() calls.
	PutOpts minio.PutObjectOptions

	// PutChunkSize is the chunk size (in bytes) to use when sending
	// a byte stream reader of unknown size as a multi-part object.
	PutChunkSize int64

	// PutChunkOpts are S3 client options passed during chunked .Write___() calls.
	PutChunkOpts minio.PutObjectPartOptions

	// StatOpts are S3 client options passed during .Stat() calls.
	StatOpts minio.StatObjectOptions

	// RemoveOpts are S3 client options passed during .Remove() calls.
	RemoveOpts minio.RemoveObjectOptions

	// ListSize determines how many items to include in each
	// list request made during calls to .WalkKeys().
	ListSize int
}

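// A minimal configuration sketch. The credentials shown are hypothetical
// placeholders, and credentials.NewStaticV4 comes from the
// github.com/minio/minio-go/v7/pkg/credentials package:
//
//	cfg := &S3Config{
//		CoreOpts: minio.Options{
//			Creds:  credentials.NewStaticV4("ACCESS_KEY", "SECRET_KEY", ""),
//			Secure: true,
//		},
//		PutChunkSize: 8 * 1024 * 1024, // 8MiB multipart chunks
//	}
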
// getS3Config returns a valid S3Config for supplied ptr.
func getS3Config(cfg *S3Config) S3Config {
	// S3 rejects multipart parts (except the last) smaller than 5MiB.
	const minChunkSz = 5 * 1024 * 1024

	// If nil, use default
	if cfg == nil {
		cfg = DefaultS3Config
	}

	// Ensure a minimum compatible chunk size
	if cfg.PutChunkSize <= minChunkSz {
		// See: https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html
		cfg.PutChunkSize = minChunkSz
	}

	// Assume 0 list size == use default
	if cfg.ListSize <= 0 {
		cfg.ListSize = 200
	}

	// Return owned config copy
	return S3Config{
		CoreOpts:     cfg.CoreOpts,
		GetOpts:      cfg.GetOpts,
		PutOpts:      cfg.PutOpts,
		PutChunkSize: cfg.PutChunkSize,
		PutChunkOpts: cfg.PutChunkOpts,
		ListSize:     cfg.ListSize,
		StatOpts:     cfg.StatOpts,
		RemoveOpts:   cfg.RemoveOpts,
	}
}

// S3Storage is a storage implementation that stores key-value
// pairs in an S3 instance at given endpoint with bucket name.
type S3Storage struct {
	client *minio.Core
	bucket string
	config S3Config
	state  uint32
}

// OpenS3 opens a new S3Storage instance with given S3 endpoint URL, bucket name and configuration.
func OpenS3(endpoint string, bucket string, cfg *S3Config) (*S3Storage, error) {
	// Get checked config
	config := getS3Config(cfg)

	// Create new S3 client connection
	client, err := minio.NewCore(endpoint, &config.CoreOpts)
	if err != nil {
		return nil, err
	}

	// Check that provided bucket actually exists
	exists, err := client.BucketExists(context.Background(), bucket)
	if err != nil {
		return nil, err
	} else if !exists {
		return nil, new_error("bucket does not exist")
	}

	return &S3Storage{
		client: client,
		bucket: bucket,
		config: config,
	}, nil
}

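// A minimal usage sketch; the endpoint and bucket name are hypothetical,
// and the bucket must already exist:
//
//	st, err := OpenS3("s3.example.org:9000", "my-bucket", nil)
//	if err != nil {
//		log.Fatal(err)
//	}
//	defer st.Close()
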
// Client returns access to the underlying S3 client.
func (st *S3Storage) Client() *minio.Core {
	return st.client
}

// Clean implements Storage.Clean().
func (st *S3Storage) Clean(ctx context.Context) error {
	return nil // nothing to do for S3
}

// ReadBytes implements Storage.ReadBytes().
func (st *S3Storage) ReadBytes(ctx context.Context, key string) ([]byte, error) {
	// Fetch object reader from S3 bucket
	rc, err := st.ReadStream(ctx, key)
	if err != nil {
		return nil, err
	}
	defer rc.Close()

	// Read all bytes and return
	return io.ReadAll(rc)
}

// ReadStream implements Storage.ReadStream().
func (st *S3Storage) ReadStream(ctx context.Context, key string) (io.ReadCloser, error) {
	// Check storage open
	if st.closed() {
		return nil, ErrClosed
	}

	// Fetch object reader from S3 bucket
	rc, _, _, err := st.client.GetObject(
		ctx,
		st.bucket,
		key,
		st.config.GetOpts,
	)
	if err != nil {
		return nil, transformS3Error(err)
	}

	return rc, nil
}

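// A read sketch; "some/key" is a hypothetical key, and the returned
// ReadCloser must be closed by the caller:
//
//	rc, err := st.ReadStream(ctx, "some/key")
//	if err != nil {
//		return err
//	}
//	defer rc.Close()
//	data, err := io.ReadAll(rc)
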
// WriteBytes implements Storage.WriteBytes().
func (st *S3Storage) WriteBytes(ctx context.Context, key string, value []byte) (int, error) {
	n, err := st.WriteStream(ctx, key, util.NewByteReaderSize(value))
	return int(n), err
}

// WriteStream implements Storage.WriteStream().
func (st *S3Storage) WriteStream(ctx context.Context, key string, r io.Reader) (int64, error) {
	// Check storage open
	if st.closed() {
		return 0, ErrClosed
	}

	if rs, ok := r.(util.ReaderSize); ok {
		// This reader supports providing us the size of
		// the encompassed data, allowing us to perform
		// a singular .PutObject() call with length.
		info, err := st.client.PutObject(
			ctx,
			st.bucket,
			key,
			r,
			rs.Size(),
			"",
			"",
			st.config.PutOpts,
		)
		if err != nil {
			err = transformS3Error(err)
		}
		return info.Size, err
	}

	// Start a new multipart upload to get ID
	uploadID, err := st.client.NewMultipartUpload(
		ctx,
		st.bucket,
		key,
		st.config.PutOpts,
	)
	if err != nil {
		return 0, transformS3Error(err)
	}

	var (
		index = 1 // parts index
		total = int64(0)
		parts []minio.CompletePart
		chunk = make([]byte, st.config.PutChunkSize)
		rbuf  = bytes.NewReader(nil)
	)

	// Note that we do not perform any kind of
	// memory pooling of the chunk buffers here.
	// Optimal chunking sizes for S3 writes are on
	// the order of megabytes, so letting the GC
	// collect these ASAP is much preferred.

loop:
	for done := false; !done; {
		// Read next chunk into byte buffer
		n, err := io.ReadFull(r, chunk)

		switch err {
		// Successful read
		case nil:

		// Reached end, buffer empty
		case io.EOF:
			break loop

		// Reached end, but buffer not empty
		case io.ErrUnexpectedEOF:
			done = true

		// All other errors
		default:
			return 0, err
		}

		// Reset byte reader
		rbuf.Reset(chunk[:n])

		// Put this object chunk in S3 store
		pt, err := st.client.PutObjectPart(
			ctx,
			st.bucket,
			key,
			uploadID,
			index,
			rbuf,
			int64(n),
			st.config.PutChunkOpts,
		)
		if err != nil {
			return 0, err
		}

		// Append completed part to slice
		parts = append(parts, minio.CompletePart{
			PartNumber:     pt.PartNumber,
			ETag:           pt.ETag,
			ChecksumCRC32:  pt.ChecksumCRC32,
			ChecksumCRC32C: pt.ChecksumCRC32C,
			ChecksumSHA1:   pt.ChecksumSHA1,
			ChecksumSHA256: pt.ChecksumSHA256,
		})

		// Increment parts index
		index++

		// Update total size
		total += pt.Size
	}

	// Complete this multi-part upload operation
	_, err = st.client.CompleteMultipartUpload(
		ctx,
		st.bucket,
		key,
		uploadID,
		parts,
		st.config.PutOpts,
	)
	if err != nil {
		return 0, err
	}

	return total, nil
}

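// A write sketch; the file path and key are hypothetical. *os.File does
// not provide util.ReaderSize's Size method, so this takes the chunked
// multipart path:
//
//	f, err := os.Open("/path/to/large.bin")
//	if err != nil {
//		return err
//	}
//	defer f.Close()
//	n, err := st.WriteStream(ctx, "media/large.bin", f)
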
// Stat implements Storage.Stat().
func (st *S3Storage) Stat(ctx context.Context, key string) (bool, error) {
	// Check storage open
	if st.closed() {
		return false, ErrClosed
	}

	// Query object in S3 bucket
	_, err := st.client.StatObject(
		ctx,
		st.bucket,
		key,
		st.config.StatOpts,
	)
	if err != nil {
		return false, transformS3Error(err)
	}

	return true, nil
}

// Remove implements Storage.Remove().
func (st *S3Storage) Remove(ctx context.Context, key string) error {
	// Check storage open
	if st.closed() {
		return ErrClosed
	}

	// S3 returns no error on remove for non-existent keys,
	// so check existence first to report ErrNotFound.
	if ok, err := st.Stat(ctx, key); err != nil {
		return err
	} else if !ok {
		return ErrNotFound
	}
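
	// Note that this stat-then-remove sequence is not atomic: a
	// concurrent delete between the two calls will still be
	// reported here as success.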

	// Remove object from S3 bucket
	err := st.client.RemoveObject(
		ctx,
		st.bucket,
		key,
		st.config.RemoveOpts,
	)
	if err != nil {
		return transformS3Error(err)
	}

	return nil
}

// WalkKeys implements Storage.WalkKeys().
func (st *S3Storage) WalkKeys(ctx context.Context, opts WalkKeysOptions) error {
	// Check storage open
	if st.closed() {
		return ErrClosed
	}

	var (
		prev  string
		token string
	)

	for {
		// List the objects in bucket starting at marker
		result, err := st.client.ListObjectsV2(
			st.bucket,
			"",
			prev,
			token,
			"",
			st.config.ListSize,
		)
		if err != nil {
			return err
		}

		// Pass each object through walk func
		for _, obj := range result.Contents {
			if err := opts.WalkFn(ctx, Entry{
				Key:  obj.Key,
				Size: obj.Size,
			}); err != nil {
				return err
			}
		}

		// No token means we reached end of bucket
		if result.NextContinuationToken == "" {
			return nil
		}

		// Set continue token and prev mark
		token = result.NextContinuationToken
		prev = result.StartAfter
	}
}

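// A walk sketch printing every key in the bucket; fmt is assumed to be
// imported by the caller:
//
//	err := st.WalkKeys(ctx, WalkKeysOptions{
//		WalkFn: func(ctx context.Context, entry Entry) error {
//			fmt.Printf("%s (%d bytes)\n", entry.Key, entry.Size)
//			return nil
//		},
//	})
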
// Close implements Storage.Close().
func (st *S3Storage) Close() error {
	atomic.StoreUint32(&st.state, 1)
	return nil
}

// closed returns whether S3Storage is closed.
func (st *S3Storage) closed() bool {
	return (atomic.LoadUint32(&st.state) == 1)
}