diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000..08dc479 --- /dev/null +++ b/.dockerignore @@ -0,0 +1,4 @@ +*.md +.gocache/ +blobs/ +data/ \ No newline at end of file diff --git a/.env.example b/.env.example new file mode 100644 index 0000000..e6c7567 --- /dev/null +++ b/.env.example @@ -0,0 +1,6 @@ +LOG_LEVEL=debug +LOG_FORMAT=text +DATA_PATH=data/ +PORT=2600 +AUDIT_LOG=true +ADDRESS=0.0.0.0 diff --git a/.gitignore b/.gitignore index 7a250ef..445f71b 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,8 @@ .env +*.db .vscode/ blobs/ -*.db +data/ .idea/ +.gocache/ +.gomodcache/ diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..843bf24 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,16 @@ +FROM golang:1.25-alpine AS build + +WORKDIR /app + +COPY go.mod go.sum ./ +RUN go mod download + +COPY . . +RUN CGO_ENABLED=0 GOOS=linux go build -o /app/fs . + +FROM scratch AS runner + +COPY --from=build /app/fs /app/fs + +WORKDIR /app +CMD ["/app/fs"] diff --git a/README.md b/README.md index cca2a29..6512bc1 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,34 @@ # fs -An experimental Object Storage written in Go that should be compatible with S3 \ No newline at end of file +An experimental Object Storage written in Go that should be partially compatible with S3 + +## Features + +- Bucket operations: +- `PUT /{bucket}` +- `HEAD /{bucket}` +- `DELETE /{bucket}` +- `GET /` (list buckets) +- Object operations: +- `PUT /{bucket}/{key}` +- `GET /{bucket}/{key}` +- `HEAD /{bucket}/{key}` +- `DELETE /{bucket}/{key}` +- `GET /{bucket}?list-type=2&prefix=...` (ListObjectsV2-style) +- Multipart upload: +- `POST /{bucket}/{key}?uploads` (initiate) +- `PUT /{bucket}/{key}?uploadId=...&partNumber=N` (upload part) +- `GET /{bucket}/{key}?uploadId=...` (list parts) +- `POST /{bucket}/{key}?uploadId=...` (complete) +- `DELETE /{bucket}/{key}?uploadId=...` (abort) +- Multi-object delete: +- `POST /{bucket}?delete` with S3-style XML body +- AWS SigV4 streaming payload decoding for uploads (`aws-chunked` request bodies) + +## Limitations + +- No authentication/authorization yet. +- Not full S3 API coverage. +- No garbage collection of unreferenced blob chunks yet. +- No versioning or lifecycle policies. +- Error and edge-case behavior is still being refined for client compatibility. \ No newline at end of file diff --git a/api/api.go b/api/api.go index ab3b772..ee70fc7 100644 --- a/api/api.go +++ b/api/api.go @@ -1,14 +1,24 @@ package api import ( + "bufio" + "context" + "encoding/xml" "errors" "fmt" + "fs/logging" "fs/metadata" + "fs/models" "fs/service" "fs/utils" "io" + "log/slog" "net/http" + "os" + "os/signal" "strconv" + "strings" + "syscall" "time" "github.com/go-chi/chi/v5" @@ -16,23 +26,30 @@ import ( ) type Handler struct { - router *chi.Mux - svc *service.ObjectService + router *chi.Mux + svc *service.ObjectService + logger *slog.Logger + logConfig logging.Config } -func NewHandler(svc *service.ObjectService) *Handler { +func NewHandler(svc *service.ObjectService, logger *slog.Logger, logConfig logging.Config) *Handler { r := chi.NewRouter() r.Use(middleware.Recoverer) + if logger == nil { + logger = slog.Default() + } h := &Handler{ - router: r, - svc: svc, + router: r, + svc: svc, + logger: logger, + logConfig: logConfig, } return h } func (h *Handler) setupRoutes() { - h.router.Use(middleware.Logger) + h.router.Use(logging.HTTPMiddleware(h.logger, h.logConfig)) h.router.Get("/", h.handleGetBuckets) @@ -40,6 +57,8 @@ func (h *Handler) setupRoutes() { h.router.Get("/{bucket}", h.handleGetBucket) h.router.Put("/{bucket}", h.handlePutBucket) h.router.Put("/{bucket}/", h.handlePutBucket) + h.router.Post("/{bucket}", h.handlePostBucket) + h.router.Post("/{bucket}/", h.handlePostBucket) h.router.Delete("/{bucket}", h.handleDeleteBucket) h.router.Delete("/{bucket}/", h.handleDeleteBucket) h.router.Head("/{bucket}", h.handleHeadBucket) @@ -47,11 +66,12 @@ func (h *Handler) setupRoutes() { h.router.Get("/{bucket}/*", h.handleGetObject) h.router.Put("/{bucket}/*", h.handlePutObject) + h.router.Post("/{bucket}/*", h.handlePostObject) h.router.Head("/{bucket}/*", h.handleHeadObject) h.router.Delete("/{bucket}/*", h.handleDeleteObject) } -func (h *Handler) handleWelcome(w http.ResponseWriter, r *http.Request) { +func (h *Handler) handleWelcome(w http.ResponseWriter) { w.WriteHeader(http.StatusOK) _, err := w.Write([]byte("Welcome to the Object Storage API!")) if err != nil { @@ -68,8 +88,9 @@ func (h *Handler) handleGetObject(w http.ResponseWriter, r *http.Request) { return } - if r.URL.Query().Get("uploadId") != "" { - + if uploadID := r.URL.Query().Get("uploadId"); uploadID != "" { + h.handleListMultipartParts(w, r, bucket, key, uploadID) + return } stream, manifest, err := h.svc.GetObject(bucket, key) @@ -77,6 +98,7 @@ func (h *Handler) handleGetObject(w http.ResponseWriter, r *http.Request) { writeMappedS3Error(w, r, err) return } + defer stream.Close() w.Header().Set("Content-Type", manifest.ContentType) w.Header().Set("Content-Length", strconv.FormatInt(manifest.Size, 10)) @@ -88,6 +110,75 @@ func (h *Handler) handleGetObject(w http.ResponseWriter, r *http.Request) { } +func (h *Handler) handlePostObject(w http.ResponseWriter, r *http.Request) { + bucket := chi.URLParam(r, "bucket") + key := chi.URLParam(r, "*") + if key == "" { + writeS3Error(w, r, s3ErrInvalidObjectKey, r.URL.Path) + return + } + defer r.Body.Close() + + if _, ok := r.URL.Query()["uploads"]; ok { + upload, err := h.svc.CreateMultipartUpload(bucket, key) + if err != nil { + writeMappedS3Error(w, r, err) + return + } + response := models.InitiateMultipartUploadResult{ + Xmlns: "http://s3.amazonaws.com/doc/2006-03-01/", + Bucket: upload.Bucket, + Key: upload.Key, + UploadID: upload.UploadID, + } + payload, err := xml.MarshalIndent(response, "", " ") + if err != nil { + writeMappedS3Error(w, r, err) + return + } + w.Header().Set("Content-Type", "application/xml; charset=utf-8") + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte(xml.Header)) + _, _ = w.Write(payload) + return + } + + if uploadID := r.URL.Query().Get("uploadId"); uploadID != "" { + var req models.CompleteMultipartUploadRequest + if err := xml.NewDecoder(r.Body).Decode(&req); err != nil { + writeS3Error(w, r, s3ErrMalformedXML, r.URL.Path) + return + } + + manifest, err := h.svc.CompleteMultipartUpload(bucket, key, uploadID, req.Parts) + if err != nil { + writeMappedS3Error(w, r, err) + return + } + + response := models.CompleteMultipartUploadResult{ + Xmlns: "http://s3.amazonaws.com/doc/2006-03-01/", + Bucket: bucket, + Key: key, + ETag: `"` + manifest.ETag + `"`, + Location: r.URL.Path, + } + payload, err := xml.MarshalIndent(response, "", " ") + if err != nil { + writeMappedS3Error(w, r, err) + return + } + + w.Header().Set("Content-Type", "application/xml; charset=utf-8") + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte(xml.Header)) + _, _ = w.Write(payload) + return + } + + writeS3Error(w, r, s3ErrNotImplemented, r.URL.Path) +} + func (h *Handler) handlePutObject(w http.ResponseWriter, r *http.Request) { bucket := chi.URLParam(r, "bucket") key := chi.URLParam(r, "*") @@ -95,14 +186,59 @@ func (h *Handler) handlePutObject(w http.ResponseWriter, r *http.Request) { writeS3Error(w, r, s3ErrInvalidObjectKey, r.URL.Path) return } + defer r.Body.Close() + + uploadID := r.URL.Query().Get("uploadId") + partNumberRaw := r.URL.Query().Get("partNumber") + if uploadID != "" || partNumberRaw != "" { + if uploadID == "" || partNumberRaw == "" { + writeS3Error(w, r, s3ErrInvalidPart, r.URL.Path) + return + } + + partNumber, err := strconv.Atoi(partNumberRaw) + if err != nil { + writeS3Error(w, r, s3ErrInvalidPart, r.URL.Path) + return + } + if partNumber < 1 || partNumber > 10000 { + writeS3Error(w, r, s3ErrInvalidPart, r.URL.Path) + return + } + + bodyReader := io.Reader(r.Body) + var decodeStream io.ReadCloser + if shouldDecodeAWSChunkedPayload(r) { + decodeStream = newAWSChunkedDecodingReader(r.Body) + defer decodeStream.Close() + bodyReader = decodeStream + } + + etag, err := h.svc.UploadPart(bucket, key, uploadID, partNumber, bodyReader) + if err != nil { + writeMappedS3Error(w, r, err) + return + } + w.Header().Set("ETag", `"`+etag+`"`) + w.Header().Set("Content-Length", "0") + w.WriteHeader(http.StatusOK) + return + } contentType := r.Header.Get("Content-Type") if contentType == "" { contentType = "application/octet-stream" } - manifest, err := h.svc.PutObject(bucket, key, contentType, r.Body) - defer r.Body.Close() + bodyReader := io.Reader(r.Body) + var decodeStream io.ReadCloser + if shouldDecodeAWSChunkedPayload(r) { + decodeStream = newAWSChunkedDecodingReader(r.Body) + defer decodeStream.Close() + bodyReader = decodeStream + } + + manifest, err := h.svc.PutObject(bucket, key, contentType, bodyReader) if err != nil { writeMappedS3Error(w, r, err) @@ -115,6 +251,110 @@ func (h *Handler) handlePutObject(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) } +func (h *Handler) handleListMultipartParts(w http.ResponseWriter, r *http.Request, bucket, key, uploadID string) { + parts, err := h.svc.ListMultipartParts(bucket, key, uploadID) + if err != nil { + writeMappedS3Error(w, r, err) + return + } + + response := models.ListPartsResult{ + Xmlns: "http://s3.amazonaws.com/doc/2006-03-01/", + Bucket: bucket, + Key: key, + UploadID: uploadID, + Parts: make([]models.PartItem, 0, len(parts)), + } + for _, part := range parts { + response.Parts = append(response.Parts, models.PartItem{ + PartNumber: part.PartNumber, + LastModified: time.Unix(part.CreatedAt, 0).UTC().Format("2006-01-02T15:04:05.000Z"), + ETag: `"` + part.ETag + `"`, + Size: part.Size, + }) + } + + payload, err := xml.MarshalIndent(response, "", " ") + if err != nil { + writeMappedS3Error(w, r, err) + return + } + + w.Header().Set("Content-Type", "application/xml; charset=utf-8") + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte(xml.Header)) + _, _ = w.Write(payload) +} + +func shouldDecodeAWSChunkedPayload(r *http.Request) bool { + contentEncoding := strings.ToLower(r.Header.Get("Content-Encoding")) + if strings.Contains(contentEncoding, "aws-chunked") { + return true + } + signingMode := strings.ToLower(r.Header.Get("x-amz-content-sha256")) + return strings.HasPrefix(signingMode, "streaming-aws4-hmac-sha256-payload") +} + +func newAWSChunkedDecodingReader(src io.Reader) io.ReadCloser { + pr, pw := io.Pipe() + go func() { + if err := decodeAWSChunkedPayload(src, pw); err != nil { + _ = pw.CloseWithError(err) + return + } + _ = pw.Close() + }() + return pr +} + +func decodeAWSChunkedPayload(src io.Reader, dst io.Writer) error { + reader := bufio.NewReader(src) + for { + headerLine, err := reader.ReadString('\n') + if err != nil { + return err + } + headerLine = strings.TrimRight(headerLine, "\r\n") + chunkSizeToken := headerLine + if idx := strings.IndexByte(chunkSizeToken, ';'); idx >= 0 { + chunkSizeToken = chunkSizeToken[:idx] + } + chunkSizeToken = strings.TrimSpace(chunkSizeToken) + chunkSize, err := strconv.ParseInt(chunkSizeToken, 16, 64) + if err != nil { + return fmt.Errorf("invalid aws-chunked header %q: %w", headerLine, err) + } + if chunkSize < 0 { + return fmt.Errorf("invalid aws-chunked size: %d", chunkSize) + } + if chunkSize > 0 { + if _, err := io.CopyN(dst, reader, chunkSize); err != nil { + return err + } + } + + crlf := make([]byte, 2) + if _, err := io.ReadFull(reader, crlf); err != nil { + return err + } + if crlf[0] != '\r' || crlf[1] != '\n' { + return errors.New("invalid aws-chunked payload terminator") + } + + if chunkSize == 0 { + for { + line, err := reader.ReadString('\n') + if err != nil { + return err + } + if line == "\r\n" || line == "\n" { + return nil + } + } + } + } +} + func (h *Handler) handleHeadObject(w http.ResponseWriter, r *http.Request) { bucket := chi.URLParam(r, "bucket") key := chi.URLParam(r, "*") @@ -128,9 +368,11 @@ func (h *Handler) handleHeadObject(w http.ResponseWriter, r *http.Request) { writeMappedS3Error(w, r, err) return } + etag := manifest.ETag + size := strconv.FormatInt(manifest.Size, 10) - w.Header().Set("ETag", `"`+manifest.ETag+`"`) - w.Header().Set("Content-Length", "0") + w.Header().Set("ETag", `"`+etag+`"`) + w.Header().Set("Content-Length", size) w.Header().Set("Last-Modified", time.Unix(manifest.CreatedAt, 0).UTC().Format(http.TimeFormat)) w.WriteHeader(http.StatusOK) } @@ -153,6 +395,64 @@ func (h *Handler) handleDeleteBucket(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusNoContent) } +func (h *Handler) handlePostBucket(w http.ResponseWriter, r *http.Request) { + bucket := chi.URLParam(r, "bucket") + if _, ok := r.URL.Query()["delete"]; !ok { + writeS3Error(w, r, s3ErrNotImplemented, r.URL.Path) + return + } + defer r.Body.Close() + + bodyReader := io.Reader(r.Body) + var decodeStream io.ReadCloser + if shouldDecodeAWSChunkedPayload(r) { + decodeStream = newAWSChunkedDecodingReader(r.Body) + defer decodeStream.Close() + bodyReader = decodeStream + } + + var req models.DeleteObjectsRequest + if err := xml.NewDecoder(bodyReader).Decode(&req); err != nil { + writeS3Error(w, r, s3ErrMalformedXML, r.URL.Path) + return + } + + keys := make([]string, 0, len(req.Objects)) + for _, obj := range req.Objects { + if obj.Key == "" { + continue + } + keys = append(keys, obj.Key) + } + + deleted, err := h.svc.DeleteObjects(bucket, keys) + if err != nil { + writeMappedS3Error(w, r, err) + return + } + + response := models.DeleteObjectsResult{ + Xmlns: "http://s3.amazonaws.com/doc/2006-03-01/", + } + if !req.Quiet { + response.Deleted = make([]models.DeletedEntry, 0, len(deleted)) + for _, key := range deleted { + response.Deleted = append(response.Deleted, models.DeletedEntry{Key: key}) + } + } + + payload, err := xml.MarshalIndent(response, "", " ") + if err != nil { + writeMappedS3Error(w, r, err) + return + } + + w.Header().Set("Content-Type", "application/xml; charset=utf-8") + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte(xml.Header)) + _, _ = w.Write(payload) +} + func (h *Handler) handleDeleteObject(w http.ResponseWriter, r *http.Request) { bucket := chi.URLParam(r, "bucket") key := chi.URLParam(r, "*") @@ -160,7 +460,15 @@ func (h *Handler) handleDeleteObject(w http.ResponseWriter, r *http.Request) { writeS3Error(w, r, s3ErrInvalidObjectKey, r.URL.Path) return } - + if uploadId := r.URL.Query().Get("uploadId"); uploadId != "" { + err := h.svc.AbortMultipartUpload(bucket, key, uploadId) + if err != nil { + writeMappedS3Error(w, r, err) + return + } + w.WriteHeader(http.StatusNoContent) + return + } err := h.svc.DeleteObject(bucket, key) if err != nil { if errors.Is(err, metadata.ErrObjectNotFound) { @@ -191,7 +499,10 @@ func (h *Handler) handleGetBuckets(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/xml") w.WriteHeader(http.StatusOK) for _, bucket := range buckets { - w.Write([]byte(bucket)) + _, err := w.Write([]byte(bucket)) + if err != nil { + return + } } } @@ -206,6 +517,19 @@ func (h *Handler) handleGetBucket(w http.ResponseWriter, r *http.Request) { h.handleListObjectsV2(w, r, bucket, prefix) return } + if r.URL.Query().Has("location") { + xmlResponse := ` + us-east-1` + + w.Header().Set("Content-Type", "application/xml; charset=utf-8") + w.Header().Set("Content-Length", strconv.Itoa(len(xmlResponse))) + w.WriteHeader(http.StatusOK) + _, err := w.Write([]byte(xmlResponse)) + if err != nil { + return + } + return + } writeS3Error(w, r, s3ErrNotImplemented, r.URL.Path) } @@ -234,7 +558,50 @@ func (h *Handler) handleListObjectsV2(w http.ResponseWriter, r *http.Request, bu } func (h *Handler) Start(address string) error { - fmt.Printf("Starting API server on %s\n", address) + h.logger.Info("server_starting", + "address", address, + "log_format", h.logConfig.Format, + "log_level", h.logConfig.LevelName, + "audit_log", h.logConfig.Audit, + ) h.setupRoutes() - return http.ListenAndServe(address, h.router) + stop := make(chan os.Signal, 1) + signal.Notify(stop, os.Interrupt, syscall.SIGTERM) + defer signal.Stop(stop) + server := http.Server{ + Addr: address, + Handler: h.router, + } + errCh := make(chan error, 1) + + go func() { + if err := server.ListenAndServe(); err != nil { + if !errors.Is(err, http.ErrServerClosed) { + errCh <- err + } + } + }() + + select { + case <-stop: + h.logger.Info("shutdown_signal_received") + case err := <-errCh: + h.logger.Error("server_listen_failed", "error", err) + return err + } + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + if err := server.Shutdown(ctx); err != nil { + h.logger.Error("server_shutdown_failed", "error", err) + return err + } + if err := h.svc.Close(); err != nil { + h.logger.Error("service_close_failed", "error", err) + return err + } + + h.logger.Info("server_stopped") + return nil } diff --git a/api/s3_errors.go b/api/s3_errors.go index 8792621..c446e4d 100644 --- a/api/s3_errors.go +++ b/api/s3_errors.go @@ -5,6 +5,7 @@ import ( "errors" "fs/metadata" "fs/models" + "fs/service" "net/http" ) @@ -25,6 +26,26 @@ var ( Code: "NotImplemented", Message: "A header you provided implies functionality that is not implemented.", } + s3ErrInvalidPart = s3APIError{ + Status: http.StatusBadRequest, + Code: "InvalidPart", + Message: "One or more of the specified parts could not be found.", + } + s3ErrInvalidPartOrder = s3APIError{ + Status: http.StatusBadRequest, + Code: "InvalidPartOrder", + Message: "The list of parts was not in ascending order.", + } + s3ErrMalformedXML = s3APIError{ + Status: http.StatusBadRequest, + Code: "MalformedXML", + Message: "The XML you provided was not well-formed or did not validate against our published schema.", + } + s3ErrEntityTooSmall = s3APIError{ + Status: http.StatusBadRequest, + Code: "EntityTooSmall", + Message: "Your proposed upload is smaller than the minimum allowed object size.", + } s3ErrInternal = s3APIError{ Status: http.StatusInternalServerError, Code: "InternalError", @@ -64,6 +85,26 @@ func mapToS3Error(err error) s3APIError { Code: "NoSuchKey", Message: "The specified key does not exist.", } + case errors.Is(err, metadata.ErrMultipartNotFound): + return s3APIError{ + Status: http.StatusNotFound, + Code: "NoSuchUpload", + Message: "The specified multipart upload does not exist.", + } + case errors.Is(err, metadata.ErrMultipartNotPending): + return s3APIError{ + Status: http.StatusBadRequest, + Code: "InvalidRequest", + Message: "The multipart upload is not in a valid state for this operation.", + } + case errors.Is(err, service.ErrInvalidPart): + return s3ErrInvalidPart + case errors.Is(err, service.ErrInvalidPartOrder): + return s3ErrInvalidPartOrder + case errors.Is(err, service.ErrInvalidCompleteRequest): + return s3ErrMalformedXML + case errors.Is(err, service.ErrEntityTooSmall): + return s3ErrEntityTooSmall default: return s3ErrInternal } diff --git a/go.mod b/go.mod index 020cd73..2b9a16e 100644 --- a/go.mod +++ b/go.mod @@ -3,9 +3,12 @@ module fs go 1.25.7 require ( - github.com/go-chi/chi/v5 v5.2.5 // indirect - github.com/klauspost/cpuid/v2 v2.3.0 // indirect - github.com/klauspost/reedsolomon v1.13.2 // indirect - go.etcd.io/bbolt v1.4.3 // indirect + github.com/go-chi/chi/v5 v5.2.5 + github.com/google/uuid v1.6.0 + go.etcd.io/bbolt v1.4.3 +) + +require ( + github.com/joho/godotenv v1.5.1 // indirect golang.org/x/sys v0.41.0 // indirect ) diff --git a/go.sum b/go.sum index d4bfca0..eb8ecee 100644 --- a/go.sum +++ b/go.sum @@ -1,12 +1,20 @@ +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/go-chi/chi/v5 v5.2.5 h1:Eg4myHZBjyvJmAFjFvWgrqDTXFyOzjj7YIm3L3mu6Ug= github.com/go-chi/chi/v5 v5.2.5/go.mod h1:X7Gx4mteadT3eDOMTsXzmI4/rwUpOwBHLpAfupzFJP0= -github.com/klauspost/cpuid/v2 v2.3.0 h1:S4CRMLnYUhGeDFDqkGriYKdfoFlDnMtqTiI/sFzhA9Y= -github.com/klauspost/cpuid/v2 v2.3.0/go.mod h1:hqwkgyIinND0mEev00jJYCxPNVRVXFQeu1XKlok6oO0= -github.com/klauspost/reedsolomon v1.13.2 h1:9qtQy2tKEVpVB8Pfq87ZljHZb60/LbeTQ1OxV8EGzdE= -github.com/klauspost/reedsolomon v1.13.2/go.mod h1:ggJT9lc71Vu+cSOPBlxGvBN6TfAS77qB4fp8vJ05NSA= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0= +github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= +github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= go.etcd.io/bbolt v1.4.3 h1:dEadXpI6G79deX5prL3QRNP6JB8UxVkqo4UPnHaNXJo= go.etcd.io/bbolt v1.4.3/go.mod h1:tKQlpPaYCVFctUIgFKFnAlvbmB3tpy1vkTnDWohtc0E= -golang.org/x/sys v0.29.0 h1:TPYlXGxvx1MGTn2GiZDhnjPA9wZzZeGKHHmKhHYvgaU= -golang.org/x/sys v0.29.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ= +golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.41.0 h1:Ivj+2Cp/ylzLiEU89QhWblYnOE9zerudt9Ftecq2C6k= golang.org/x/sys v0.41.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/logging/logging.go b/logging/logging.go new file mode 100644 index 0000000..7290bf2 --- /dev/null +++ b/logging/logging.go @@ -0,0 +1,150 @@ +package logging + +import ( + "log/slog" + "net/http" + "os" + "path/filepath" + "strconv" + "strings" + "time" + + "github.com/go-chi/chi/v5/middleware" +) + +type Config struct { + Level slog.Level + LevelName string + Format string + Audit bool + AddSource bool + DebugMode bool +} + +func ConfigFromEnv() Config { + levelName := strings.ToLower(strings.TrimSpace(os.Getenv("LOG_LEVEL"))) + format := strings.ToLower(strings.TrimSpace(os.Getenv("LOG_FORMAT"))) + return ConfigFromValues(levelName, format, envBool("AUDIT_LOG", true)) +} + +func ConfigFromValues(levelName, format string, audit bool) Config { + levelName = strings.ToLower(strings.TrimSpace(levelName)) + if levelName == "" { + levelName = "info" + } + level := parseLevel(levelName) + levelName = strings.ToUpper(level.String()) + + format = strings.ToLower(strings.TrimSpace(format)) + if format == "" { + format = "text" + } + if format != "json" && format != "text" { + format = "text" + } + + debugMode := level <= slog.LevelDebug + return Config{ + Level: level, + LevelName: levelName, + Format: format, + Audit: audit, + AddSource: debugMode, + DebugMode: debugMode, + } +} + +func NewLogger(cfg Config) *slog.Logger { + opts := &slog.HandlerOptions{ + Level: cfg.Level, + AddSource: cfg.AddSource, + } + opts.ReplaceAttr = func(_ []string, attr slog.Attr) slog.Attr { + if attr.Key == slog.SourceKey { + if src, ok := attr.Value.Any().(*slog.Source); ok && src != nil { + attr.Key = "src" + attr.Value = slog.StringValue(filepath.Base(src.File) + ":" + strconv.Itoa(src.Line)) + } + } + return attr + } + + var handler slog.Handler + if cfg.Format == "json" { + handler = slog.NewJSONHandler(os.Stdout, opts) + } else { + handler = slog.NewTextHandler(os.Stdout, opts) + } + + logger := slog.New(handler) + slog.SetDefault(logger) + return logger +} + +func HTTPMiddleware(logger *slog.Logger, cfg Config) func(http.Handler) http.Handler { + return func(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + start := time.Now() + ww := middleware.NewWrapResponseWriter(w, r.ProtoMajor) + + next.ServeHTTP(ww, r) + + if !cfg.Audit && !cfg.DebugMode { + return + } + + elapsed := time.Since(start) + status := ww.Status() + if status == 0 { + status = http.StatusOK + } + attrs := []any{ + "method", r.Method, + "path", r.URL.Path, + "status", status, + "bytes", ww.BytesWritten(), + "duration_ms", float64(elapsed.Nanoseconds()) / 1_000_000.0, + "remote_addr", r.RemoteAddr, + } + + if cfg.DebugMode { + attrs = append(attrs, + "query", r.URL.RawQuery, + "user_agent", r.UserAgent(), + "content_length", r.ContentLength, + "content_type", r.Header.Get("Content-Type"), + "x_amz_sha256", r.Header.Get("x-amz-content-sha256"), + ) + logger.Debug("http_request", attrs...) + return + } + + logger.Info("http_request", attrs...) + }) + } +} + +func envBool(key string, defaultValue bool) bool { + raw := os.Getenv(key) + if raw == "" { + return defaultValue + } + value, err := strconv.ParseBool(raw) + if err != nil { + return defaultValue + } + return value +} + +func parseLevel(levelName string) slog.Level { + switch levelName { + case "debug": + return slog.LevelDebug + case "warn", "warning": + return slog.LevelWarn + case "error": + return slog.LevelError + default: + return slog.LevelInfo + } +} diff --git a/main.go b/main.go index 74d37c7..53a28ed 100644 --- a/main.go +++ b/main.go @@ -1,24 +1,51 @@ package main import ( - "fmt" "fs/api" + "fs/logging" "fs/metadata" "fs/service" + "fs/storage" + "fs/utils" + "os" + "path/filepath" + "strconv" ) func main() { + config := utils.NewConfig() + logConfig := logging.ConfigFromValues(config.LogLevel, config.LogFormat, config.AuditLog) + logger := logging.NewLogger(logConfig) + logger.Info("boot", + "log_level", logConfig.LevelName, + "log_format", logConfig.Format, + "audit_log", logConfig.Audit, + "data_path", config.DataPath, + ) - metadataHandler, err := metadata.NewMetadataHandler("metadata.db") - if err != nil { - fmt.Printf("Error initializing metadata handler: %v\n", err) + if err := os.MkdirAll(config.DataPath, 0o755); err != nil { + logger.Error("failed_to_prepare_data_path", "path", config.DataPath, "error", err) return } - objectService := service.NewObjectService(metadataHandler) - handler := api.NewHandler(objectService) - err = handler.Start("localhost:3000") + dbPath := filepath.Join(config.DataPath, "metadata.db") + metadataHandler, err := metadata.NewMetadataHandler(dbPath) if err != nil { + logger.Error("failed_to_initialize_metadata_handler", "error", err) + return + } + blobHandler, err := storage.NewBlobStore(config.DataPath, config.ChunkSize) + if err != nil { + _ = metadataHandler.Close() + logger.Error("failed_to_initialize_blob_store", "error", err) + return + } + + objectService := service.NewObjectService(metadataHandler, blobHandler) + handler := api.NewHandler(objectService, logger, logConfig) + addr := config.Address + ":" + strconv.Itoa(config.Port) + if err = handler.Start(addr); err != nil { + logger.Error("server_stopped_with_error", "error", err) return } } diff --git a/metadata/metadata.go b/metadata/metadata.go index 07a2344..6a2df20 100644 --- a/metadata/metadata.go +++ b/metadata/metadata.go @@ -6,9 +6,12 @@ import ( "fmt" "fs/models" "regexp" + "sort" "strings" "time" + "github.com/google/uuid" + "go.etcd.io/bbolt" ) @@ -17,6 +20,8 @@ type MetadataHandler struct { } var systemIndex = []byte("__SYSTEM_BUCKETS__") +var multipartUploadIndex = []byte("__MULTIPART_UPLOADS__") +var multipartUploadPartsIndex = []byte("__MULTIPART_UPLOAD_PARTS__") var validBucketName = regexp.MustCompile(`^[a-z0-9.-]{3,63}$`) @@ -26,10 +31,12 @@ var ( ErrBucketNotFound = errors.New("bucket not found") ErrBucketNotEmpty = errors.New("bucket not empty") ErrObjectNotFound = errors.New("object not found") + ErrMultipartNotFound = errors.New("multipart upload not found") + ErrMultipartNotPending = errors.New("multipart upload is not pending") ) func NewMetadataHandler(dbPath string) (*MetadataHandler, error) { - db, err := bbolt.Open(dbPath, 0600, nil) + db, err := bbolt.Open(dbPath, 0600, &bbolt.Options{Timeout: 2 * time.Second}) if err != nil { return nil, err } @@ -43,10 +50,30 @@ func NewMetadataHandler(dbPath string) (*MetadataHandler, error) { _ = db.Close() return nil, err } + err = h.db.Update(func(tx *bbolt.Tx) error { + _, err := tx.CreateBucketIfNotExists(multipartUploadIndex) + return err + }) + if err != nil { + _ = db.Close() + return nil, err + } + err = h.db.Update(func(tx *bbolt.Tx) error { + _, err := tx.CreateBucketIfNotExists(multipartUploadPartsIndex) + return err + }) + if err != nil { + _ = db.Close() + return nil, err + } return h, nil } +func (h *MetadataHandler) Close() error { + return h.db.Close() +} + func (h *MetadataHandler) CreateBucket(bucketName string) error { if !validBucketName.MatchString(bucketName) { return fmt.Errorf("%w: %s", ErrInvalidBucketName, bucketName) @@ -265,3 +292,296 @@ func (h *MetadataHandler) DeleteManifest(bucket, key string) error { return nil } + +func (h *MetadataHandler) DeleteManifests(bucket string, keys []string) ([]string, error) { + deleted := make([]string, 0, len(keys)) + + err := h.db.Update(func(tx *bbolt.Tx) error { + metadataBucket := tx.Bucket([]byte(bucket)) + if metadataBucket == nil { + return fmt.Errorf("%w: %s", ErrBucketNotFound, bucket) + } + + for _, key := range keys { + if key == "" { + continue + } + if metadataBucket.Get([]byte(key)) != nil { + if err := metadataBucket.Delete([]byte(key)); err != nil { + return err + } + } + deleted = append(deleted, key) + } + return nil + }) + if err != nil { + return nil, err + } + return deleted, nil +} + +func (h *MetadataHandler) CreateMultipartUpload(bucket, key string) (*models.MultipartUpload, error) { + var upload *models.MultipartUpload + + err := h.db.View(func(tx *bbolt.Tx) error { + systemIndexBucket := tx.Bucket([]byte(systemIndex)) + if systemIndexBucket == nil { + return errors.New("system index not found") + } + if systemIndexBucket.Get([]byte(bucket)) != nil { + return nil + } + return ErrBucketNotFound + }) + if err != nil { + return nil, err + } + uploadId := uuid.New().String() + createdAt := time.Now().UTC().Format(time.RFC3339) + upload = &models.MultipartUpload{ + Bucket: bucket, + Key: key, + UploadID: uploadId, + CreatedAt: createdAt, + State: "pending", + } + + err = h.db.Update(func(tx *bbolt.Tx) error { + multipartUploadBucket := tx.Bucket([]byte(multipartUploadIndex)) + if multipartUploadBucket == nil { + return errors.New("multipart upload index not found") + } + payload, err := json.Marshal(upload) + if err != nil { + return err + } + err = multipartUploadBucket.Put([]byte(uploadId), payload) + if err != nil { + return err + } + + return nil + }) + if err != nil { + return nil, err + } + + return upload, nil +} + +func getMultipartUploadBucket(tx *bbolt.Tx) (*bbolt.Bucket, error) { + multipartUploadBucket := tx.Bucket(multipartUploadIndex) + if multipartUploadBucket == nil { + return nil, errors.New("multipart upload index not found") + } + return multipartUploadBucket, nil +} + +func getMultipartPartsBucket(tx *bbolt.Tx) (*bbolt.Bucket, error) { + multipartPartsBucket := tx.Bucket(multipartUploadPartsIndex) + if multipartPartsBucket == nil { + return nil, errors.New("multipart upload parts index not found") + } + return multipartPartsBucket, nil +} + +func getMultipartUploadFromBucket(multipartUploadBucket *bbolt.Bucket, uploadID string) (*models.MultipartUpload, error) { + payload := multipartUploadBucket.Get([]byte(uploadID)) + if payload == nil { + return nil, fmt.Errorf("%w: %s", ErrMultipartNotFound, uploadID) + } + upload := models.MultipartUpload{} + if err := json.Unmarshal(payload, &upload); err != nil { + return nil, err + } + return &upload, nil +} + +func getMultipartUploadFromTx(tx *bbolt.Tx, uploadID string) (*models.MultipartUpload, *bbolt.Bucket, error) { + multipartUploadBucket, err := getMultipartUploadBucket(tx) + if err != nil { + return nil, nil, err + } + upload, err := getMultipartUploadFromBucket(multipartUploadBucket, uploadID) + if err != nil { + return nil, nil, err + } + return upload, multipartUploadBucket, nil +} + +func putMultipartUpload(multipartUploadBucket *bbolt.Bucket, uploadID string, upload *models.MultipartUpload) error { + payload, err := json.Marshal(upload) + if err != nil { + return err + } + return multipartUploadBucket.Put([]byte(uploadID), payload) +} + +func deleteMultipartPartsByUploadID(tx *bbolt.Tx, uploadID string) error { + multipartPartsBucket, err := getMultipartPartsBucket(tx) + if err != nil { + return err + } + + prefix := uploadID + ":" + cursor := multipartPartsBucket.Cursor() + keysToDelete := make([][]byte, 0) + for k, _ := cursor.Seek([]byte(prefix)); k != nil && strings.HasPrefix(string(k), prefix); k, _ = cursor.Next() { + keyCopy := make([]byte, len(k)) + copy(keyCopy, k) + keysToDelete = append(keysToDelete, keyCopy) + } + for _, key := range keysToDelete { + if err := multipartPartsBucket.Delete(key); err != nil { + return err + } + } + return nil +} + +func (h *MetadataHandler) GetMultipartUpload(uploadID string) (*models.MultipartUpload, error) { + var upload *models.MultipartUpload + err := h.db.View(func(tx *bbolt.Tx) error { + var err error + upload, _, err = getMultipartUploadFromTx(tx, uploadID) + if err != nil { + return err + } + return nil + }) + if err != nil { + return nil, err + } + return upload, nil +} +func (h *MetadataHandler) PutMultipartPart(uploadID string, part models.UploadedPart) error { + if part.PartNumber < 1 || part.PartNumber > 10000 { + return fmt.Errorf("invalid part number: %d", part.PartNumber) + } + + err := h.db.Update(func(tx *bbolt.Tx) error { + upload, _, err := getMultipartUploadFromTx(tx, uploadID) + if err != nil { + return err + } + if upload.State != "pending" { + return fmt.Errorf("%w: %s", ErrMultipartNotPending, uploadID) + } + + multipartPartsBucket, err := getMultipartPartsBucket(tx) + if err != nil { + return err + } + + key := fmt.Sprintf("%s:%05d", uploadID, part.PartNumber) + payload, err := json.Marshal(part) + if err != nil { + return err + } + return multipartPartsBucket.Put([]byte(key), payload) + }) + if err != nil { + return err + } + return nil +} +func (h *MetadataHandler) ListMultipartParts(uploadID string) ([]models.UploadedPart, error) { + parts := make([]models.UploadedPart, 0) + + err := h.db.View(func(tx *bbolt.Tx) error { + if _, _, err := getMultipartUploadFromTx(tx, uploadID); err != nil { + return err + } + + multipartPartsBucket, err := getMultipartPartsBucket(tx) + if err != nil { + return err + } + prefix := uploadID + ":" + cursor := multipartPartsBucket.Cursor() + for k, v := cursor.Seek([]byte(prefix)); k != nil && strings.HasPrefix(string(k), prefix); k, v = cursor.Next() { + part := models.UploadedPart{} + if err := json.Unmarshal(v, &part); err != nil { + return err + } + parts = append(parts, part) + } + return nil + }) + if err != nil { + return nil, err + } + + sort.Slice(parts, func(i, j int) bool { + return parts[i].PartNumber < parts[j].PartNumber + }) + return parts, nil +} +func (h *MetadataHandler) CompleteMultipartUpload(uploadID string, final *models.ObjectManifest) error { + if final == nil { + return errors.New("final object manifest is required") + } + + err := h.db.Update(func(tx *bbolt.Tx) error { + upload, multipartUploadBucket, err := getMultipartUploadFromTx(tx, uploadID) + if err != nil { + return err + } + if upload.State != "pending" { + return fmt.Errorf("%w: %s", ErrMultipartNotPending, uploadID) + } + + metadataBucket := tx.Bucket([]byte(upload.Bucket)) + if metadataBucket == nil { + return fmt.Errorf("%w: %s", ErrBucketNotFound, upload.Bucket) + } + final.Bucket = upload.Bucket + final.Key = upload.Key + finalPayload, err := json.Marshal(final) + if err != nil { + return err + } + if err := metadataBucket.Put([]byte(upload.Key), finalPayload); err != nil { + return err + } + + upload.State = "completed" + if err := putMultipartUpload(multipartUploadBucket, uploadID, upload); err != nil { + return err + } + + if err := deleteMultipartPartsByUploadID(tx, uploadID); err != nil { + return err + } + return nil + }) + if err != nil { + return err + } + return nil +} +func (h *MetadataHandler) AbortMultipartUpload(uploadID string) error { + err := h.db.Update(func(tx *bbolt.Tx) error { + upload, multipartUploadBucket, err := getMultipartUploadFromTx(tx, uploadID) + if err != nil { + return err + } + if upload.State == "completed" { + return fmt.Errorf("%w: %s", ErrMultipartNotPending, uploadID) + } + upload.State = "aborted" + if err := putMultipartUpload(multipartUploadBucket, uploadID, upload); err != nil { + return err + } + + if err := deleteMultipartPartsByUploadID(tx, uploadID); err != nil { + return err + } + return nil + }) + if err != nil { + return err + } + return nil +} diff --git a/models/models.go b/models/models.go index 78847bb..fe6d74f 100644 --- a/models/models.go +++ b/models/models.go @@ -58,3 +58,81 @@ type Contents struct { type CommonPrefixes struct { Prefix string `xml:"Prefix"` } + +type MultipartUpload struct { + UploadID string `json:"upload_id" xml:"UploadId"` + Bucket string `json:"bucket" xml:"Bucket"` + Key string `json:"key" xml:"Key"` + CreatedAt string `json:"created_at" xml:"CreatedAt"` + State string `json:"state" xml:"State"` +} + +type InitiateMultipartUploadResult struct { + XMLName xml.Name `xml:"InitiateMultipartUploadResult"` + Xmlns string `xml:"xmlns,attr"` + Bucket string `xml:"Bucket"` + Key string `xml:"Key"` + UploadID string `xml:"UploadId"` +} +type UploadedPart struct { + PartNumber int `json:"part_number" xml:"PartNumber"` + ETag string `json:"etag" xml:"ETag"` + Size int64 `json:"size" xml:"Size"` + Chunks []string `json:"chunks"` + CreatedAt int64 `json:"created_at"` +} + +type CompletedPart struct { + PartNumber int `xml:"PartNumber"` + ETag string `xml:"ETag"` +} + +type CompleteMultipartUploadRequest struct { + XMLName xml.Name `xml:"CompleteMultipartUpload"` + Parts []CompletedPart `xml:"Part"` +} + +type CompleteMultipartUploadResult struct { + XMLName xml.Name `xml:"CompleteMultipartUploadResult"` + Xmlns string `xml:"xmlns,attr"` + Bucket string `xml:"Bucket"` + Key string `xml:"Key"` + ETag string `xml:"ETag"` + Location string `xml:"Location,omitempty"` +} + +type ListPartsResult struct { + XMLName xml.Name `xml:"ListPartsResult"` + Xmlns string `xml:"xmlns,attr"` + Bucket string `xml:"Bucket"` + Key string `xml:"Key"` + UploadID string `xml:"UploadId"` + Parts []PartItem `xml:"Part"` +} + +type PartItem struct { + PartNumber int `xml:"PartNumber"` + LastModified string `xml:"LastModified"` + ETag string `xml:"ETag"` + Size int64 `xml:"Size"` +} + +type DeleteObjectsRequest struct { + XMLName xml.Name `xml:"Delete"` + Objects []DeleteObjectIdentity `xml:"Object"` + Quiet bool `xml:"Quiet"` +} + +type DeleteObjectIdentity struct { + Key string `xml:"Key"` +} + +type DeleteObjectsResult struct { + XMLName xml.Name `xml:"DeleteResult"` + Xmlns string `xml:"xmlns,attr"` + Deleted []DeletedEntry `xml:"Deleted,omitempty"` +} + +type DeletedEntry struct { + Key string `xml:"Key"` +} diff --git a/service/service.go b/service/service.go index 574fc03..560816f 100644 --- a/service/service.go +++ b/service/service.go @@ -1,25 +1,38 @@ package service import ( + "crypto/md5" + "encoding/hex" + "errors" "fmt" "fs/metadata" "fs/models" "fs/storage" "io" + "log/slog" + "strings" "time" ) type ObjectService struct { - metadataHandler *metadata.MetadataHandler + metadata *metadata.MetadataHandler + blob *storage.BlobStore } -func NewObjectService(metadataHandler *metadata.MetadataHandler) *ObjectService { - return &ObjectService{metadataHandler: metadataHandler} +var ( + ErrInvalidPart = errors.New("invalid multipart part") + ErrInvalidPartOrder = errors.New("invalid multipart part order") + ErrInvalidCompleteRequest = errors.New("invalid complete multipart request") + ErrEntityTooSmall = errors.New("multipart entity too small") +) + +func NewObjectService(metadataHandler *metadata.MetadataHandler, blobHandler *storage.BlobStore) *ObjectService { + return &ObjectService{metadata: metadataHandler, blob: blobHandler} } func (s *ObjectService) PutObject(bucket, key, contentType string, input io.Reader) (*models.ObjectManifest, error) { - chunks, size, etag, err := storage.IngestStream(input) + chunks, size, etag, err := s.blob.IngestStream(input) if err != nil { return nil, err } @@ -34,8 +47,14 @@ func (s *ObjectService) PutObject(bucket, key, contentType string, input io.Read Chunks: chunks, CreatedAt: timestamp, } - fmt.Println(manifest) - if err = s.metadataHandler.PutManifest(manifest); err != nil { + slog.Debug("object_written_manifest", + "bucket", manifest.Bucket, + "key", manifest.Key, + "size", manifest.Size, + "chunk_count", len(manifest.Chunks), + "etag", manifest.ETag, + ) + if err = s.metadata.PutManifest(manifest); err != nil { return nil, err } @@ -43,7 +62,7 @@ func (s *ObjectService) PutObject(bucket, key, contentType string, input io.Read } func (s *ObjectService) GetObject(bucket, key string) (io.ReadCloser, *models.ObjectManifest, error) { - manifest, err := s.metadataHandler.GetManifest(bucket, key) + manifest, err := s.metadata.GetManifest(bucket, key) if err != nil { return nil, nil, err } @@ -57,7 +76,7 @@ func (s *ObjectService) GetObject(bucket, key string) (io.ReadCloser, *models.Ob } }(pw) - err := storage.AssembleStream(manifest.Chunks, pw) + err := s.blob.AssembleStream(manifest.Chunks, pw) if err != nil { return } @@ -66,7 +85,7 @@ func (s *ObjectService) GetObject(bucket, key string) (io.ReadCloser, *models.Ob } func (s *ObjectService) HeadObject(bucket, key string) (models.ObjectManifest, error) { - manifest, err := s.metadataHandler.GetManifest(bucket, key) + manifest, err := s.metadata.GetManifest(bucket, key) if err != nil { return models.ObjectManifest{}, err } @@ -74,26 +93,176 @@ func (s *ObjectService) HeadObject(bucket, key string) (models.ObjectManifest, e } func (s *ObjectService) DeleteObject(bucket, key string) error { - return s.metadataHandler.DeleteManifest(bucket, key) + return s.metadata.DeleteManifest(bucket, key) } func (s *ObjectService) ListObjects(bucket, prefix string) ([]*models.ObjectManifest, error) { - return s.metadataHandler.ListObjects(bucket, prefix) + return s.metadata.ListObjects(bucket, prefix) } func (s *ObjectService) CreateBucket(bucket string) error { - return s.metadataHandler.CreateBucket(bucket) + return s.metadata.CreateBucket(bucket) } func (s *ObjectService) HeadBucket(bucket string) error { - _, err := s.metadataHandler.GetBucketManifest(bucket) + _, err := s.metadata.GetBucketManifest(bucket) return err } func (s *ObjectService) DeleteBucket(bucket string) error { - return s.metadataHandler.DeleteBucket(bucket) + return s.metadata.DeleteBucket(bucket) } func (s *ObjectService) ListBuckets() ([]string, error) { - return s.metadataHandler.ListBuckets() + return s.metadata.ListBuckets() +} + +func (s *ObjectService) DeleteObjects(bucket string, keys []string) ([]string, error) { + return s.metadata.DeleteManifests(bucket, keys) +} + +func (s *ObjectService) CreateMultipartUpload(bucket, key string) (*models.MultipartUpload, error) { + return s.metadata.CreateMultipartUpload(bucket, key) +} + +func (s *ObjectService) UploadPart(bucket, key, uploadId string, partNumber int, input io.Reader) (string, error) { + if partNumber < 1 || partNumber > 10000 { + return "", ErrInvalidPart + } + + upload, err := s.metadata.GetMultipartUpload(uploadId) + if err != nil { + return "", err + } + if upload.Bucket != bucket || upload.Key != key { + return "", metadata.ErrMultipartNotFound + } + + var uploadedPart models.UploadedPart + chunkIds, totalSize, etag, err := s.blob.IngestStream(input) + if err != nil { + return "", err + } + uploadedPart = models.UploadedPart{ + PartNumber: partNumber, + ETag: etag, + Size: totalSize, + Chunks: chunkIds, + CreatedAt: time.Now().Unix(), + } + err = s.metadata.PutMultipartPart(uploadId, uploadedPart) + if err != nil { + return "", err + } + return etag, nil +} + +func (s *ObjectService) ListMultipartParts(bucket, key, uploadID string) ([]models.UploadedPart, error) { + upload, err := s.metadata.GetMultipartUpload(uploadID) + if err != nil { + return nil, err + } + if upload.Bucket != bucket || upload.Key != key { + return nil, metadata.ErrMultipartNotFound + } + return s.metadata.ListMultipartParts(uploadID) +} + +func (s *ObjectService) CompleteMultipartUpload(bucket, key, uploadID string, completed []models.CompletedPart) (*models.ObjectManifest, error) { + if len(completed) == 0 { + return nil, ErrInvalidCompleteRequest + } + + upload, err := s.metadata.GetMultipartUpload(uploadID) + if err != nil { + return nil, err + } + if upload.Bucket != bucket || upload.Key != key { + return nil, metadata.ErrMultipartNotFound + } + + storedParts, err := s.metadata.ListMultipartParts(uploadID) + if err != nil { + return nil, err + } + partsByNumber := make(map[int]models.UploadedPart, len(storedParts)) + for _, part := range storedParts { + partsByNumber[part.PartNumber] = part + } + + lastPartNumber := 0 + orderedParts := make([]models.UploadedPart, 0, len(completed)) + chunks := make([]string, 0) + var totalSize int64 + + for i, part := range completed { + if part.PartNumber <= lastPartNumber { + return nil, ErrInvalidPartOrder + } + lastPartNumber = part.PartNumber + + storedPart, ok := partsByNumber[part.PartNumber] + if !ok { + return nil, ErrInvalidPart + } + if normalizeETag(part.ETag) != normalizeETag(storedPart.ETag) { + return nil, ErrInvalidPart + } + if i < len(completed)-1 && storedPart.Size < 5*1024*1024 { + return nil, ErrEntityTooSmall + } + + orderedParts = append(orderedParts, storedPart) + chunks = append(chunks, storedPart.Chunks...) + totalSize += storedPart.Size + } + + finalETag := buildMultipartETag(orderedParts) + manifest := &models.ObjectManifest{ + Bucket: bucket, + Key: key, + Size: totalSize, + ContentType: "application/octet-stream", + ETag: finalETag, + Chunks: chunks, + CreatedAt: time.Now().Unix(), + } + + if err := s.metadata.CompleteMultipartUpload(uploadID, manifest); err != nil { + return nil, err + } + + return manifest, nil +} + +func (s *ObjectService) AbortMultipartUpload(bucket, key, uploadID string) error { + upload, err := s.metadata.GetMultipartUpload(uploadID) + if err != nil { + return err + } + if upload.Bucket != bucket || upload.Key != key { + return metadata.ErrMultipartNotFound + } + return s.metadata.AbortMultipartUpload(uploadID) +} + +func normalizeETag(etag string) string { + return strings.Trim(etag, "\"") +} + +func buildMultipartETag(parts []models.UploadedPart) string { + hasher := md5.New() + for _, part := range parts { + etagBytes, err := hex.DecodeString(normalizeETag(part.ETag)) + if err == nil { + _, _ = hasher.Write(etagBytes) + continue + } + _, _ = hasher.Write([]byte(normalizeETag(part.ETag))) + } + return fmt.Sprintf("%x-%d", hasher.Sum(nil), len(parts)) +} + +func (s *ObjectService) Close() error { + return s.metadata.Close() } diff --git a/storage/blob.go b/storage/blob.go index 268f762..23d21f6 100644 --- a/storage/blob.go +++ b/storage/blob.go @@ -5,18 +5,41 @@ import ( "crypto/sha256" "encoding/hex" "errors" + "fmt" "io" "os" "path/filepath" + "strings" ) -const chunkSize = 64 * 1024 -const blobRoot = "blobs/" +const blobRoot = "blobs" +const maxChunkSize = 64 * 1024 * 1024 -func IngestStream(stream io.Reader) ([]string, int64, string, error) { +type BlobStore struct { + dataRoot string + chunkSize int +} + +func NewBlobStore(root string, chunkSize int) (*BlobStore, error) { + root = strings.TrimSpace(root) + if root == "" { + return nil, errors.New("blob root is required") + } + if chunkSize <= 0 || chunkSize > maxChunkSize { + return nil, fmt.Errorf("chunk size must be between 1 and %d bytes", maxChunkSize) + } + + cleanRoot := filepath.Clean(root) + if err := os.MkdirAll(filepath.Join(cleanRoot, blobRoot), 0o755); err != nil { + return nil, err + } + return &BlobStore{chunkSize: chunkSize, dataRoot: cleanRoot}, nil +} + +func (bs *BlobStore) IngestStream(stream io.Reader) ([]string, int64, string, error) { fullFileHasher := md5.New() - buffer := make([]byte, chunkSize) + buffer := make([]byte, bs.chunkSize) var totalSize int64 var chunkIDs []string @@ -35,7 +58,7 @@ func IngestStream(stream io.Reader) ([]string, int64, string, error) { chunkHash := sha256.Sum256(chunkData) chunkID := hex.EncodeToString(chunkHash[:]) - err := saveBlob(chunkID, chunkData) + err := bs.saveBlob(chunkID, chunkData) if err != nil { return nil, 0, "", err } @@ -54,8 +77,11 @@ func IngestStream(stream io.Reader) ([]string, int64, string, error) { return chunkIDs, totalSize, etag, nil } -func saveBlob(chunkID string, data []byte) error { - dir := filepath.Join(blobRoot, chunkID[:2], chunkID[2:4]) +func (bs *BlobStore) saveBlob(chunkID string, data []byte) error { + if !isValidChunkID(chunkID) { + return fmt.Errorf("invalid chunk id: %q", chunkID) + } + dir := filepath.Join(bs.dataRoot, blobRoot, chunkID[:2], chunkID[2:4]) if err := os.MkdirAll(dir, 0755); err != nil { return err } @@ -69,9 +95,9 @@ func saveBlob(chunkID string, data []byte) error { return nil } -func AssembleStream(chunkIDs []string, w *io.PipeWriter) error { +func (bs *BlobStore) AssembleStream(chunkIDs []string, w *io.PipeWriter) error { for _, chunkID := range chunkIDs { - chunkData, err := GetBlob(chunkID) + chunkData, err := bs.GetBlob(chunkID) if err != nil { return err } @@ -82,7 +108,21 @@ func AssembleStream(chunkIDs []string, w *io.PipeWriter) error { return nil } -func GetBlob(chunkID string) ([]byte, error) { - - return os.ReadFile(filepath.Join(blobRoot, chunkID[:2], chunkID[2:4], chunkID)) +func (bs *BlobStore) GetBlob(chunkID string) ([]byte, error) { + if !isValidChunkID(chunkID) { + return nil, fmt.Errorf("invalid chunk id: %q", chunkID) + } + return os.ReadFile(filepath.Join(bs.dataRoot, blobRoot, chunkID[:2], chunkID[2:4], chunkID)) +} + +func isValidChunkID(chunkID string) bool { + if len(chunkID) != sha256.Size*2 { + return false + } + for _, ch := range chunkID { + if (ch < '0' || ch > '9') && (ch < 'a' || ch > 'f') { + return false + } + } + return true } diff --git a/utils/config.go b/utils/config.go new file mode 100644 index 0000000..a73e195 --- /dev/null +++ b/utils/config.go @@ -0,0 +1,89 @@ +package utils + +import ( + "os" + "path/filepath" + "strconv" + "strings" + + "github.com/joho/godotenv" +) + +type Config struct { + DataPath string + Address string + Port int + ChunkSize int + LogLevel string + LogFormat string + AuditLog bool +} + +func NewConfig() *Config { + _ = godotenv.Load() + + config := &Config{ + DataPath: sanitizeDataPath(os.Getenv("DATA_PATH")), + Address: firstNonEmpty(strings.TrimSpace(os.Getenv("ADDRESS")), "0.0.0.0"), + Port: envIntRange("PORT", 3000, 1, 65535), + ChunkSize: envIntRange("CHUNK_SIZE", 8192000, 1, 64*1024*1024), + LogLevel: strings.ToLower(firstNonEmpty(strings.TrimSpace(os.Getenv("LOG_LEVEL")), "info")), + LogFormat: strings.ToLower(firstNonEmpty(strings.TrimSpace(os.Getenv("LOG_FORMAT")), strings.TrimSpace(os.Getenv("LOG_TYPE")), "text")), + AuditLog: envBool("AUDIT_LOG", true), + } + + if config.LogFormat != "json" && config.LogFormat != "text" { + config.LogFormat = "text" + } + + return config + +} + +func envIntRange(key string, defaultValue, minValue, maxValue int) int { + raw := strings.TrimSpace(os.Getenv(key)) + if raw == "" { + return defaultValue + } + value, err := strconv.Atoi(raw) + if err != nil { + return defaultValue + } + if value < minValue || value > maxValue { + return defaultValue + } + return value +} + +func envBool(key string, defaultValue bool) bool { + raw := strings.TrimSpace(os.Getenv(key)) + if raw == "" { + return defaultValue + } + value, err := strconv.ParseBool(raw) + if err != nil { + return defaultValue + } + return value +} + +func firstNonEmpty(values ...string) string { + for _, v := range values { + if v != "" { + return v + } + } + return "" +} + +func sanitizeDataPath(raw string) string { + cleaned := strings.TrimSpace(raw) + if cleaned == "" { + cleaned = "." + } + cleaned = filepath.Clean(cleaned) + if abs, err := filepath.Abs(cleaned); err == nil { + return abs + } + return cleaned +}