Implement bulk object deletion (POST /{bucket}?delete) and fix AWS SigV4 aws-chunked payload framing.

This commit is contained in:
2026-02-22 15:32:04 +01:00
parent 111ce5b669
commit 5d41ec9e0a
6 changed files with 214 additions and 6 deletions

View File

@@ -1,6 +1,7 @@
package api
import (
"bufio"
"encoding/xml"
"errors"
"fmt"
@@ -11,6 +12,7 @@ import (
"io"
"net/http"
"strconv"
"strings"
"time"
"github.com/go-chi/chi/v5"
@@ -42,6 +44,8 @@ func (h *Handler) setupRoutes() {
h.router.Get("/{bucket}", h.handleGetBucket)
h.router.Put("/{bucket}", h.handlePutBucket)
h.router.Put("/{bucket}/", h.handlePutBucket)
h.router.Post("/{bucket}", h.handlePostBucket)
h.router.Post("/{bucket}/", h.handlePostBucket)
h.router.Delete("/{bucket}", h.handleDeleteBucket)
h.router.Delete("/{bucket}/", h.handleDeleteBucket)
h.router.Head("/{bucket}", h.handleHeadBucket)
@@ -170,6 +174,11 @@ func (h *Handler) handlePutObject(w http.ResponseWriter, r *http.Request) {
}
defer r.Body.Close()
bodyReader := io.Reader(r.Body)
if shouldDecodeAWSChunkedPayload(r) {
bodyReader = newAWSChunkedDecodingReader(r.Body)
}
uploadID := r.URL.Query().Get("uploadId")
partNumberRaw := r.URL.Query().Get("partNumber")
if uploadID != "" || partNumberRaw != "" {
@@ -184,7 +193,7 @@ func (h *Handler) handlePutObject(w http.ResponseWriter, r *http.Request) {
return
}
etag, err := h.svc.UploadPart(bucket, key, uploadID, partNumber, r.Body)
etag, err := h.svc.UploadPart(bucket, key, uploadID, partNumber, bodyReader)
if err != nil {
writeMappedS3Error(w, r, err)
return
@@ -200,7 +209,7 @@ func (h *Handler) handlePutObject(w http.ResponseWriter, r *http.Request) {
contentType = "application/octet-stream"
}
manifest, err := h.svc.PutObject(bucket, key, contentType, r.Body)
manifest, err := h.svc.PutObject(bucket, key, contentType, bodyReader)
if err != nil {
writeMappedS3Error(w, r, err)
@@ -248,6 +257,75 @@ func (h *Handler) handleListMultipartParts(w http.ResponseWriter, r *http.Reques
_, _ = w.Write(payload)
}
func shouldDecodeAWSChunkedPayload(r *http.Request) bool {
contentEncoding := strings.ToLower(r.Header.Get("Content-Encoding"))
if strings.Contains(contentEncoding, "aws-chunked") {
return true
}
signingMode := strings.ToLower(r.Header.Get("x-amz-content-sha256"))
return strings.HasPrefix(signingMode, "streaming-aws4-hmac-sha256-payload")
}
// newAWSChunkedDecodingReader returns a reader that yields the raw object
// bytes extracted from an aws-chunked request body. Decoding runs in a
// background goroutine feeding an io.Pipe; any framing error surfaces to
// the consumer through the pipe.
func newAWSChunkedDecodingReader(src io.Reader) io.Reader {
	pr, pw := io.Pipe()
	go func() {
		err := decodeAWSChunkedPayload(src, pw)
		// CloseWithError(nil) is documented to behave like Close, so a
		// single call covers both the success and the failure path.
		_ = pw.CloseWithError(err)
	}()
	return pr
}
func decodeAWSChunkedPayload(src io.Reader, dst io.Writer) error {
reader := bufio.NewReader(src)
for {
headerLine, err := reader.ReadString('\n')
if err != nil {
return err
}
headerLine = strings.TrimRight(headerLine, "\r\n")
chunkSizeToken := headerLine
if idx := strings.IndexByte(chunkSizeToken, ';'); idx >= 0 {
chunkSizeToken = chunkSizeToken[:idx]
}
chunkSizeToken = strings.TrimSpace(chunkSizeToken)
chunkSize, err := strconv.ParseInt(chunkSizeToken, 16, 64)
if err != nil {
return fmt.Errorf("invalid aws-chunked header %q: %w", headerLine, err)
}
if chunkSize < 0 {
return fmt.Errorf("invalid aws-chunked size: %d", chunkSize)
}
if chunkSize > 0 {
if _, err := io.CopyN(dst, reader, chunkSize); err != nil {
return err
}
}
crlf := make([]byte, 2)
if _, err := io.ReadFull(reader, crlf); err != nil {
return err
}
if crlf[0] != '\r' || crlf[1] != '\n' {
return errors.New("invalid aws-chunked payload terminator")
}
if chunkSize == 0 {
for {
line, err := reader.ReadString('\n')
if err != nil {
return err
}
if line == "\r\n" || line == "\n" {
return nil
}
}
}
}
}
func (h *Handler) handleHeadObject(w http.ResponseWriter, r *http.Request) {
bucket := chi.URLParam(r, "bucket")
key := chi.URLParam(r, "*")
@@ -261,9 +339,11 @@ func (h *Handler) handleHeadObject(w http.ResponseWriter, r *http.Request) {
writeMappedS3Error(w, r, err)
return
}
etag := manifest.ETag
size := strconv.Itoa(int(manifest.Size))
w.Header().Set("ETag", `"`+manifest.ETag+`"`)
w.Header().Set("Content-Length", "0")
w.Header().Set("ETag", `"`+etag+`"`)
w.Header().Set("Content-Length", size)
w.Header().Set("Last-Modified", time.Unix(manifest.CreatedAt, 0).UTC().Format(http.TimeFormat))
w.WriteHeader(http.StatusOK)
}
@@ -286,6 +366,61 @@ func (h *Handler) handleDeleteBucket(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusNoContent)
}
// handlePostBucket serves POST /{bucket}. The only bucket-level POST
// operation implemented is multi-object delete (the "?delete" sub-resource);
// any other POST is answered with NotImplemented.
func (h *Handler) handlePostBucket(w http.ResponseWriter, r *http.Request) {
	bucket := chi.URLParam(r, "bucket")
	if !r.URL.Query().Has("delete") {
		writeS3Error(w, r, s3ErrNotImplemented, r.URL.Path)
		return
	}
	defer r.Body.Close()

	// SigV4 streaming clients wrap even the XML document in aws-chunked framing.
	var body io.Reader = r.Body
	if shouldDecodeAWSChunkedPayload(r) {
		body = newAWSChunkedDecodingReader(r.Body)
	}

	var deleteReq models.DeleteObjectsRequest
	if err := xml.NewDecoder(body).Decode(&deleteReq); err != nil {
		writeS3Error(w, r, s3ErrMalformedXML, r.URL.Path)
		return
	}

	// Collect the non-empty keys the client asked to remove.
	objectKeys := make([]string, 0, len(deleteReq.Objects))
	for _, entry := range deleteReq.Objects {
		if entry.Key != "" {
			objectKeys = append(objectKeys, entry.Key)
		}
	}

	removed, err := h.svc.DeleteObjects(bucket, objectKeys)
	if err != nil {
		writeMappedS3Error(w, r, err)
		return
	}

	result := models.DeleteObjectsResult{
		Xmlns: "http://s3.amazonaws.com/doc/2006-03-01/",
	}
	// In quiet mode S3 omits the per-key <Deleted> entries from the response.
	if !deleteReq.Quiet {
		result.Deleted = make([]models.DeletedEntry, 0, len(removed))
		for _, key := range removed {
			result.Deleted = append(result.Deleted, models.DeletedEntry{Key: key})
		}
	}

	payload, err := xml.MarshalIndent(result, "", " ")
	if err != nil {
		writeMappedS3Error(w, r, err)
		return
	}
	w.Header().Set("Content-Type", "application/xml; charset=utf-8")
	w.WriteHeader(http.StatusOK)
	_, _ = w.Write([]byte(xml.Header))
	_, _ = w.Write(payload)
}
func (h *Handler) handleDeleteObject(w http.ResponseWriter, r *http.Request) {
bucket := chi.URLParam(r, "bucket")
key := chi.URLParam(r, "*")
@@ -347,6 +482,16 @@ func (h *Handler) handleGetBucket(w http.ResponseWriter, r *http.Request) {
h.handleListObjectsV2(w, r, bucket, prefix)
return
}
if r.URL.Query().Has("location") {
xmlResponse := `<?xml version="1.0" encoding="UTF-8"?>
<LocationConstraint xmlns="http://s3.amazonaws.com/doc/2006-03-01/">us-east-1</LocationConstraint>`
w.Header().Set("Content-Type", "application/xml; charset=utf-8")
w.Header().Set("Content-Length", strconv.Itoa(len(xmlResponse)))
w.WriteHeader(http.StatusOK)
w.Write([]byte(xmlResponse))
return
}
writeS3Error(w, r, s3ErrNotImplemented, r.URL.Path)
}

View File

@@ -41,6 +41,11 @@ var (
Code: "MalformedXML",
Message: "The XML you provided was not well-formed or did not validate against our published schema.",
}
s3ErrEntityTooSmall = s3APIError{
Status: http.StatusBadRequest,
Code: "EntityTooSmall",
Message: "Your proposed upload is smaller than the minimum allowed object size.",
}
s3ErrInternal = s3APIError{
Status: http.StatusInternalServerError,
Code: "InternalError",
@@ -98,6 +103,8 @@ func mapToS3Error(err error) s3APIError {
return s3ErrInvalidPartOrder
case errors.Is(err, service.ErrInvalidCompleteRequest):
return s3ErrMalformedXML
case errors.Is(err, service.ErrEntityTooSmall):
return s3ErrEntityTooSmall
default:
return s3ErrInternal
}

View File

@@ -17,7 +17,7 @@ func main() {
objectService := service.NewObjectService(metadataHandler)
handler := api.NewHandler(objectService)
err = handler.Start("localhost:3000")
err = handler.Start("0.0.0.0:3000")
if err != nil {
return
}

View File

@@ -289,6 +289,34 @@ func (h *MetadataHandler) DeleteManifest(bucket, key string) error {
}
// DeleteManifests removes the manifests for the given keys from the bucket's
// metadata bucket inside a single bbolt write transaction. Keys that do not
// exist are still reported back as deleted (S3 multi-object delete is
// idempotent); empty keys are skipped entirely. Returns the reported keys,
// or an error (wrapping ErrBucketNotFound when the bucket is missing).
func (h *MetadataHandler) DeleteManifests(bucket string, keys []string) ([]string, error) {
	deleted := make([]string, 0, len(keys))
	txErr := h.db.Update(func(tx *bbolt.Tx) error {
		metadataBucket := tx.Bucket([]byte(bucket))
		if metadataBucket == nil {
			return fmt.Errorf("%w: %s", ErrBucketNotFound, bucket)
		}
		for _, key := range keys {
			if key == "" {
				continue
			}
			keyBytes := []byte(key)
			// Only issue a Delete when the manifest actually exists; either
			// way the key counts as deleted for the caller.
			if metadataBucket.Get(keyBytes) != nil {
				if delErr := metadataBucket.Delete(keyBytes); delErr != nil {
					return delErr
				}
			}
			deleted = append(deleted, key)
		}
		return nil
	})
	if txErr != nil {
		return nil, txErr
	}
	return deleted, nil
}
func (h *MetadataHandler) CreateMultipartUpload(bucket, key string) (*models.MultipartUpload, error) {
var upload *models.MultipartUpload

View File

@@ -116,3 +116,23 @@ type PartItem struct {
ETag string `xml:"ETag"`
Size int64 `xml:"Size"`
}
// DeleteObjectsRequest models the XML body of an S3 multi-object delete
// (POST /{bucket}?delete): a <Delete> root element containing one <Object>
// entry per key and an optional <Quiet> flag.
type DeleteObjectsRequest struct {
XMLName xml.Name `xml:"Delete"`
// Objects lists the keys the client wants removed.
Objects []DeleteObjectIdentity `xml:"Object"`
// Quiet suppresses the per-key <Deleted> entries in the response when true.
Quiet bool `xml:"Quiet"`
}
// DeleteObjectIdentity identifies a single object inside a <Delete> request.
type DeleteObjectIdentity struct {
Key string `xml:"Key"`
}
// DeleteObjectsResult models the <DeleteResult> response document returned
// for a multi-object delete.
type DeleteObjectsResult struct {
XMLName xml.Name `xml:"DeleteResult"`
// Xmlns carries the S3 response namespace attribute.
Xmlns string `xml:"xmlns,attr"`
// Deleted holds one entry per reported key; omitted in quiet mode.
Deleted []DeletedEntry `xml:"Deleted,omitempty"`
}
// DeletedEntry is a single <Deleted><Key>...</Key></Deleted> element in a
// DeleteObjectsResult.
type DeletedEntry struct {
Key string `xml:"Key"`
}

View File

@@ -21,6 +21,7 @@ var (
ErrInvalidPart = errors.New("invalid multipart part")
ErrInvalidPartOrder = errors.New("invalid multipart part order")
ErrInvalidCompleteRequest = errors.New("invalid complete multipart request")
ErrEntityTooSmall = errors.New("multipart entity too small")
)
func NewObjectService(metadataHandler *metadata.MetadataHandler) *ObjectService {
@@ -108,6 +109,10 @@ func (s *ObjectService) ListBuckets() ([]string, error) {
return s.metadataHandler.ListBuckets()
}
func (s *ObjectService) DeleteObjects(bucket string, keys []string) ([]string, error) {
return s.metadataHandler.DeleteManifests(bucket, keys)
}
func (s *ObjectService) CreateMultipartUpload(bucket, key string) (*models.MultipartUpload, error) {
return s.metadataHandler.CreateMultipartUpload(bucket, key)
}
@@ -182,7 +187,7 @@ func (s *ObjectService) CompleteMultipartUpload(bucket, key, uploadID string, co
chunks := make([]string, 0)
var totalSize int64
for _, part := range completed {
for i, part := range completed {
if part.PartNumber <= lastPartNumber {
return nil, ErrInvalidPartOrder
}
@@ -195,6 +200,9 @@ func (s *ObjectService) CompleteMultipartUpload(bucket, key, uploadID string, co
if normalizeETag(part.ETag) != normalizeETag(storedPart.ETag) {
return nil, ErrInvalidPart
}
if i < len(completed)-1 && storedPart.Size < 5*1024*1024 {
return nil, ErrEntityTooSmall
}
orderedParts = append(orderedParts, storedPart)
chunks = append(chunks, storedPart.Chunks...)