From 5d41ec9e0a161061f33a76dd2630324c1a598dbc Mon Sep 17 00:00:00 2001 From: Andrej Mickov Date: Sun, 22 Feb 2026 15:32:04 +0100 Subject: [PATCH] Implemented bulk delete from bucket, AWS SigV4 framing problems solved. --- api/api.go | 153 +++++++++++++++++++++++++++++++++++++++++-- api/s3_errors.go | 7 ++ main.go | 2 +- metadata/metadata.go | 28 ++++++++ models/models.go | 20 ++++++ service/service.go | 10 ++- 6 files changed, 214 insertions(+), 6 deletions(-) diff --git a/api/api.go b/api/api.go index 42421db..eff2c9d 100644 --- a/api/api.go +++ b/api/api.go @@ -1,6 +1,7 @@ package api import ( + "bufio" "encoding/xml" "errors" "fmt" @@ -11,6 +12,7 @@ import ( "io" "net/http" "strconv" + "strings" "time" "github.com/go-chi/chi/v5" @@ -42,6 +44,8 @@ func (h *Handler) setupRoutes() { h.router.Get("/{bucket}", h.handleGetBucket) h.router.Put("/{bucket}", h.handlePutBucket) h.router.Put("/{bucket}/", h.handlePutBucket) + h.router.Post("/{bucket}", h.handlePostBucket) + h.router.Post("/{bucket}/", h.handlePostBucket) h.router.Delete("/{bucket}", h.handleDeleteBucket) h.router.Delete("/{bucket}/", h.handleDeleteBucket) h.router.Head("/{bucket}", h.handleHeadBucket) @@ -170,6 +174,11 @@ func (h *Handler) handlePutObject(w http.ResponseWriter, r *http.Request) { } defer r.Body.Close() + bodyReader := io.Reader(r.Body) + if shouldDecodeAWSChunkedPayload(r) { + bodyReader = newAWSChunkedDecodingReader(r.Body) + } + uploadID := r.URL.Query().Get("uploadId") partNumberRaw := r.URL.Query().Get("partNumber") if uploadID != "" || partNumberRaw != "" { @@ -184,7 +193,7 @@ func (h *Handler) handlePutObject(w http.ResponseWriter, r *http.Request) { return } - etag, err := h.svc.UploadPart(bucket, key, uploadID, partNumber, r.Body) + etag, err := h.svc.UploadPart(bucket, key, uploadID, partNumber, bodyReader) if err != nil { writeMappedS3Error(w, r, err) return @@ -200,7 +209,7 @@ func (h *Handler) handlePutObject(w http.ResponseWriter, r *http.Request) { contentType = "application/octet-stream" } - manifest, err := h.svc.PutObject(bucket, key, contentType, r.Body) + manifest, err := h.svc.PutObject(bucket, key, contentType, bodyReader) if err != nil { writeMappedS3Error(w, r, err) @@ -248,6 +257,75 @@ func (h *Handler) handleListMultipartParts(w http.ResponseWriter, r *http.Reques _, _ = w.Write(payload) } +func shouldDecodeAWSChunkedPayload(r *http.Request) bool { + contentEncoding := strings.ToLower(r.Header.Get("Content-Encoding")) + if strings.Contains(contentEncoding, "aws-chunked") { + return true + } + signingMode := strings.ToLower(r.Header.Get("x-amz-content-sha256")) + return strings.HasPrefix(signingMode, "streaming-aws4-hmac-sha256-payload") +} + +func newAWSChunkedDecodingReader(src io.Reader) io.Reader { + pr, pw := io.Pipe() + go func() { + if err := decodeAWSChunkedPayload(src, pw); err != nil { + _ = pw.CloseWithError(err) + return + } + _ = pw.Close() + }() + return pr +} + +func decodeAWSChunkedPayload(src io.Reader, dst io.Writer) error { + reader := bufio.NewReader(src) + for { + headerLine, err := reader.ReadString('\n') + if err != nil { + return err + } + headerLine = strings.TrimRight(headerLine, "\r\n") + chunkSizeToken := headerLine + if idx := strings.IndexByte(chunkSizeToken, ';'); idx >= 0 { + chunkSizeToken = chunkSizeToken[:idx] + } + chunkSizeToken = strings.TrimSpace(chunkSizeToken) + chunkSize, err := strconv.ParseInt(chunkSizeToken, 16, 64) + if err != nil { + return fmt.Errorf("invalid aws-chunked header %q: %w", headerLine, err) + } + if chunkSize < 0 { + return fmt.Errorf("invalid aws-chunked size: %d", chunkSize) + } + if chunkSize > 0 { + if _, err := io.CopyN(dst, reader, chunkSize); err != nil { + return err + } + } + + crlf := make([]byte, 2) + if _, err := io.ReadFull(reader, crlf); err != nil { + return err + } + if crlf[0] != '\r' || crlf[1] != '\n' { + return errors.New("invalid aws-chunked payload terminator") + } + + if chunkSize == 0 { + for { + line, err := reader.ReadString('\n') + if err != nil { + return err + } + if line == "\r\n" || line == "\n" { + return nil + } + } + } + } +} + func (h *Handler) handleHeadObject(w http.ResponseWriter, r *http.Request) { bucket := chi.URLParam(r, "bucket") key := chi.URLParam(r, "*") @@ -261,9 +339,11 @@ func (h *Handler) handleHeadObject(w http.ResponseWriter, r *http.Request) { writeMappedS3Error(w, r, err) return } + etag := manifest.ETag + size := strconv.Itoa(int(manifest.Size)) - w.Header().Set("ETag", `"`+manifest.ETag+`"`) - w.Header().Set("Content-Length", "0") + w.Header().Set("ETag", `"`+etag+`"`) + w.Header().Set("Content-Length", size) w.Header().Set("Last-Modified", time.Unix(manifest.CreatedAt, 0).UTC().Format(http.TimeFormat)) w.WriteHeader(http.StatusOK) } @@ -286,6 +366,61 @@ func (h *Handler) handleDeleteBucket(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusNoContent) } +func (h *Handler) handlePostBucket(w http.ResponseWriter, r *http.Request) { + bucket := chi.URLParam(r, "bucket") + if _, ok := r.URL.Query()["delete"]; !ok { + writeS3Error(w, r, s3ErrNotImplemented, r.URL.Path) + return + } + defer r.Body.Close() + + bodyReader := io.Reader(r.Body) + if shouldDecodeAWSChunkedPayload(r) { + bodyReader = newAWSChunkedDecodingReader(r.Body) + } + + var req models.DeleteObjectsRequest + if err := xml.NewDecoder(bodyReader).Decode(&req); err != nil { + writeS3Error(w, r, s3ErrMalformedXML, r.URL.Path) + return + } + + keys := make([]string, 0, len(req.Objects)) + for _, obj := range req.Objects { + if obj.Key == "" { + continue + } + keys = append(keys, obj.Key) + } + + deleted, err := h.svc.DeleteObjects(bucket, keys) + if err != nil { + writeMappedS3Error(w, r, err) + return + } + + response := models.DeleteObjectsResult{ + Xmlns: "http://s3.amazonaws.com/doc/2006-03-01/", + } + if !req.Quiet { + response.Deleted = make([]models.DeletedEntry, 0, len(deleted)) + for _, key := range deleted { + response.Deleted = append(response.Deleted, models.DeletedEntry{Key: key}) + } + } + + payload, err := xml.MarshalIndent(response, "", " ") + if err != nil { + writeMappedS3Error(w, r, err) + return + } + + w.Header().Set("Content-Type", "application/xml; charset=utf-8") + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte(xml.Header)) + _, _ = w.Write(payload) +} + func (h *Handler) handleDeleteObject(w http.ResponseWriter, r *http.Request) { bucket := chi.URLParam(r, "bucket") key := chi.URLParam(r, "*") @@ -347,6 +482,16 @@ func (h *Handler) handleGetBucket(w http.ResponseWriter, r *http.Request) { h.handleListObjectsV2(w, r, bucket, prefix) return } + if r.URL.Query().Has("location") { + xmlResponse := ` + us-east-1` + + w.Header().Set("Content-Type", "application/xml; charset=utf-8") + w.Header().Set("Content-Length", strconv.Itoa(len(xmlResponse))) + w.WriteHeader(http.StatusOK) + w.Write([]byte(xmlResponse)) + return + } writeS3Error(w, r, s3ErrNotImplemented, r.URL.Path) } diff --git a/api/s3_errors.go b/api/s3_errors.go index 7d65519..c446e4d 100644 --- a/api/s3_errors.go +++ b/api/s3_errors.go @@ -41,6 +41,11 @@ var ( Code: "MalformedXML", Message: "The XML you provided was not well-formed or did not validate against our published schema.", } + s3ErrEntityTooSmall = s3APIError{ + Status: http.StatusBadRequest, + Code: "EntityTooSmall", + Message: "Your proposed upload is smaller than the minimum allowed object size.", + } s3ErrInternal = s3APIError{ Status: http.StatusInternalServerError, Code: "InternalError", @@ -98,6 +103,8 @@ func mapToS3Error(err error) s3APIError { return s3ErrInvalidPartOrder case errors.Is(err, service.ErrInvalidCompleteRequest): return s3ErrMalformedXML + case errors.Is(err, service.ErrEntityTooSmall): + return s3ErrEntityTooSmall default: return s3ErrInternal } diff --git a/main.go b/main.go index 74d37c7..39d3e65 100644 --- a/main.go +++ b/main.go @@ -17,7 +17,7 @@ func main() { objectService := service.NewObjectService(metadataHandler) handler := api.NewHandler(objectService) - err = handler.Start("localhost:3000") + err = handler.Start("0.0.0.0:3000") if err != nil { return } diff --git a/metadata/metadata.go b/metadata/metadata.go index 1365f13..ce346ac 100644 --- a/metadata/metadata.go +++ b/metadata/metadata.go @@ -289,6 +289,34 @@ func (h *MetadataHandler) DeleteManifest(bucket, key string) error { } +func (h *MetadataHandler) DeleteManifests(bucket string, keys []string) ([]string, error) { + deleted := make([]string, 0, len(keys)) + + err := h.db.Update(func(tx *bbolt.Tx) error { + metadataBucket := tx.Bucket([]byte(bucket)) + if metadataBucket == nil { + return fmt.Errorf("%w: %s", ErrBucketNotFound, bucket) + } + + for _, key := range keys { + if key == "" { + continue + } + if metadataBucket.Get([]byte(key)) != nil { + if err := metadataBucket.Delete([]byte(key)); err != nil { + return err + } + } + deleted = append(deleted, key) + } + return nil + }) + if err != nil { + return nil, err + } + return deleted, nil +} + func (h *MetadataHandler) CreateMultipartUpload(bucket, key string) (*models.MultipartUpload, error) { var upload *models.MultipartUpload diff --git a/models/models.go b/models/models.go index 6cace47..fe6d74f 100644 --- a/models/models.go +++ b/models/models.go @@ -116,3 +116,23 @@ type PartItem struct { ETag string `xml:"ETag"` Size int64 `xml:"Size"` } + +type DeleteObjectsRequest struct { + XMLName xml.Name `xml:"Delete"` + Objects []DeleteObjectIdentity `xml:"Object"` + Quiet bool `xml:"Quiet"` +} + +type DeleteObjectIdentity struct { + Key string `xml:"Key"` +} + +type DeleteObjectsResult struct { + XMLName xml.Name `xml:"DeleteResult"` + Xmlns string `xml:"xmlns,attr"` + Deleted []DeletedEntry `xml:"Deleted,omitempty"` +} + +type DeletedEntry struct { + Key string `xml:"Key"` +} diff --git a/service/service.go b/service/service.go index 9d67e5d..755aea1 100644 --- a/service/service.go +++ b/service/service.go @@ -21,6 +21,7 @@ var ( ErrInvalidPart = errors.New("invalid multipart part") ErrInvalidPartOrder = errors.New("invalid multipart part order") ErrInvalidCompleteRequest = errors.New("invalid complete multipart request") + ErrEntityTooSmall = errors.New("multipart entity too small") ) func NewObjectService(metadataHandler *metadata.MetadataHandler) *ObjectService { @@ -108,6 +109,10 @@ func (s *ObjectService) ListBuckets() ([]string, error) { return s.metadataHandler.ListBuckets() } +func (s *ObjectService) DeleteObjects(bucket string, keys []string) ([]string, error) { + return s.metadataHandler.DeleteManifests(bucket, keys) +} + func (s *ObjectService) CreateMultipartUpload(bucket, key string) (*models.MultipartUpload, error) { return s.metadataHandler.CreateMultipartUpload(bucket, key) } @@ -182,7 +187,7 @@ func (s *ObjectService) CompleteMultipartUpload(bucket, key, uploadID string, co chunks := make([]string, 0) var totalSize int64 - for _, part := range completed { + for i, part := range completed { if part.PartNumber <= lastPartNumber { return nil, ErrInvalidPartOrder } @@ -195,6 +200,9 @@ func (s *ObjectService) CompleteMultipartUpload(bucket, key, uploadID string, co if normalizeETag(part.ETag) != normalizeETag(storedPart.ETag) { return nil, ErrInvalidPart } + if i < len(completed)-1 && storedPart.Size < 5*1024*1024 { + return nil, ErrEntityTooSmall + } orderedParts = append(orderedParts, storedPart) chunks = append(chunks, storedPart.Chunks...)