mirror of
https://github.com/ferdzo/fs.git
synced 2026-04-05 01:56:25 +00:00
Working MultipartUpload that needs minor tweaks.
This commit is contained in:
@@ -6,6 +6,7 @@ import (
|
||||
"fmt"
|
||||
"fs/models"
|
||||
"regexp"
|
||||
"sort"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
@@ -20,6 +21,7 @@ type MetadataHandler struct {
|
||||
|
||||
var systemIndex = []byte("__SYSTEM_BUCKETS__")
|
||||
var multipartUploadIndex = []byte("__MULTIPART_UPLOADS__")
|
||||
var multipartUploadPartsIndex = []byte("__MULTIPART_UPLOAD_PARTS__")
|
||||
|
||||
var validBucketName = regexp.MustCompile(`^[a-z0-9.-]{3,63}$`)
|
||||
|
||||
@@ -29,6 +31,8 @@ var (
|
||||
ErrBucketNotFound = errors.New("bucket not found")
|
||||
ErrBucketNotEmpty = errors.New("bucket not empty")
|
||||
ErrObjectNotFound = errors.New("object not found")
|
||||
ErrMultipartNotFound = errors.New("multipart upload not found")
|
||||
ErrMultipartNotPending = errors.New("multipart upload is not pending")
|
||||
)
|
||||
|
||||
func NewMetadataHandler(dbPath string) (*MetadataHandler, error) {
|
||||
@@ -54,6 +58,14 @@ func NewMetadataHandler(dbPath string) (*MetadataHandler, error) {
|
||||
_ = db.Close()
|
||||
return nil, err
|
||||
}
|
||||
err = h.db.Update(func(tx *bbolt.Tx) error {
|
||||
_, err := tx.CreateBucketIfNotExists(multipartUploadPartsIndex)
|
||||
return err
|
||||
})
|
||||
if err != nil {
|
||||
_ = db.Close()
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return h, nil
|
||||
}
|
||||
@@ -325,3 +337,219 @@ func (h *MetadataHandler) CreateMultipartUpload(bucket, key string) (*models.Mul
|
||||
|
||||
return upload, nil
|
||||
}
|
||||
|
||||
func getMultipartUploadBucket(tx *bbolt.Tx) (*bbolt.Bucket, error) {
|
||||
multipartUploadBucket := tx.Bucket(multipartUploadIndex)
|
||||
if multipartUploadBucket == nil {
|
||||
return nil, errors.New("multipart upload index not found")
|
||||
}
|
||||
return multipartUploadBucket, nil
|
||||
}
|
||||
|
||||
func getMultipartPartsBucket(tx *bbolt.Tx) (*bbolt.Bucket, error) {
|
||||
multipartPartsBucket := tx.Bucket(multipartUploadPartsIndex)
|
||||
if multipartPartsBucket == nil {
|
||||
return nil, errors.New("multipart upload parts index not found")
|
||||
}
|
||||
return multipartPartsBucket, nil
|
||||
}
|
||||
|
||||
func getMultipartUploadFromBucket(multipartUploadBucket *bbolt.Bucket, uploadID string) (*models.MultipartUpload, error) {
|
||||
payload := multipartUploadBucket.Get([]byte(uploadID))
|
||||
if payload == nil {
|
||||
return nil, fmt.Errorf("%w: %s", ErrMultipartNotFound, uploadID)
|
||||
}
|
||||
upload := models.MultipartUpload{}
|
||||
if err := json.Unmarshal(payload, &upload); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &upload, nil
|
||||
}
|
||||
|
||||
func getMultipartUploadFromTx(tx *bbolt.Tx, uploadID string) (*models.MultipartUpload, *bbolt.Bucket, error) {
|
||||
multipartUploadBucket, err := getMultipartUploadBucket(tx)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
upload, err := getMultipartUploadFromBucket(multipartUploadBucket, uploadID)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
return upload, multipartUploadBucket, nil
|
||||
}
|
||||
|
||||
func putMultipartUpload(multipartUploadBucket *bbolt.Bucket, uploadID string, upload *models.MultipartUpload) error {
|
||||
payload, err := json.Marshal(upload)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return multipartUploadBucket.Put([]byte(uploadID), payload)
|
||||
}
|
||||
|
||||
func deleteMultipartPartsByUploadID(tx *bbolt.Tx, uploadID string) error {
|
||||
multipartPartsBucket, err := getMultipartPartsBucket(tx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
prefix := uploadID + ":"
|
||||
cursor := multipartPartsBucket.Cursor()
|
||||
keysToDelete := make([][]byte, 0)
|
||||
for k, _ := cursor.Seek([]byte(prefix)); k != nil && strings.HasPrefix(string(k), prefix); k, _ = cursor.Next() {
|
||||
keyCopy := make([]byte, len(k))
|
||||
copy(keyCopy, k)
|
||||
keysToDelete = append(keysToDelete, keyCopy)
|
||||
}
|
||||
for _, key := range keysToDelete {
|
||||
if err := multipartPartsBucket.Delete(key); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (h *MetadataHandler) GetMultipartUpload(uploadID string) (*models.MultipartUpload, error) {
|
||||
var upload *models.MultipartUpload
|
||||
err := h.db.View(func(tx *bbolt.Tx) error {
|
||||
var err error
|
||||
upload, _, err = getMultipartUploadFromTx(tx, uploadID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return upload, nil
|
||||
}
|
||||
func (h *MetadataHandler) PutMultipartPart(uploadID string, part models.UploadedPart) error {
|
||||
if part.PartNumber < 1 || part.PartNumber > 10000 {
|
||||
return fmt.Errorf("invalid part number: %d", part.PartNumber)
|
||||
}
|
||||
|
||||
err := h.db.Update(func(tx *bbolt.Tx) error {
|
||||
upload, _, err := getMultipartUploadFromTx(tx, uploadID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if upload.State != "pending" {
|
||||
return fmt.Errorf("%w: %s", ErrMultipartNotPending, uploadID)
|
||||
}
|
||||
|
||||
multipartPartsBucket, err := getMultipartPartsBucket(tx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
key := fmt.Sprintf("%s:%05d", uploadID, part.PartNumber)
|
||||
payload, err := json.Marshal(part)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return multipartPartsBucket.Put([]byte(key), payload)
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
func (h *MetadataHandler) ListMultipartParts(uploadID string) ([]models.UploadedPart, error) {
|
||||
parts := make([]models.UploadedPart, 0)
|
||||
|
||||
err := h.db.View(func(tx *bbolt.Tx) error {
|
||||
if _, _, err := getMultipartUploadFromTx(tx, uploadID); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
multipartPartsBucket, err := getMultipartPartsBucket(tx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
prefix := uploadID + ":"
|
||||
cursor := multipartPartsBucket.Cursor()
|
||||
for k, v := cursor.Seek([]byte(prefix)); k != nil && strings.HasPrefix(string(k), prefix); k, v = cursor.Next() {
|
||||
part := models.UploadedPart{}
|
||||
if err := json.Unmarshal(v, &part); err != nil {
|
||||
return err
|
||||
}
|
||||
parts = append(parts, part)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
sort.Slice(parts, func(i, j int) bool {
|
||||
return parts[i].PartNumber < parts[j].PartNumber
|
||||
})
|
||||
return parts, nil
|
||||
}
|
||||
func (h *MetadataHandler) CompleteMultipartUpload(uploadID string, final *models.ObjectManifest) error {
|
||||
if final == nil {
|
||||
return errors.New("final object manifest is required")
|
||||
}
|
||||
|
||||
err := h.db.Update(func(tx *bbolt.Tx) error {
|
||||
upload, multipartUploadBucket, err := getMultipartUploadFromTx(tx, uploadID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if upload.State != "pending" {
|
||||
return fmt.Errorf("%w: %s", ErrMultipartNotPending, uploadID)
|
||||
}
|
||||
|
||||
metadataBucket := tx.Bucket([]byte(upload.Bucket))
|
||||
if metadataBucket == nil {
|
||||
return fmt.Errorf("%w: %s", ErrBucketNotFound, upload.Bucket)
|
||||
}
|
||||
final.Bucket = upload.Bucket
|
||||
final.Key = upload.Key
|
||||
finalPayload, err := json.Marshal(final)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err := metadataBucket.Put([]byte(upload.Key), finalPayload); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
upload.State = "completed"
|
||||
if err := putMultipartUpload(multipartUploadBucket, uploadID, upload); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := deleteMultipartPartsByUploadID(tx, uploadID); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
func (h *MetadataHandler) AbortMultipartUpload(uploadID string) error {
|
||||
err := h.db.Update(func(tx *bbolt.Tx) error {
|
||||
upload, multipartUploadBucket, err := getMultipartUploadFromTx(tx, uploadID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if upload.State == "completed" {
|
||||
return fmt.Errorf("%w: %s", ErrMultipartNotPending, uploadID)
|
||||
}
|
||||
upload.State = "aborted"
|
||||
if err := putMultipartUpload(multipartUploadBucket, uploadID, upload); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := deleteMultipartPartsByUploadID(tx, uploadID); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user