mirror of
https://github.com/ferdzo/fs.git
synced 2026-06-04 04:26:46 +00:00
Add upload limits and multipart cleanup
Ultraworked with [Sisyphus](https://github.com/code-yeongyu/oh-my-openagent) Co-authored-by: Sisyphus <clio-agent@sisyphuslabs.ai>
This commit is contained in:
@@ -21,6 +21,7 @@ type ObjectService struct {
|
||||
metadata *metadata.MetadataHandler
|
||||
blob *storage.BlobStore
|
||||
multipartRetention time.Duration
|
||||
maxUploadSize int64
|
||||
gcMu sync.RWMutex
|
||||
}
|
||||
|
||||
@@ -29,16 +30,24 @@ var (
|
||||
ErrInvalidPartOrder = errors.New("invalid multipart part order")
|
||||
ErrInvalidCompleteRequest = errors.New("invalid complete multipart request")
|
||||
ErrEntityTooSmall = errors.New("multipart entity too small")
|
||||
ErrEntityTooLarge = errors.New("entity too large")
|
||||
)
|
||||
|
||||
func NewObjectService(metadataHandler *metadata.MetadataHandler, blobHandler *storage.BlobStore, multipartRetention time.Duration) *ObjectService {
|
||||
const DefaultMaxUploadSize int64 = 5 * 1024 * 1024 * 1024
|
||||
|
||||
func NewObjectService(metadataHandler *metadata.MetadataHandler, blobHandler *storage.BlobStore, multipartRetention time.Duration, maxUploadSize ...int64) *ObjectService {
|
||||
if multipartRetention <= 0 {
|
||||
multipartRetention = 24 * time.Hour
|
||||
}
|
||||
limit := DefaultMaxUploadSize
|
||||
if len(maxUploadSize) > 0 {
|
||||
limit = maxUploadSize[0]
|
||||
}
|
||||
return &ObjectService{
|
||||
metadata: metadataHandler,
|
||||
blob: blobHandler,
|
||||
multipartRetention: multipartRetention,
|
||||
maxUploadSize: limit,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -74,7 +83,7 @@ func (s *ObjectService) PutObject(bucket, key, contentType string, input io.Read
|
||||
unlock := s.acquireGCRLock()
|
||||
defer unlock()
|
||||
|
||||
chunks, size, etag, err := s.blob.IngestStream(input)
|
||||
chunks, size, etag, err := s.blob.IngestStream(s.limitUpload(input))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -158,7 +167,9 @@ func (s *ObjectService) GetObject(bucket, key string) (io.ReadCloser, *models.Ob
|
||||
defer func() {
|
||||
metrics.Default.ObserveService("get_object", time.Since(start), streamOK)
|
||||
}()
|
||||
defer metrics.Default.ObserveLockHold("gc_mu_read", time.Since(holdStart))
|
||||
defer func() {
|
||||
metrics.Default.ObserveLockHold("gc_mu_read", time.Since(holdStart))
|
||||
}()
|
||||
defer s.gcMu.RUnlock()
|
||||
if err := s.blob.AssembleStream(manifest.Chunks, pw); err != nil {
|
||||
_ = pw.CloseWithError(err)
|
||||
@@ -311,7 +322,7 @@ func (s *ObjectService) UploadPart(bucket, key, uploadId string, partNumber int,
|
||||
}
|
||||
|
||||
var uploadedPart models.UploadedPart
|
||||
chunkIds, totalSize, etag, err := s.blob.IngestStream(input)
|
||||
chunkIds, totalSize, etag, err := s.blob.IngestStream(s.limitUpload(input))
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
@@ -400,6 +411,9 @@ func (s *ObjectService) CompleteMultipartUpload(bucket, key, uploadID string, co
|
||||
orderedParts = append(orderedParts, storedPart)
|
||||
chunks = append(chunks, storedPart.Chunks...)
|
||||
totalSize += storedPart.Size
|
||||
if s.maxUploadSize > 0 && totalSize > s.maxUploadSize {
|
||||
return nil, ErrEntityTooLarge
|
||||
}
|
||||
}
|
||||
|
||||
finalETag := buildMultipartETag(orderedParts)
|
||||
@@ -435,6 +449,40 @@ func (s *ObjectService) AbortMultipartUpload(bucket, key, uploadID string) error
|
||||
return s.metadata.AbortMultipartUpload(uploadID)
|
||||
}
|
||||
|
||||
func (s *ObjectService) limitUpload(input io.Reader) io.Reader {
|
||||
if s.maxUploadSize <= 0 || input == nil {
|
||||
return input
|
||||
}
|
||||
return &maxBytesReader{inner: input, remaining: s.maxUploadSize}
|
||||
}
|
||||
|
||||
type maxBytesReader struct {
|
||||
inner io.Reader
|
||||
remaining int64
|
||||
tooLarge bool
|
||||
}
|
||||
|
||||
func (r *maxBytesReader) Read(p []byte) (int, error) {
|
||||
if r.tooLarge {
|
||||
return 0, ErrEntityTooLarge
|
||||
}
|
||||
if r.remaining <= 0 {
|
||||
var probe [1]byte
|
||||
n, err := r.inner.Read(probe[:])
|
||||
if n > 0 {
|
||||
r.tooLarge = true
|
||||
return 0, ErrEntityTooLarge
|
||||
}
|
||||
return 0, err
|
||||
}
|
||||
if int64(len(p)) > r.remaining {
|
||||
p = p[:r.remaining]
|
||||
}
|
||||
n, err := r.inner.Read(p)
|
||||
r.remaining -= int64(n)
|
||||
return n, err
|
||||
}
|
||||
|
||||
func normalizeETag(etag string) string {
|
||||
return strings.Trim(etag, "\"")
|
||||
}
|
||||
@@ -469,6 +517,12 @@ func (s *ObjectService) GarbageCollect() error {
|
||||
unlock := s.acquireGCLock()
|
||||
defer unlock()
|
||||
|
||||
var err error
|
||||
cleanedUploads, err = s.metadata.CleanupMultipartUploads(s.multipartRetention)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
referencedChunkSet, err := s.metadata.GetReferencedChunkSet()
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -492,11 +546,6 @@ func (s *ObjectService) GarbageCollect() error {
|
||||
return err
|
||||
}
|
||||
|
||||
cleanedUploads, err = s.metadata.CleanupMultipartUploads(s.multipartRetention)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
slog.Info("garbage_collect_completed",
|
||||
"referenced_chunks", len(referencedChunkSet),
|
||||
"total_chunks", totalChunks,
|
||||
|
||||
119
service/upload_limit_test.go
Normal file
119
service/upload_limit_test.go
Normal file
@@ -0,0 +1,119 @@
|
||||
package service
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fs/metadata"
|
||||
"fs/storage"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestPutObjectRejectsOversizedUpload(t *testing.T) {
|
||||
svc := newTestObjectService(t, 4)
|
||||
if err := svc.CreateBucket("test-bucket"); err != nil {
|
||||
t.Fatalf("CreateBucket: %v", err)
|
||||
}
|
||||
|
||||
_, err := svc.PutObject("test-bucket", "too-large.txt", "text/plain", strings.NewReader("12345"))
|
||||
if !errors.Is(err, ErrEntityTooLarge) {
|
||||
t.Fatalf("PutObject error = %v, want ErrEntityTooLarge", err)
|
||||
}
|
||||
if _, err := svc.HeadObject("test-bucket", "too-large.txt"); !errors.Is(err, metadata.ErrObjectNotFound) {
|
||||
t.Fatalf("HeadObject error = %v, want ErrObjectNotFound", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestPutObjectAllowsExactUploadLimit(t *testing.T) {
|
||||
svc := newTestObjectService(t, 4)
|
||||
if err := svc.CreateBucket("test-bucket"); err != nil {
|
||||
t.Fatalf("CreateBucket: %v", err)
|
||||
}
|
||||
|
||||
manifest, err := svc.PutObject("test-bucket", "exact.txt", "text/plain", strings.NewReader("1234"))
|
||||
if err != nil {
|
||||
t.Fatalf("PutObject: %v", err)
|
||||
}
|
||||
if manifest.Size != 4 {
|
||||
t.Fatalf("manifest size = %d, want 4", manifest.Size)
|
||||
}
|
||||
}
|
||||
|
||||
func TestUploadPartRejectsOversizedUpload(t *testing.T) {
|
||||
svc := newTestObjectService(t, 4)
|
||||
if err := svc.CreateBucket("test-bucket"); err != nil {
|
||||
t.Fatalf("CreateBucket: %v", err)
|
||||
}
|
||||
upload, err := svc.CreateMultipartUpload("test-bucket", "object.txt")
|
||||
if err != nil {
|
||||
t.Fatalf("CreateMultipartUpload: %v", err)
|
||||
}
|
||||
|
||||
_, err = svc.UploadPart("test-bucket", "object.txt", upload.UploadID, 1, strings.NewReader("12345"))
|
||||
if !errors.Is(err, ErrEntityTooLarge) {
|
||||
t.Fatalf("UploadPart error = %v, want ErrEntityTooLarge", err)
|
||||
}
|
||||
parts, err := svc.ListMultipartParts("test-bucket", "object.txt", upload.UploadID)
|
||||
if err != nil {
|
||||
t.Fatalf("ListMultipartParts: %v", err)
|
||||
}
|
||||
if len(parts) != 0 {
|
||||
t.Fatalf("stored parts = %d, want 0", len(parts))
|
||||
}
|
||||
}
|
||||
|
||||
func TestGarbageCollectRemovesExpiredPendingMultipartChunks(t *testing.T) {
|
||||
svc := newTestObjectService(t, 1024)
|
||||
svc.multipartRetention = time.Nanosecond
|
||||
if err := svc.CreateBucket("test-bucket"); err != nil {
|
||||
t.Fatalf("CreateBucket: %v", err)
|
||||
}
|
||||
upload, err := svc.CreateMultipartUpload("test-bucket", "object.txt")
|
||||
if err != nil {
|
||||
t.Fatalf("CreateMultipartUpload: %v", err)
|
||||
}
|
||||
if _, err := svc.UploadPart("test-bucket", "object.txt", upload.UploadID, 1, strings.NewReader("part-data")); err != nil {
|
||||
t.Fatalf("UploadPart: %v", err)
|
||||
}
|
||||
chunks, err := svc.blob.ListChunks()
|
||||
if err != nil {
|
||||
t.Fatalf("ListChunks before GC: %v", err)
|
||||
}
|
||||
if len(chunks) == 0 {
|
||||
t.Fatalf("expected uploaded part chunks")
|
||||
}
|
||||
time.Sleep(time.Millisecond)
|
||||
|
||||
if err := svc.GarbageCollect(); err != nil {
|
||||
t.Fatalf("GarbageCollect: %v", err)
|
||||
}
|
||||
if _, err := svc.metadata.GetMultipartUpload(upload.UploadID); !errors.Is(err, metadata.ErrMultipartNotFound) {
|
||||
t.Fatalf("GetMultipartUpload error = %v, want ErrMultipartNotFound", err)
|
||||
}
|
||||
chunks, err = svc.blob.ListChunks()
|
||||
if err != nil {
|
||||
t.Fatalf("ListChunks after GC: %v", err)
|
||||
}
|
||||
if len(chunks) != 0 {
|
||||
t.Fatalf("chunks after GC = %d, want 0", len(chunks))
|
||||
}
|
||||
}
|
||||
|
||||
func newTestObjectService(t *testing.T, maxUploadSize int64) *ObjectService {
|
||||
t.Helper()
|
||||
root := t.TempDir()
|
||||
md, err := metadata.NewMetadataHandler(filepath.Join(root, "metadata.db"))
|
||||
if err != nil {
|
||||
t.Fatalf("NewMetadataHandler: %v", err)
|
||||
}
|
||||
blob, err := storage.NewBlobStore(root, 4)
|
||||
if err != nil {
|
||||
t.Fatalf("NewBlobStore: %v", err)
|
||||
}
|
||||
svc := NewObjectService(md, blob, time.Hour, maxUploadSize)
|
||||
t.Cleanup(func() {
|
||||
_ = svc.Close()
|
||||
})
|
||||
return svc
|
||||
}
|
||||
Reference in New Issue
Block a user