package api import ( "bufio" "context" "encoding/xml" "errors" "fmt" "fs/metadata" "fs/models" "fs/service" "fs/utils" "io" "log" "net/http" "os" "os/signal" "strconv" "strings" "syscall" "time" "github.com/go-chi/chi/v5" "github.com/go-chi/chi/v5/middleware" ) type Handler struct { router *chi.Mux svc *service.ObjectService } func NewHandler(svc *service.ObjectService) *Handler { r := chi.NewRouter() r.Use(middleware.Recoverer) h := &Handler{ router: r, svc: svc, } return h } func (h *Handler) setupRoutes() { h.router.Use(middleware.Logger) h.router.Get("/", h.handleGetBuckets) h.router.Get("/{bucket}/", h.handleGetBucket) h.router.Get("/{bucket}", h.handleGetBucket) h.router.Put("/{bucket}", h.handlePutBucket) h.router.Put("/{bucket}/", h.handlePutBucket) h.router.Post("/{bucket}", h.handlePostBucket) h.router.Post("/{bucket}/", h.handlePostBucket) h.router.Delete("/{bucket}", h.handleDeleteBucket) h.router.Delete("/{bucket}/", h.handleDeleteBucket) h.router.Head("/{bucket}", h.handleHeadBucket) h.router.Head("/{bucket}/", h.handleHeadBucket) h.router.Get("/{bucket}/*", h.handleGetObject) h.router.Put("/{bucket}/*", h.handlePutObject) h.router.Post("/{bucket}/*", h.handlePostObject) h.router.Head("/{bucket}/*", h.handleHeadObject) h.router.Delete("/{bucket}/*", h.handleDeleteObject) } func (h *Handler) handleWelcome(w http.ResponseWriter) { w.WriteHeader(http.StatusOK) _, err := w.Write([]byte("Welcome to the Object Storage API!")) if err != nil { return } } func (h *Handler) handleGetObject(w http.ResponseWriter, r *http.Request) { bucket := chi.URLParam(r, "bucket") key := chi.URLParam(r, "*") if key == "" { writeS3Error(w, r, s3ErrInvalidObjectKey, r.URL.Path) return } if uploadID := r.URL.Query().Get("uploadId"); uploadID != "" { h.handleListMultipartParts(w, r, bucket, key, uploadID) return } stream, manifest, err := h.svc.GetObject(bucket, key) if err != nil { writeMappedS3Error(w, r, err) return } w.Header().Set("Content-Type", manifest.ContentType) w.Header().Set("Content-Length", strconv.FormatInt(manifest.Size, 10)) w.Header().Set("ETag", `"`+manifest.ETag+`"`) w.Header().Set("Last-Modified", time.Unix(manifest.CreatedAt, 0).UTC().Format(http.TimeFormat)) w.Header().Set("Accept-Ranges", "bytes") w.WriteHeader(http.StatusOK) _, err = io.Copy(w, stream) } func (h *Handler) handlePostObject(w http.ResponseWriter, r *http.Request) { bucket := chi.URLParam(r, "bucket") key := chi.URLParam(r, "*") if key == "" { writeS3Error(w, r, s3ErrInvalidObjectKey, r.URL.Path) return } defer func(Body io.ReadCloser) { err := Body.Close() if err != nil { } }(r.Body) if _, ok := r.URL.Query()["uploads"]; ok { upload, err := h.svc.CreateMultipartUpload(bucket, key) if err != nil { writeMappedS3Error(w, r, err) return } response := models.InitiateMultipartUploadResult{ Xmlns: "http://s3.amazonaws.com/doc/2006-03-01/", Bucket: upload.Bucket, Key: upload.Key, UploadID: upload.UploadID, } payload, err := xml.MarshalIndent(response, "", " ") if err != nil { writeMappedS3Error(w, r, err) return } w.Header().Set("Content-Type", "application/xml; charset=utf-8") w.WriteHeader(http.StatusOK) _, _ = w.Write([]byte(xml.Header)) _, _ = w.Write(payload) return } if uploadID := r.URL.Query().Get("uploadId"); uploadID != "" { var req models.CompleteMultipartUploadRequest if err := xml.NewDecoder(r.Body).Decode(&req); err != nil { writeS3Error(w, r, s3ErrMalformedXML, r.URL.Path) return } manifest, err := h.svc.CompleteMultipartUpload(bucket, key, uploadID, req.Parts) if err != nil { writeMappedS3Error(w, r, err) return } response := models.CompleteMultipartUploadResult{ Xmlns: "http://s3.amazonaws.com/doc/2006-03-01/", Bucket: bucket, Key: key, ETag: `"` + manifest.ETag + `"`, Location: r.URL.Path, } payload, err := xml.MarshalIndent(response, "", " ") if err != nil { writeMappedS3Error(w, r, err) return } w.Header().Set("Content-Type", "application/xml; charset=utf-8") w.WriteHeader(http.StatusOK) _, _ = w.Write([]byte(xml.Header)) _, _ = w.Write(payload) return } writeS3Error(w, r, s3ErrNotImplemented, r.URL.Path) } func (h *Handler) handlePutObject(w http.ResponseWriter, r *http.Request) { bucket := chi.URLParam(r, "bucket") key := chi.URLParam(r, "*") if key == "" { writeS3Error(w, r, s3ErrInvalidObjectKey, r.URL.Path) return } defer func(Body io.ReadCloser) { err := Body.Close() if err != nil { } }(r.Body) bodyReader := io.Reader(r.Body) if shouldDecodeAWSChunkedPayload(r) { bodyReader = newAWSChunkedDecodingReader(r.Body) } uploadID := r.URL.Query().Get("uploadId") partNumberRaw := r.URL.Query().Get("partNumber") if uploadID != "" || partNumberRaw != "" { if uploadID == "" || partNumberRaw == "" { writeS3Error(w, r, s3ErrInvalidPart, r.URL.Path) return } partNumber, err := strconv.Atoi(partNumberRaw) if err != nil { writeS3Error(w, r, s3ErrInvalidPart, r.URL.Path) return } etag, err := h.svc.UploadPart(bucket, key, uploadID, partNumber, bodyReader) if err != nil { writeMappedS3Error(w, r, err) return } w.Header().Set("ETag", `"`+etag+`"`) w.Header().Set("Content-Length", "0") w.WriteHeader(http.StatusOK) return } contentType := r.Header.Get("Content-Type") if contentType == "" { contentType = "application/octet-stream" } manifest, err := h.svc.PutObject(bucket, key, contentType, bodyReader) if err != nil { writeMappedS3Error(w, r, err) return } w.Header().Set("ETag", `"`+manifest.ETag+`"`) w.Header().Set("Content-Length", "0") w.WriteHeader(http.StatusOK) } func (h *Handler) handleListMultipartParts(w http.ResponseWriter, r *http.Request, bucket, key, uploadID string) { parts, err := h.svc.ListMultipartParts(bucket, key, uploadID) if err != nil { writeMappedS3Error(w, r, err) return } response := models.ListPartsResult{ Xmlns: "http://s3.amazonaws.com/doc/2006-03-01/", Bucket: bucket, Key: key, UploadID: uploadID, Parts: make([]models.PartItem, 0, len(parts)), } for _, part := range parts { response.Parts = append(response.Parts, models.PartItem{ PartNumber: part.PartNumber, LastModified: time.Unix(part.CreatedAt, 0).UTC().Format("2006-01-02T15:04:05.000Z"), ETag: `"` + part.ETag + `"`, Size: part.Size, }) } payload, err := xml.MarshalIndent(response, "", " ") if err != nil { writeMappedS3Error(w, r, err) return } w.Header().Set("Content-Type", "application/xml; charset=utf-8") w.WriteHeader(http.StatusOK) _, _ = w.Write([]byte(xml.Header)) _, _ = w.Write(payload) } func shouldDecodeAWSChunkedPayload(r *http.Request) bool { contentEncoding := strings.ToLower(r.Header.Get("Content-Encoding")) if strings.Contains(contentEncoding, "aws-chunked") { return true } signingMode := strings.ToLower(r.Header.Get("x-amz-content-sha256")) return strings.HasPrefix(signingMode, "streaming-aws4-hmac-sha256-payload") } func newAWSChunkedDecodingReader(src io.Reader) io.Reader { pr, pw := io.Pipe() go func() { if err := decodeAWSChunkedPayload(src, pw); err != nil { _ = pw.CloseWithError(err) return } _ = pw.Close() }() return pr } func decodeAWSChunkedPayload(src io.Reader, dst io.Writer) error { reader := bufio.NewReader(src) for { headerLine, err := reader.ReadString('\n') if err != nil { return err } headerLine = strings.TrimRight(headerLine, "\r\n") chunkSizeToken := headerLine if idx := strings.IndexByte(chunkSizeToken, ';'); idx >= 0 { chunkSizeToken = chunkSizeToken[:idx] } chunkSizeToken = strings.TrimSpace(chunkSizeToken) chunkSize, err := strconv.ParseInt(chunkSizeToken, 16, 64) if err != nil { return fmt.Errorf("invalid aws-chunked header %q: %w", headerLine, err) } if chunkSize < 0 { return fmt.Errorf("invalid aws-chunked size: %d", chunkSize) } if chunkSize > 0 { if _, err := io.CopyN(dst, reader, chunkSize); err != nil { return err } } crlf := make([]byte, 2) if _, err := io.ReadFull(reader, crlf); err != nil { return err } if crlf[0] != '\r' || crlf[1] != '\n' { return errors.New("invalid aws-chunked payload terminator") } if chunkSize == 0 { for { line, err := reader.ReadString('\n') if err != nil { return err } if line == "\r\n" || line == "\n" { return nil } } } } } func (h *Handler) handleHeadObject(w http.ResponseWriter, r *http.Request) { bucket := chi.URLParam(r, "bucket") key := chi.URLParam(r, "*") if key == "" { writeS3Error(w, r, s3ErrInvalidObjectKey, r.URL.Path) return } manifest, err := h.svc.HeadObject(bucket, key) if err != nil { writeMappedS3Error(w, r, err) return } etag := manifest.ETag size := strconv.Itoa(int(manifest.Size)) w.Header().Set("ETag", `"`+etag+`"`) w.Header().Set("Content-Length", size) w.Header().Set("Last-Modified", time.Unix(manifest.CreatedAt, 0).UTC().Format(http.TimeFormat)) w.WriteHeader(http.StatusOK) } func (h *Handler) handlePutBucket(w http.ResponseWriter, r *http.Request) { bucket := chi.URLParam(r, "bucket") if err := h.svc.CreateBucket(bucket); err != nil { writeMappedS3Error(w, r, err) return } w.WriteHeader(http.StatusCreated) } func (h *Handler) handleDeleteBucket(w http.ResponseWriter, r *http.Request) { bucket := chi.URLParam(r, "bucket") if err := h.svc.DeleteBucket(bucket); err != nil { writeMappedS3Error(w, r, err) return } w.WriteHeader(http.StatusNoContent) } func (h *Handler) handlePostBucket(w http.ResponseWriter, r *http.Request) { bucket := chi.URLParam(r, "bucket") if _, ok := r.URL.Query()["delete"]; !ok { writeS3Error(w, r, s3ErrNotImplemented, r.URL.Path) return } defer func(Body io.ReadCloser) { err := Body.Close() if err != nil { } }(r.Body) bodyReader := io.Reader(r.Body) if shouldDecodeAWSChunkedPayload(r) { bodyReader = newAWSChunkedDecodingReader(r.Body) } var req models.DeleteObjectsRequest if err := xml.NewDecoder(bodyReader).Decode(&req); err != nil { writeS3Error(w, r, s3ErrMalformedXML, r.URL.Path) return } keys := make([]string, 0, len(req.Objects)) for _, obj := range req.Objects { if obj.Key == "" { continue } keys = append(keys, obj.Key) } deleted, err := h.svc.DeleteObjects(bucket, keys) if err != nil { writeMappedS3Error(w, r, err) return } response := models.DeleteObjectsResult{ Xmlns: "http://s3.amazonaws.com/doc/2006-03-01/", } if !req.Quiet { response.Deleted = make([]models.DeletedEntry, 0, len(deleted)) for _, key := range deleted { response.Deleted = append(response.Deleted, models.DeletedEntry{Key: key}) } } payload, err := xml.MarshalIndent(response, "", " ") if err != nil { writeMappedS3Error(w, r, err) return } w.Header().Set("Content-Type", "application/xml; charset=utf-8") w.WriteHeader(http.StatusOK) _, _ = w.Write([]byte(xml.Header)) _, _ = w.Write(payload) } func (h *Handler) handleDeleteObject(w http.ResponseWriter, r *http.Request) { bucket := chi.URLParam(r, "bucket") key := chi.URLParam(r, "*") if key == "" { writeS3Error(w, r, s3ErrInvalidObjectKey, r.URL.Path) return } if uploadId := r.URL.Query().Get("uploadId"); uploadId != "" { err := h.svc.AbortMultipartUpload(bucket, key, uploadId) if err != nil { writeMappedS3Error(w, r, err) return } w.WriteHeader(http.StatusNoContent) return } err := h.svc.DeleteObject(bucket, key) if err != nil { if errors.Is(err, metadata.ErrObjectNotFound) { w.WriteHeader(http.StatusNoContent) return } writeMappedS3Error(w, r, err) return } w.WriteHeader(http.StatusNoContent) } func (h *Handler) handleHeadBucket(w http.ResponseWriter, r *http.Request) { bucket := chi.URLParam(r, "bucket") if err := h.svc.HeadBucket(bucket); err != nil { writeMappedS3Error(w, r, err) return } w.WriteHeader(http.StatusOK) } func (h *Handler) handleGetBuckets(w http.ResponseWriter, r *http.Request) { buckets, err := h.svc.ListBuckets() if err != nil { writeMappedS3Error(w, r, err) return } w.Header().Set("Content-Type", "application/xml") w.WriteHeader(http.StatusOK) for _, bucket := range buckets { _, err := w.Write([]byte(bucket)) if err != nil { return } } } func (h *Handler) handleGetBucket(w http.ResponseWriter, r *http.Request) { bucket := chi.URLParam(r, "bucket") if r.URL.Query().Get("list-type") == "2" { prefix := r.URL.Query().Get("prefix") if prefix == "" { prefix = "" } h.handleListObjectsV2(w, r, bucket, prefix) return } if r.URL.Query().Has("location") { xmlResponse := ` us-east-1` w.Header().Set("Content-Type", "application/xml; charset=utf-8") w.Header().Set("Content-Length", strconv.Itoa(len(xmlResponse))) w.WriteHeader(http.StatusOK) _, err := w.Write([]byte(xmlResponse)) if err != nil { return } return } writeS3Error(w, r, s3ErrNotImplemented, r.URL.Path) } func (h *Handler) handleListObjectsV2(w http.ResponseWriter, r *http.Request, bucket, prefix string) { objects, err := h.svc.ListObjects(bucket, prefix) if err != nil { writeMappedS3Error(w, r, err) return } xmlResponse, err := utils.ConstructXMLResponseForObjectList(bucket, objects) if err != nil { writeMappedS3Error(w, r, err) return } w.Header().Set("Content-Type", "application/xml; charset=utf-8") w.Header().Set("Content-Length", strconv.Itoa(len(xmlResponse))) w.WriteHeader(http.StatusOK) _, err = w.Write([]byte(xmlResponse)) if err != nil { return } } func (h *Handler) Start(address string) error { fmt.Printf("Starting API server on %s\n", address) h.setupRoutes() stop := make(chan os.Signal, 1) signal.Notify(stop, os.Interrupt, syscall.SIGTERM) server := http.Server{ Addr: address, Handler: h.router, } go func() { if err := server.ListenAndServe(); err != nil { log.Fatal(err) } }() <-stop ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() if err := server.Shutdown(ctx); err != nil { return err } if err := h.svc.Close(); err != nil { return err } return nil }