diff --git a/api/api.go b/api/api.go index 2e183c9..42421db 100644 --- a/api/api.go +++ b/api/api.go @@ -71,6 +71,11 @@ func (h *Handler) handleGetObject(w http.ResponseWriter, r *http.Request) { return } + if uploadID := r.URL.Query().Get("uploadId"); uploadID != "" { + h.handleListMultipartParts(w, r, bucket, key, uploadID) + return + } + stream, manifest, err := h.svc.GetObject(bucket, key) if err != nil { writeMappedS3Error(w, r, err) @@ -94,6 +99,7 @@ func (h *Handler) handlePostObject(w http.ResponseWriter, r *http.Request) { writeS3Error(w, r, s3ErrInvalidObjectKey, r.URL.Path) return } + defer r.Body.Close() if _, ok := r.URL.Query()["uploads"]; ok { upload, err := h.svc.CreateMultipartUpload(bucket, key) @@ -119,6 +125,39 @@ func (h *Handler) handlePostObject(w http.ResponseWriter, r *http.Request) { return } + if uploadID := r.URL.Query().Get("uploadId"); uploadID != "" { + var req models.CompleteMultipartUploadRequest + if err := xml.NewDecoder(r.Body).Decode(&req); err != nil { + writeS3Error(w, r, s3ErrMalformedXML, r.URL.Path) + return + } + + manifest, err := h.svc.CompleteMultipartUpload(bucket, key, uploadID, req.Parts) + if err != nil { + writeMappedS3Error(w, r, err) + return + } + + response := models.CompleteMultipartUploadResult{ + Xmlns: "http://s3.amazonaws.com/doc/2006-03-01/", + Bucket: bucket, + Key: key, + ETag: `"` + manifest.ETag + `"`, + Location: r.URL.Path, + } + payload, err := xml.MarshalIndent(response, "", " ") + if err != nil { + writeMappedS3Error(w, r, err) + return + } + + w.Header().Set("Content-Type", "application/xml; charset=utf-8") + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte(xml.Header)) + _, _ = w.Write(payload) + return + } + writeS3Error(w, r, s3ErrNotImplemented, r.URL.Path) } @@ -129,9 +168,31 @@ func (h *Handler) handlePutObject(w http.ResponseWriter, r *http.Request) { writeS3Error(w, r, s3ErrInvalidObjectKey, r.URL.Path) return } - if r.URL.Query().Get("uploads") != "" { - if r.URL.Query().Get("partNumber") != "" { + defer r.Body.Close() + + uploadID := r.URL.Query().Get("uploadId") + partNumberRaw := r.URL.Query().Get("partNumber") + if uploadID != "" || partNumberRaw != "" { + if uploadID == "" || partNumberRaw == "" { + writeS3Error(w, r, s3ErrInvalidPart, r.URL.Path) + return } + + partNumber, err := strconv.Atoi(partNumberRaw) + if err != nil { + writeS3Error(w, r, s3ErrInvalidPart, r.URL.Path) + return + } + + etag, err := h.svc.UploadPart(bucket, key, uploadID, partNumber, r.Body) + if err != nil { + writeMappedS3Error(w, r, err) + return + } + w.Header().Set("ETag", `"`+etag+`"`) + w.Header().Set("Content-Length", "0") + w.WriteHeader(http.StatusOK) + return } contentType := r.Header.Get("Content-Type") @@ -140,7 +201,6 @@ func (h *Handler) handlePutObject(w http.ResponseWriter, r *http.Request) { } manifest, err := h.svc.PutObject(bucket, key, contentType, r.Body) - defer r.Body.Close() if err != nil { writeMappedS3Error(w, r, err) @@ -153,6 +213,41 @@ func (h *Handler) handlePutObject(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) } +func (h *Handler) handleListMultipartParts(w http.ResponseWriter, r *http.Request, bucket, key, uploadID string) { + parts, err := h.svc.ListMultipartParts(bucket, key, uploadID) + if err != nil { + writeMappedS3Error(w, r, err) + return + } + + response := models.ListPartsResult{ + Xmlns: "http://s3.amazonaws.com/doc/2006-03-01/", + Bucket: bucket, + Key: key, + UploadID: uploadID, + Parts: make([]models.PartItem, 0, len(parts)), + } + for _, part := range parts { + response.Parts = append(response.Parts, models.PartItem{ + PartNumber: part.PartNumber, + LastModified: time.Unix(part.CreatedAt, 0).UTC().Format("2006-01-02T15:04:05.000Z"), + ETag: `"` + part.ETag + `"`, + Size: part.Size, + }) + } + + payload, err := xml.MarshalIndent(response, "", " ") + if err != nil { + writeMappedS3Error(w, r, err) + return + } + + w.Header().Set("Content-Type", "application/xml; charset=utf-8") + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte(xml.Header)) + _, _ = w.Write(payload) +} + func (h *Handler) handleHeadObject(w http.ResponseWriter, r *http.Request) { bucket := chi.URLParam(r, "bucket") key := chi.URLParam(r, "*") @@ -198,7 +293,15 @@ func (h *Handler) handleDeleteObject(w http.ResponseWriter, r *http.Request) { writeS3Error(w, r, s3ErrInvalidObjectKey, r.URL.Path) return } - + if uploadId := r.URL.Query().Get("uploadId"); uploadId != "" { + err := h.svc.AbortMultipartUpload(bucket, key, uploadId) + if err != nil { + writeMappedS3Error(w, r, err) + return + } + w.WriteHeader(http.StatusNoContent) + return + } err := h.svc.DeleteObject(bucket, key) if err != nil { if errors.Is(err, metadata.ErrObjectNotFound) { diff --git a/api/s3_errors.go b/api/s3_errors.go index 8792621..7d65519 100644 --- a/api/s3_errors.go +++ b/api/s3_errors.go @@ -5,6 +5,7 @@ import ( "errors" "fs/metadata" "fs/models" + "fs/service" "net/http" ) @@ -25,6 +26,21 @@ var ( Code: "NotImplemented", Message: "A header you provided implies functionality that is not implemented.", } + s3ErrInvalidPart = s3APIError{ + Status: http.StatusBadRequest, + Code: "InvalidPart", + Message: "One or more of the specified parts could not be found.", + } + s3ErrInvalidPartOrder = s3APIError{ + Status: http.StatusBadRequest, + Code: "InvalidPartOrder", + Message: "The list of parts was not in ascending order.", + } + s3ErrMalformedXML = s3APIError{ + Status: http.StatusBadRequest, + Code: "MalformedXML", + Message: "The XML you provided was not well-formed or did not validate against our published schema.", + } s3ErrInternal = s3APIError{ Status: http.StatusInternalServerError, Code: "InternalError", @@ -64,6 +80,24 @@ func mapToS3Error(err error) s3APIError { Code: "NoSuchKey", Message: "The specified key does not exist.", } + case errors.Is(err, metadata.ErrMultipartNotFound): + return s3APIError{ + Status: http.StatusNotFound, + Code: "NoSuchUpload", + Message: "The specified multipart upload does not exist.", + } + case errors.Is(err, metadata.ErrMultipartNotPending): + return s3APIError{ + Status: http.StatusBadRequest, + Code: "InvalidRequest", + Message: "The multipart upload is not in a valid state for this operation.", + } + case errors.Is(err, service.ErrInvalidPart): + return s3ErrInvalidPart + case errors.Is(err, service.ErrInvalidPartOrder): + return s3ErrInvalidPartOrder + case errors.Is(err, service.ErrInvalidCompleteRequest): + return s3ErrMalformedXML default: return s3ErrInternal } diff --git a/metadata/metadata.go b/metadata/metadata.go index 1438a64..1365f13 100644 --- a/metadata/metadata.go +++ b/metadata/metadata.go @@ -6,6 +6,7 @@ import ( "fmt" "fs/models" "regexp" + "sort" "strings" "time" @@ -20,6 +21,7 @@ type MetadataHandler struct { var systemIndex = []byte("__SYSTEM_BUCKETS__") var multipartUploadIndex = []byte("__MULTIPART_UPLOADS__") +var multipartUploadPartsIndex = []byte("__MULTIPART_UPLOAD_PARTS__") var validBucketName = regexp.MustCompile(`^[a-z0-9.-]{3,63}$`) @@ -29,6 +31,8 @@ var ( ErrBucketNotFound = errors.New("bucket not found") ErrBucketNotEmpty = errors.New("bucket not empty") ErrObjectNotFound = errors.New("object not found") + ErrMultipartNotFound = errors.New("multipart upload not found") + ErrMultipartNotPending = errors.New("multipart upload is not pending") ) func NewMetadataHandler(dbPath string) (*MetadataHandler, error) { @@ -54,6 +58,14 @@ func NewMetadataHandler(dbPath string) (*MetadataHandler, error) { _ = db.Close() return nil, err } + err = h.db.Update(func(tx *bbolt.Tx) error { + _, err := tx.CreateBucketIfNotExists(multipartUploadPartsIndex) + return err + }) + if err != nil { + _ = db.Close() + return nil, err + } return h, nil } @@ -325,3 +337,219 @@ func (h *MetadataHandler) CreateMultipartUpload(bucket, key string) (*models.Mul return upload, nil } + +func getMultipartUploadBucket(tx *bbolt.Tx) (*bbolt.Bucket, error) { + multipartUploadBucket := tx.Bucket(multipartUploadIndex) + if multipartUploadBucket == nil { + return nil, errors.New("multipart upload index not found") + } + return multipartUploadBucket, nil +} + +func getMultipartPartsBucket(tx *bbolt.Tx) (*bbolt.Bucket, error) { + multipartPartsBucket := tx.Bucket(multipartUploadPartsIndex) + if multipartPartsBucket == nil { + return nil, errors.New("multipart upload parts index not found") + } + return multipartPartsBucket, nil +} + +func getMultipartUploadFromBucket(multipartUploadBucket *bbolt.Bucket, uploadID string) (*models.MultipartUpload, error) { + payload := multipartUploadBucket.Get([]byte(uploadID)) + if payload == nil { + return nil, fmt.Errorf("%w: %s", ErrMultipartNotFound, uploadID) + } + upload := models.MultipartUpload{} + if err := json.Unmarshal(payload, &upload); err != nil { + return nil, err + } + return &upload, nil +} + +func getMultipartUploadFromTx(tx *bbolt.Tx, uploadID string) (*models.MultipartUpload, *bbolt.Bucket, error) { + multipartUploadBucket, err := getMultipartUploadBucket(tx) + if err != nil { + return nil, nil, err + } + upload, err := getMultipartUploadFromBucket(multipartUploadBucket, uploadID) + if err != nil { + return nil, nil, err + } + return upload, multipartUploadBucket, nil +} + +func putMultipartUpload(multipartUploadBucket *bbolt.Bucket, uploadID string, upload *models.MultipartUpload) error { + payload, err := json.Marshal(upload) + if err != nil { + return err + } + return multipartUploadBucket.Put([]byte(uploadID), payload) +} + +func deleteMultipartPartsByUploadID(tx *bbolt.Tx, uploadID string) error { + multipartPartsBucket, err := getMultipartPartsBucket(tx) + if err != nil { + return err + } + + prefix := uploadID + ":" + cursor := multipartPartsBucket.Cursor() + keysToDelete := make([][]byte, 0) + for k, _ := cursor.Seek([]byte(prefix)); k != nil && strings.HasPrefix(string(k), prefix); k, _ = cursor.Next() { + keyCopy := make([]byte, len(k)) + copy(keyCopy, k) + keysToDelete = append(keysToDelete, keyCopy) + } + for _, key := range keysToDelete { + if err := multipartPartsBucket.Delete(key); err != nil { + return err + } + } + return nil +} + +func (h *MetadataHandler) GetMultipartUpload(uploadID string) (*models.MultipartUpload, error) { + var upload *models.MultipartUpload + err := h.db.View(func(tx *bbolt.Tx) error { + var err error + upload, _, err = getMultipartUploadFromTx(tx, uploadID) + if err != nil { + return err + } + return nil + }) + if err != nil { + return nil, err + } + return upload, nil +} +func (h *MetadataHandler) PutMultipartPart(uploadID string, part models.UploadedPart) error { + if part.PartNumber < 1 || part.PartNumber > 10000 { + return fmt.Errorf("invalid part number: %d", part.PartNumber) + } + + err := h.db.Update(func(tx *bbolt.Tx) error { + upload, _, err := getMultipartUploadFromTx(tx, uploadID) + if err != nil { + return err + } + if upload.State != "pending" { + return fmt.Errorf("%w: %s", ErrMultipartNotPending, uploadID) + } + + multipartPartsBucket, err := getMultipartPartsBucket(tx) + if err != nil { + return err + } + + key := fmt.Sprintf("%s:%05d", uploadID, part.PartNumber) + payload, err := json.Marshal(part) + if err != nil { + return err + } + return multipartPartsBucket.Put([]byte(key), payload) + }) + if err != nil { + return err + } + return nil +} +func (h *MetadataHandler) ListMultipartParts(uploadID string) ([]models.UploadedPart, error) { + parts := make([]models.UploadedPart, 0) + + err := h.db.View(func(tx *bbolt.Tx) error { + if _, _, err := getMultipartUploadFromTx(tx, uploadID); err != nil { + return err + } + + multipartPartsBucket, err := getMultipartPartsBucket(tx) + if err != nil { + return err + } + prefix := uploadID + ":" + cursor := multipartPartsBucket.Cursor() + for k, v := cursor.Seek([]byte(prefix)); k != nil && strings.HasPrefix(string(k), prefix); k, v = cursor.Next() { + part := models.UploadedPart{} + if err := json.Unmarshal(v, &part); err != nil { + return err + } + parts = append(parts, part) + } + return nil + }) + if err != nil { + return nil, err + } + + sort.Slice(parts, func(i, j int) bool { + return parts[i].PartNumber < parts[j].PartNumber + }) + return parts, nil +} +func (h *MetadataHandler) CompleteMultipartUpload(uploadID string, final *models.ObjectManifest) error { + if final == nil { + return errors.New("final object manifest is required") + } + + err := h.db.Update(func(tx *bbolt.Tx) error { + upload, multipartUploadBucket, err := getMultipartUploadFromTx(tx, uploadID) + if err != nil { + return err + } + if upload.State != "pending" { + return fmt.Errorf("%w: %s", ErrMultipartNotPending, uploadID) + } + + metadataBucket := tx.Bucket([]byte(upload.Bucket)) + if metadataBucket == nil { + return fmt.Errorf("%w: %s", ErrBucketNotFound, upload.Bucket) + } + final.Bucket = upload.Bucket + final.Key = upload.Key + finalPayload, err := json.Marshal(final) + if err != nil { + return err + } + if err := metadataBucket.Put([]byte(upload.Key), finalPayload); err != nil { + return err + } + + upload.State = "completed" + if err := putMultipartUpload(multipartUploadBucket, uploadID, upload); err != nil { + return err + } + + if err := deleteMultipartPartsByUploadID(tx, uploadID); err != nil { + return err + } + return nil + }) + if err != nil { + return err + } + return nil +} +func (h *MetadataHandler) AbortMultipartUpload(uploadID string) error { + err := h.db.Update(func(tx *bbolt.Tx) error { + upload, multipartUploadBucket, err := getMultipartUploadFromTx(tx, uploadID) + if err != nil { + return err + } + if upload.State == "completed" { + return fmt.Errorf("%w: %s", ErrMultipartNotPending, uploadID) + } + upload.State = "aborted" + if err := putMultipartUpload(multipartUploadBucket, uploadID, upload); err != nil { + return err + } + + if err := deleteMultipartPartsByUploadID(tx, uploadID); err != nil { + return err + } + return nil + }) + if err != nil { + return err + } + return nil +} diff --git a/models/models.go b/models/models.go index 6b11cc1..6cace47 100644 --- a/models/models.go +++ b/models/models.go @@ -74,3 +74,45 @@ type InitiateMultipartUploadResult struct { Key string `xml:"Key"` UploadID string `xml:"UploadId"` } +type UploadedPart struct { + PartNumber int `json:"part_number" xml:"PartNumber"` + ETag string `json:"etag" xml:"ETag"` + Size int64 `json:"size" xml:"Size"` + Chunks []string `json:"chunks"` + CreatedAt int64 `json:"created_at"` +} + +type CompletedPart struct { + PartNumber int `xml:"PartNumber"` + ETag string `xml:"ETag"` +} + +type CompleteMultipartUploadRequest struct { + XMLName xml.Name `xml:"CompleteMultipartUpload"` + Parts []CompletedPart `xml:"Part"` +} + +type CompleteMultipartUploadResult struct { + XMLName xml.Name `xml:"CompleteMultipartUploadResult"` + Xmlns string `xml:"xmlns,attr"` + Bucket string `xml:"Bucket"` + Key string `xml:"Key"` + ETag string `xml:"ETag"` + Location string `xml:"Location,omitempty"` +} + +type ListPartsResult struct { + XMLName xml.Name `xml:"ListPartsResult"` + Xmlns string `xml:"xmlns,attr"` + Bucket string `xml:"Bucket"` + Key string `xml:"Key"` + UploadID string `xml:"UploadId"` + Parts []PartItem `xml:"Part"` +} + +type PartItem struct { + PartNumber int `xml:"PartNumber"` + LastModified string `xml:"LastModified"` + ETag string `xml:"ETag"` + Size int64 `xml:"Size"` +} diff --git a/service/service.go b/service/service.go index 2448823..9d67e5d 100644 --- a/service/service.go +++ b/service/service.go @@ -1,11 +1,15 @@ package service import ( + "crypto/md5" + "encoding/hex" + "errors" "fmt" "fs/metadata" "fs/models" "fs/storage" "io" + "strings" "time" ) @@ -13,6 +17,12 @@ type ObjectService struct { metadataHandler *metadata.MetadataHandler } +var ( + ErrInvalidPart = errors.New("invalid multipart part") + ErrInvalidPartOrder = errors.New("invalid multipart part order") + ErrInvalidCompleteRequest = errors.New("invalid complete multipart request") +) + func NewObjectService(metadataHandler *metadata.MetadataHandler) *ObjectService { return &ObjectService{metadataHandler: metadataHandler} } @@ -102,6 +112,137 @@ func (s *ObjectService) CreateMultipartUpload(bucket, key string) (*models.Multi return s.metadataHandler.CreateMultipartUpload(bucket, key) } -func (s *ObjectService) PutMultipartObject(bucket, key, uploadId string, input io.Reader) (*models.MultipartUpload, error) { - return nil, nil +func (s *ObjectService) UploadPart(bucket, key, uploadId string, partNumber int, input io.Reader) (string, error) { + if partNumber < 1 || partNumber > 10000 { + return "", ErrInvalidPart + } + + upload, err := s.metadataHandler.GetMultipartUpload(uploadId) + if err != nil { + return "", err + } + if upload.Bucket != bucket || upload.Key != key { + return "", metadata.ErrMultipartNotFound + } + + var uploadedPart models.UploadedPart + chunkIds, totalSize, etag, err := storage.IngestStream(input) + if err != nil { + return "", err + } + uploadedPart = models.UploadedPart{ + PartNumber: partNumber, + ETag: etag, + Size: totalSize, + Chunks: chunkIds, + CreatedAt: time.Now().Unix(), + } + err = s.metadataHandler.PutMultipartPart(uploadId, uploadedPart) + if err != nil { + return "", err + } + return etag, nil +} + +func (s *ObjectService) ListMultipartParts(bucket, key, uploadID string) ([]models.UploadedPart, error) { + upload, err := s.metadataHandler.GetMultipartUpload(uploadID) + if err != nil { + return nil, err + } + if upload.Bucket != bucket || upload.Key != key { + return nil, metadata.ErrMultipartNotFound + } + return s.metadataHandler.ListMultipartParts(uploadID) +} + +func (s *ObjectService) CompleteMultipartUpload(bucket, key, uploadID string, completed []models.CompletedPart) (*models.ObjectManifest, error) { + if len(completed) == 0 { + return nil, ErrInvalidCompleteRequest + } + + upload, err := s.metadataHandler.GetMultipartUpload(uploadID) + if err != nil { + return nil, err + } + if upload.Bucket != bucket || upload.Key != key { + return nil, metadata.ErrMultipartNotFound + } + + storedParts, err := s.metadataHandler.ListMultipartParts(uploadID) + if err != nil { + return nil, err + } + partsByNumber := make(map[int]models.UploadedPart, len(storedParts)) + for _, part := range storedParts { + partsByNumber[part.PartNumber] = part + } + + lastPartNumber := 0 + orderedParts := make([]models.UploadedPart, 0, len(completed)) + chunks := make([]string, 0) + var totalSize int64 + + for _, part := range completed { + if part.PartNumber <= lastPartNumber { + return nil, ErrInvalidPartOrder + } + lastPartNumber = part.PartNumber + + storedPart, ok := partsByNumber[part.PartNumber] + if !ok { + return nil, ErrInvalidPart + } + if normalizeETag(part.ETag) != normalizeETag(storedPart.ETag) { + return nil, ErrInvalidPart + } + + orderedParts = append(orderedParts, storedPart) + chunks = append(chunks, storedPart.Chunks...) + totalSize += storedPart.Size + } + + finalETag := buildMultipartETag(orderedParts) + manifest := &models.ObjectManifest{ + Bucket: bucket, + Key: key, + Size: totalSize, + ContentType: "application/octet-stream", + ETag: finalETag, + Chunks: chunks, + CreatedAt: time.Now().Unix(), + } + + if err := s.metadataHandler.CompleteMultipartUpload(uploadID, manifest); err != nil { + return nil, err + } + + return manifest, nil +} + +func (s *ObjectService) AbortMultipartUpload(bucket, key, uploadID string) error { + upload, err := s.metadataHandler.GetMultipartUpload(uploadID) + if err != nil { + return err + } + if upload.Bucket != bucket || upload.Key != key { + return metadata.ErrMultipartNotFound + } + return s.metadataHandler.AbortMultipartUpload(uploadID) +} + +func normalizeETag(etag string) string { + return strings.Trim(etag, "\"") +} + +func buildMultipartETag(parts []models.UploadedPart) string { + hasher := md5.New() + for _, part := range parts { + etagBytes, err := hex.DecodeString(normalizeETag(part.ETag)) + if err == nil { + _, _ = hasher.Write(etagBytes) + continue + } + _, _ = hasher.Write([]byte(normalizeETag(part.ETag))) + } + return fmt.Sprintf("%x-%d", hasher.Sum(nil), len(parts)) }