gotosocial/vendor/codeberg.org/gruf/go-storage/s3/s3.go

471 lines
11 KiB
Go
Raw Normal View History

package s3
import (
"bytes"
"context"
"errors"
"io"
"net/http"
"codeberg.org/gruf/go-storage"
"codeberg.org/gruf/go-storage/internal"
"github.com/minio/minio-go/v7"
)
// ensure S3Storage conforms to storage.Storage.
var _ storage.Storage = (*S3Storage)(nil)
// ensure bytes.Reader conforms to ReaderSize.
var _ ReaderSize = (*bytes.Reader)(nil)
// ReaderSize is an extension of the io.Reader interface
// that may be implemented by callers of WriteStream() in
// order to improve performance. When the size is known it
// is passed onto the underlying minio S3 library.
type ReaderSize interface {
io.Reader
Size() int64
}
// DefaultConfig returns the default S3Storage configuration.
func DefaultConfig() Config {
return defaultConfig
}
// immutable default configuration.
var defaultConfig = Config{
CoreOpts: minio.Options{},
PutChunkSize: 4 * 1024 * 1024, // 4MiB
ListSize: 200,
}
// Config defines options to be used when opening an S3Storage,
// mostly options for underlying S3 client library.
type Config struct {
// CoreOpts are S3 client options
// passed during initialization.
CoreOpts minio.Options
// PutChunkSize is the chunk size (in bytes)
// to use when sending a byte stream reader
// of unknown size as a multi-part object.
PutChunkSize int64
// ListSize determines how many items
// to include in each list request, made
// during calls to .WalkKeys().
ListSize int
}
// getS3Config returns valid (and owned!) Config for given ptr.
func getS3Config(cfg *Config) Config {
// See: https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html
const minChunkSz = 5 * 1024 * 1024
if cfg == nil {
// use defaults.
return defaultConfig
}
// Ensure a minimum compat chunk size.
if cfg.PutChunkSize <= minChunkSz {
cfg.PutChunkSize = minChunkSz
}
// Ensure valid list size.
if cfg.ListSize <= 0 {
cfg.ListSize = 200
}
return Config{
CoreOpts: cfg.CoreOpts,
PutChunkSize: cfg.PutChunkSize,
ListSize: cfg.ListSize,
}
}
// S3Storage is a storage implementation that stores key-value
// pairs in an S3 instance at given endpoint with bucket name.
type S3Storage struct {
client *minio.Core
bucket string
config Config
}
// Open opens a new S3Storage instance with given S3 endpoint URL, bucket name and configuration.
func Open(endpoint string, bucket string, cfg *Config) (*S3Storage, error) {
// Check + set config defaults.
config := getS3Config(cfg)
// Create new S3 client connection to given endpoint.
client, err := minio.NewCore(endpoint, &config.CoreOpts)
if err != nil {
return nil, err
}
ctx := context.Background()
// Check that provided bucket actually exists.
exists, err := client.BucketExists(ctx, bucket)
if err != nil {
return nil, err
} else if !exists {
return nil, errors.New("storage/s3: bucket does not exist")
}
return &S3Storage{
client: client,
bucket: bucket,
config: config,
}, nil
}
// Client: returns access to the underlying S3 client.
func (st *S3Storage) Client() *minio.Core {
return st.client
}
// Clean: implements Storage.Clean().
func (st *S3Storage) Clean(ctx context.Context) error {
return nil // nothing to do for S3
}
// ReadBytes: implements Storage.ReadBytes().
func (st *S3Storage) ReadBytes(ctx context.Context, key string) ([]byte, error) {
// Get stream reader for key
rc, err := st.ReadStream(ctx, key)
if err != nil {
return nil, err
}
// Read all data to memory.
data, err := io.ReadAll(rc)
if err != nil {
_ = rc.Close()
return nil, err
}
// Close storage stream reader.
if err := rc.Close(); err != nil {
return nil, err
}
return data, nil
}
// ReadStream: implements Storage.ReadStream().
func (st *S3Storage) ReadStream(ctx context.Context, key string) (io.ReadCloser, error) {
rc, _, _, err := st.GetObject(ctx, key, minio.GetObjectOptions{})
return rc, err
}
// GetObject wraps minio.Core{}.GetObject() to handle wrapping with our own storage library error types.
func (st *S3Storage) GetObject(ctx context.Context, key string, opts minio.GetObjectOptions) (io.ReadCloser, minio.ObjectInfo, http.Header, error) {
// Query bucket for object data and info.
rc, info, hdr, err := st.client.GetObject(
ctx,
st.bucket,
key,
opts,
)
if err != nil {
if isNotFoundError(err) {
// Wrap not found errors as our not found type.
err = internal.WrapErr(err, storage.ErrNotFound)
} else if isObjectNameError(err) {
// Wrap object name errors as our invalid key type.
err = internal.WrapErr(err, storage.ErrInvalidKey)
}
}
return rc, info, hdr, err
}
// WriteBytes: implements Storage.WriteBytes().
func (st *S3Storage) WriteBytes(ctx context.Context, key string, value []byte) (int, error) {
info, err := st.PutObject(ctx, key, bytes.NewReader(value), minio.PutObjectOptions{})
return int(info.Size), err
}
// WriteStream: implements Storage.WriteStream().
func (st *S3Storage) WriteStream(ctx context.Context, key string, r io.Reader) (int64, error) {
info, err := st.PutObject(ctx, key, r, minio.PutObjectOptions{})
return info.Size, err
}
// PutObject wraps minio.Core{}.PutObject() to handle wrapping with our own storage library error types, and in the case of an io.Reader
// that does not implement ReaderSize{}, it will instead handle upload by using minio.Core{}.NewMultipartUpload() in chunks of PutChunkSize.
func (st *S3Storage) PutObject(ctx context.Context, key string, r io.Reader, opts minio.PutObjectOptions) (minio.UploadInfo, error) {
if rs, ok := r.(ReaderSize); ok {
// This reader supports providing us the size of
// the encompassed data, allowing us to perform
// a singular .PutObject() call with length.
info, err := st.client.PutObject(
ctx,
st.bucket,
key,
r,
rs.Size(),
"",
"",
opts,
)
if err != nil {
if isConflictError(err) {
// Wrap conflict errors as our already exists type.
err = internal.WrapErr(err, storage.ErrAlreadyExists)
} else if isObjectNameError(err) {
// Wrap object name errors as our invalid key type.
err = internal.WrapErr(err, storage.ErrInvalidKey)
}
}
return info, err
}
// Start a new multipart upload to get ID.
uploadID, err := st.client.NewMultipartUpload(
ctx,
st.bucket,
key,
opts,
)
if err != nil {
if isConflictError(err) {
// Wrap conflict errors as our already exists type.
err = internal.WrapErr(err, storage.ErrAlreadyExists)
} else if isObjectNameError(err) {
// Wrap object name errors as our invalid key type.
err = internal.WrapErr(err, storage.ErrInvalidKey)
}
return minio.UploadInfo{}, err
}
var (
total = int64(0)
index = int(1) // parts index
parts []minio.CompletePart
chunk = make([]byte, st.config.PutChunkSize)
rbuf = bytes.NewReader(nil)
)
// Note that we do not perform any kind of
// memory pooling of the chunk buffers here.
// Optimal chunking sizes for S3 writes are in
// the orders of megabytes, so letting the GC
// collect these ASAP is much preferred.
loop:
for done := false; !done; {
// Read next chunk into byte buffer.
n, err := io.ReadFull(r, chunk)
switch err {
// Successful read.
case nil:
// Reached end, buffer empty.
case io.EOF:
break loop
// Reached end, but buffer not empty.
case io.ErrUnexpectedEOF:
done = true
// All other errors.
default:
return minio.UploadInfo{}, err
}
// Reset byte reader.
rbuf.Reset(chunk[:n])
// Put this object chunk in S3 store.
pt, err := st.client.PutObjectPart(
ctx,
st.bucket,
key,
uploadID,
index,
rbuf,
int64(n),
minio.PutObjectPartOptions{
SSE: opts.ServerSideEncryption,
DisableContentSha256: opts.DisableContentSha256,
},
)
if err != nil {
return minio.UploadInfo{}, err
}
// Append completed part to slice.
parts = append(parts, minio.CompletePart{
PartNumber: pt.PartNumber,
ETag: pt.ETag,
ChecksumCRC32: pt.ChecksumCRC32,
ChecksumCRC32C: pt.ChecksumCRC32C,
ChecksumSHA1: pt.ChecksumSHA1,
ChecksumSHA256: pt.ChecksumSHA256,
})
// Update total.
total += int64(n)
// Iterate.
index++
}
// Complete this multi-part upload operation
info, err := st.client.CompleteMultipartUpload(
ctx,
st.bucket,
key,
uploadID,
parts,
opts,
)
if err != nil {
return minio.UploadInfo{}, err
}
// Set correct size.
info.Size = total
return info, nil
}
// Stat: implements Storage.Stat().
func (st *S3Storage) Stat(ctx context.Context, key string) (*storage.Entry, error) {
info, err := st.StatObject(ctx, key, minio.StatObjectOptions{})
if err != nil {
if errors.Is(err, storage.ErrNotFound) {
err = nil // mask not-found errors
}
return nil, err
}
return &storage.Entry{
Key: key,
Size: info.Size,
}, nil
}
// StatObject wraps minio.Core{}.StatObject() to handle wrapping with our own storage library error types.
func (st *S3Storage) StatObject(ctx context.Context, key string, opts minio.StatObjectOptions) (minio.ObjectInfo, error) {
// Query bucket for object info.
info, err := st.client.StatObject(
ctx,
st.bucket,
key,
opts,
)
if err != nil {
if isNotFoundError(err) {
// Wrap not found errors as our not found type.
err = internal.WrapErr(err, storage.ErrNotFound)
} else if isObjectNameError(err) {
// Wrap object name errors as our invalid key type.
err = internal.WrapErr(err, storage.ErrInvalidKey)
}
}
return info, err
}
// Remove: implements Storage.Remove().
func (st *S3Storage) Remove(ctx context.Context, key string) error {
_, err := st.StatObject(ctx, key, minio.StatObjectOptions{})
if err != nil {
return err
}
return st.RemoveObject(ctx, key, minio.RemoveObjectOptions{})
}
// RemoveObject wraps minio.Core{}.RemoveObject() to handle wrapping with our own storage library error types.
func (st *S3Storage) RemoveObject(ctx context.Context, key string, opts minio.RemoveObjectOptions) error {
// Remove object from S3 bucket
err := st.client.RemoveObject(
ctx,
st.bucket,
key,
opts,
)
if err != nil {
if isNotFoundError(err) {
// Wrap not found errors as our not found type.
err = internal.WrapErr(err, storage.ErrNotFound)
} else if isObjectNameError(err) {
// Wrap object name errors as our invalid key type.
err = internal.WrapErr(err, storage.ErrInvalidKey)
}
}
return err
}
// WalkKeys: implements Storage.WalkKeys().
func (st *S3Storage) WalkKeys(ctx context.Context, opts storage.WalkKeysOpts) error {
if opts.Step == nil {
panic("nil step fn")
}
var (
prev string
token string
)
for {
// List objects in bucket starting at marker.
result, err := st.client.ListObjectsV2(
st.bucket,
opts.Prefix,
prev,
token,
"",
st.config.ListSize,
)
if err != nil {
return err
}
// Iterate through list result contents.
for _, obj := range result.Contents {
// Skip filtered obj keys.
if opts.Filter != nil &&
opts.Filter(obj.Key) {
continue
}
// Pass each obj through step func.
if err := opts.Step(storage.Entry{
Key: obj.Key,
Size: obj.Size,
}); err != nil {
return err
}
}
// No token means we reached end of bucket.
if result.NextContinuationToken == "" {
return nil
}
// Set continue token and prev mark
token = result.NextContinuationToken
prev = result.StartAfter
}
}