From 9b5035dfa0f5b50d69f36d64b6bc50b15b7dda88 Mon Sep 17 00:00:00 2001 From: Andrej Mickov Date: Sun, 22 Feb 2026 13:42:23 +0100 Subject: [PATCH 1/8] Initial Multipart Upload --- api/api.go | 46 ++++++++++++++++++++++++++++++--- go.mod | 1 + go.sum | 2 ++ metadata/metadata.go | 60 ++++++++++++++++++++++++++++++++++++++++++++ models/models.go | 16 ++++++++++++ service/service.go | 8 ++++++ 6 files changed, 129 insertions(+), 4 deletions(-) diff --git a/api/api.go b/api/api.go index ab3b772..2e183c9 100644 --- a/api/api.go +++ b/api/api.go @@ -1,9 +1,11 @@ package api import ( + "encoding/xml" "errors" "fmt" "fs/metadata" + "fs/models" "fs/service" "fs/utils" "io" @@ -47,6 +49,7 @@ func (h *Handler) setupRoutes() { h.router.Get("/{bucket}/*", h.handleGetObject) h.router.Put("/{bucket}/*", h.handlePutObject) + h.router.Post("/{bucket}/*", h.handlePostObject) h.router.Head("/{bucket}/*", h.handleHeadObject) h.router.Delete("/{bucket}/*", h.handleDeleteObject) } @@ -68,10 +71,6 @@ func (h *Handler) handleGetObject(w http.ResponseWriter, r *http.Request) { return } - if r.URL.Query().Get("uploadId") != "" { - - } - stream, manifest, err := h.svc.GetObject(bucket, key) if err != nil { writeMappedS3Error(w, r, err) @@ -88,6 +87,41 @@ func (h *Handler) handleGetObject(w http.ResponseWriter, r *http.Request) { } +func (h *Handler) handlePostObject(w http.ResponseWriter, r *http.Request) { + bucket := chi.URLParam(r, "bucket") + key := chi.URLParam(r, "*") + if key == "" { + writeS3Error(w, r, s3ErrInvalidObjectKey, r.URL.Path) + return + } + + if _, ok := r.URL.Query()["uploads"]; ok { + upload, err := h.svc.CreateMultipartUpload(bucket, key) + if err != nil { + writeMappedS3Error(w, r, err) + return + } + response := models.InitiateMultipartUploadResult{ + Xmlns: "http://s3.amazonaws.com/doc/2006-03-01/", + Bucket: upload.Bucket, + Key: upload.Key, + UploadID: upload.UploadID, + } + payload, err := xml.MarshalIndent(response, "", " ") + if err != nil { + writeMappedS3Error(w, r, err) + return + } + w.Header().Set("Content-Type", "application/xml; charset=utf-8") + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte(xml.Header)) + _, _ = w.Write(payload) + return + } + + writeS3Error(w, r, s3ErrNotImplemented, r.URL.Path) +} + func (h *Handler) handlePutObject(w http.ResponseWriter, r *http.Request) { bucket := chi.URLParam(r, "bucket") key := chi.URLParam(r, "*") @@ -95,6 +129,10 @@ func (h *Handler) handlePutObject(w http.ResponseWriter, r *http.Request) { writeS3Error(w, r, s3ErrInvalidObjectKey, r.URL.Path) return } + if r.URL.Query().Get("uploads") != "" { + if r.URL.Query().Get("partNumber") != "" { + } + } contentType := r.Header.Get("Content-Type") if contentType == "" { diff --git a/go.mod b/go.mod index 020cd73..8038354 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,7 @@ go 1.25.7 require ( github.com/go-chi/chi/v5 v5.2.5 // indirect + github.com/google/uuid v1.6.0 // indirect github.com/klauspost/cpuid/v2 v2.3.0 // indirect github.com/klauspost/reedsolomon v1.13.2 // indirect go.etcd.io/bbolt v1.4.3 // indirect diff --git a/go.sum b/go.sum index d4bfca0..f2ff379 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,7 @@ github.com/go-chi/chi/v5 v5.2.5 h1:Eg4myHZBjyvJmAFjFvWgrqDTXFyOzjj7YIm3L3mu6Ug= github.com/go-chi/chi/v5 v5.2.5/go.mod h1:X7Gx4mteadT3eDOMTsXzmI4/rwUpOwBHLpAfupzFJP0= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/klauspost/cpuid/v2 v2.3.0 h1:S4CRMLnYUhGeDFDqkGriYKdfoFlDnMtqTiI/sFzhA9Y= github.com/klauspost/cpuid/v2 v2.3.0/go.mod h1:hqwkgyIinND0mEev00jJYCxPNVRVXFQeu1XKlok6oO0= github.com/klauspost/reedsolomon v1.13.2 h1:9qtQy2tKEVpVB8Pfq87ZljHZb60/LbeTQ1OxV8EGzdE= diff --git a/metadata/metadata.go b/metadata/metadata.go index 07a2344..1438a64 100644 --- a/metadata/metadata.go +++ b/metadata/metadata.go @@ -9,6 +9,8 @@ import ( "strings" "time" + "github.com/google/uuid" + "go.etcd.io/bbolt" ) @@ -17,6 +19,7 @@ type MetadataHandler struct { } var systemIndex = []byte("__SYSTEM_BUCKETS__") +var multipartUploadIndex = []byte("__MULTIPART_UPLOADS__") var validBucketName = regexp.MustCompile(`^[a-z0-9.-]{3,63}$`) @@ -43,6 +46,14 @@ func NewMetadataHandler(dbPath string) (*MetadataHandler, error) { _ = db.Close() return nil, err } + err = h.db.Update(func(tx *bbolt.Tx) error { + _, err := tx.CreateBucketIfNotExists(multipartUploadIndex) + return err + }) + if err != nil { + _ = db.Close() + return nil, err + } return h, nil } @@ -265,3 +276,52 @@ func (h *MetadataHandler) DeleteManifest(bucket, key string) error { return nil } + +func (h *MetadataHandler) CreateMultipartUpload(bucket, key string) (*models.MultipartUpload, error) { + var upload *models.MultipartUpload + + err := h.db.View(func(tx *bbolt.Tx) error { + systemIndexBucket := tx.Bucket([]byte(systemIndex)) + if systemIndexBucket == nil { + return errors.New("system index not found") + } + if systemIndexBucket.Get([]byte(bucket)) != nil { + return nil + } + return ErrBucketNotFound + }) + if err != nil { + return nil, err + } + uploadId := uuid.New().String() + createdAt := time.Now().UTC().Format(time.RFC3339) + upload = &models.MultipartUpload{ + Bucket: bucket, + Key: key, + UploadID: uploadId, + CreatedAt: createdAt, + State: "pending", + } + + err = h.db.Update(func(tx *bbolt.Tx) error { + multipartUploadBucket := tx.Bucket([]byte(multipartUploadIndex)) + if multipartUploadBucket == nil { + return errors.New("multipart upload index not found") + } + payload, err := json.Marshal(upload) + if err != nil { + return err + } + err = multipartUploadBucket.Put([]byte(uploadId), payload) + if err != nil { + return err + } + + return nil + }) + if err != nil { + return nil, err + } + + return upload, nil +} diff --git a/models/models.go b/models/models.go index 78847bb..6b11cc1 100644 --- a/models/models.go +++ b/models/models.go @@ -58,3 +58,19 @@ type Contents struct { type CommonPrefixes struct { Prefix string `xml:"Prefix"` } + +type MultipartUpload struct { + UploadID string `json:"upload_id" xml:"UploadId"` + Bucket string `json:"bucket" xml:"Bucket"` + Key string `json:"key" xml:"Key"` + CreatedAt string `json:"created_at" xml:"CreatedAt"` + State string `json:"state" xml:"State"` +} + +type InitiateMultipartUploadResult struct { + XMLName xml.Name `xml:"InitiateMultipartUploadResult"` + Xmlns string `xml:"xmlns,attr"` + Bucket string `xml:"Bucket"` + Key string `xml:"Key"` + UploadID string `xml:"UploadId"` +} diff --git a/service/service.go b/service/service.go index 574fc03..2448823 100644 --- a/service/service.go +++ b/service/service.go @@ -97,3 +97,11 @@ func (s *ObjectService) DeleteBucket(bucket string) error { func (s *ObjectService) ListBuckets() ([]string, error) { return s.metadataHandler.ListBuckets() } + +func (s *ObjectService) CreateMultipartUpload(bucket, key string) (*models.MultipartUpload, error) { + return s.metadataHandler.CreateMultipartUpload(bucket, key) +} + +func (s *ObjectService) PutMultipartObject(bucket, key, uploadId string, input io.Reader) (*models.MultipartUpload, error) { + return nil, nil +} From 5438a7f4b4d91c933e77ba1828d53e978570f86a Mon Sep 17 00:00:00 2001 From: Andrej Mickov Date: Sun, 22 Feb 2026 13:43:23 +0100 Subject: [PATCH 2/8] Updated gitignore --- .gitignore | 1 + 1 file changed, 1 insertion(+) diff --git a/.gitignore b/.gitignore index 7a250ef..68f407a 100644 --- a/.gitignore +++ b/.gitignore @@ -3,3 +3,4 @@ blobs/ *.db .idea/ +.gocache/ \ No newline at end of file From 111ce5b6699d27633f9c82bcdf3aa619e35dcbfb Mon Sep 17 00:00:00 2001 From: Andrej Mickov Date: Sun, 22 Feb 2026 14:46:04 +0100 Subject: [PATCH 3/8] Working MultipartUpload that needs minor tweaks. --- api/api.go | 111 ++++++++++++++++++++- api/s3_errors.go | 34 +++++++ metadata/metadata.go | 228 +++++++++++++++++++++++++++++++++++++++++++ models/models.go | 42 ++++++++ service/service.go | 145 ++++++++++++++++++++++++++- 5 files changed, 554 insertions(+), 6 deletions(-) diff --git a/api/api.go b/api/api.go index 2e183c9..42421db 100644 --- a/api/api.go +++ b/api/api.go @@ -71,6 +71,11 @@ func (h *Handler) handleGetObject(w http.ResponseWriter, r *http.Request) { return } + if uploadID := r.URL.Query().Get("uploadId"); uploadID != "" { + h.handleListMultipartParts(w, r, bucket, key, uploadID) + return + } + stream, manifest, err := h.svc.GetObject(bucket, key) if err != nil { writeMappedS3Error(w, r, err) @@ -94,6 +99,7 @@ func (h *Handler) handlePostObject(w http.ResponseWriter, r *http.Request) { writeS3Error(w, r, s3ErrInvalidObjectKey, r.URL.Path) return } + defer r.Body.Close() if _, ok := r.URL.Query()["uploads"]; ok { upload, err := h.svc.CreateMultipartUpload(bucket, key) @@ -119,6 +125,39 @@ func (h *Handler) handlePostObject(w http.ResponseWriter, r *http.Request) { return } + if uploadID := r.URL.Query().Get("uploadId"); uploadID != "" { + var req models.CompleteMultipartUploadRequest + if err := xml.NewDecoder(r.Body).Decode(&req); err != nil { + writeS3Error(w, r, s3ErrMalformedXML, r.URL.Path) + return + } + + manifest, err := h.svc.CompleteMultipartUpload(bucket, key, uploadID, req.Parts) + if err != nil { + writeMappedS3Error(w, r, err) + return + } + + response := models.CompleteMultipartUploadResult{ + Xmlns: "http://s3.amazonaws.com/doc/2006-03-01/", + Bucket: bucket, + Key: key, + ETag: `"` + manifest.ETag + `"`, + Location: r.URL.Path, + } + payload, err := xml.MarshalIndent(response, "", " ") + if err != nil { + writeMappedS3Error(w, r, err) + return + } + + w.Header().Set("Content-Type", "application/xml; charset=utf-8") + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte(xml.Header)) + _, _ = w.Write(payload) + return + } + writeS3Error(w, r, s3ErrNotImplemented, r.URL.Path) } @@ -129,9 +168,31 @@ func (h *Handler) handlePutObject(w http.ResponseWriter, r *http.Request) { writeS3Error(w, r, s3ErrInvalidObjectKey, r.URL.Path) return } - if r.URL.Query().Get("uploads") != "" { - if r.URL.Query().Get("partNumber") != "" { + defer r.Body.Close() + + uploadID := r.URL.Query().Get("uploadId") + partNumberRaw := r.URL.Query().Get("partNumber") + if uploadID != "" || partNumberRaw != "" { + if uploadID == "" || partNumberRaw == "" { + writeS3Error(w, r, s3ErrInvalidPart, r.URL.Path) + return } + + partNumber, err := strconv.Atoi(partNumberRaw) + if err != nil { + writeS3Error(w, r, s3ErrInvalidPart, r.URL.Path) + return + } + + etag, err := h.svc.UploadPart(bucket, key, uploadID, partNumber, r.Body) + if err != nil { + writeMappedS3Error(w, r, err) + return + } + w.Header().Set("ETag", `"`+etag+`"`) + w.Header().Set("Content-Length", "0") + w.WriteHeader(http.StatusOK) + return } contentType := r.Header.Get("Content-Type") @@ -140,7 +201,6 @@ func (h *Handler) handlePutObject(w http.ResponseWriter, r *http.Request) { } manifest, err := h.svc.PutObject(bucket, key, contentType, r.Body) - defer r.Body.Close() if err != nil { writeMappedS3Error(w, r, err) @@ -153,6 +213,41 @@ func (h *Handler) handlePutObject(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) } +func (h *Handler) handleListMultipartParts(w http.ResponseWriter, r *http.Request, bucket, key, uploadID string) { + parts, err := h.svc.ListMultipartParts(bucket, key, uploadID) + if err != nil { + writeMappedS3Error(w, r, err) + return + } + + response := models.ListPartsResult{ + Xmlns: "http://s3.amazonaws.com/doc/2006-03-01/", + Bucket: bucket, + Key: key, + UploadID: uploadID, + Parts: make([]models.PartItem, 0, len(parts)), + } + for _, part := range parts { + response.Parts = append(response.Parts, models.PartItem{ + PartNumber: part.PartNumber, + LastModified: time.Unix(part.CreatedAt, 0).UTC().Format("2006-01-02T15:04:05.000Z"), + ETag: `"` + part.ETag + `"`, + Size: part.Size, + }) + } + + payload, err := xml.MarshalIndent(response, "", " ") + if err != nil { + writeMappedS3Error(w, r, err) + return + } + + w.Header().Set("Content-Type", "application/xml; charset=utf-8") + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte(xml.Header)) + _, _ = w.Write(payload) +} + func (h *Handler) handleHeadObject(w http.ResponseWriter, r *http.Request) { bucket := chi.URLParam(r, "bucket") key := chi.URLParam(r, "*") @@ -198,7 +293,15 @@ func (h *Handler) handleDeleteObject(w http.ResponseWriter, r *http.Request) { writeS3Error(w, r, s3ErrInvalidObjectKey, r.URL.Path) return } - + if uploadId := r.URL.Query().Get("uploadId"); uploadId != "" { + err := h.svc.AbortMultipartUpload(bucket, key, uploadId) + if err != nil { + writeMappedS3Error(w, r, err) + return + } + w.WriteHeader(http.StatusNoContent) + return + } err := h.svc.DeleteObject(bucket, key) if err != nil { if errors.Is(err, metadata.ErrObjectNotFound) { diff --git a/api/s3_errors.go b/api/s3_errors.go index 8792621..7d65519 100644 --- a/api/s3_errors.go +++ b/api/s3_errors.go @@ -5,6 +5,7 @@ import ( "errors" "fs/metadata" "fs/models" + "fs/service" "net/http" ) @@ -25,6 +26,21 @@ var ( Code: "NotImplemented", Message: "A header you provided implies functionality that is not implemented.", } + s3ErrInvalidPart = s3APIError{ + Status: http.StatusBadRequest, + Code: "InvalidPart", + Message: "One or more of the specified parts could not be found.", + } + s3ErrInvalidPartOrder = s3APIError{ + Status: http.StatusBadRequest, + Code: "InvalidPartOrder", + Message: "The list of parts was not in ascending order.", + } + s3ErrMalformedXML = s3APIError{ + Status: http.StatusBadRequest, + Code: "MalformedXML", + Message: "The XML you provided was not well-formed or did not validate against our published schema.", + } s3ErrInternal = s3APIError{ Status: http.StatusInternalServerError, Code: "InternalError", @@ -64,6 +80,24 @@ func mapToS3Error(err error) s3APIError { Code: "NoSuchKey", Message: "The specified key does not exist.", } + case errors.Is(err, metadata.ErrMultipartNotFound): + return s3APIError{ + Status: http.StatusNotFound, + Code: "NoSuchUpload", + Message: "The specified multipart upload does not exist.", + } + case errors.Is(err, metadata.ErrMultipartNotPending): + return s3APIError{ + Status: http.StatusBadRequest, + Code: "InvalidRequest", + Message: "The multipart upload is not in a valid state for this operation.", + } + case errors.Is(err, service.ErrInvalidPart): + return s3ErrInvalidPart + case errors.Is(err, service.ErrInvalidPartOrder): + return s3ErrInvalidPartOrder + case errors.Is(err, service.ErrInvalidCompleteRequest): + return s3ErrMalformedXML default: return s3ErrInternal } diff --git a/metadata/metadata.go b/metadata/metadata.go index 1438a64..1365f13 100644 --- a/metadata/metadata.go +++ b/metadata/metadata.go @@ -6,6 +6,7 @@ import ( "fmt" "fs/models" "regexp" + "sort" "strings" "time" @@ -20,6 +21,7 @@ type MetadataHandler struct { var systemIndex = []byte("__SYSTEM_BUCKETS__") var multipartUploadIndex = []byte("__MULTIPART_UPLOADS__") +var multipartUploadPartsIndex = []byte("__MULTIPART_UPLOAD_PARTS__") var validBucketName = regexp.MustCompile(`^[a-z0-9.-]{3,63}$`) @@ -29,6 +31,8 @@ var ( ErrBucketNotFound = errors.New("bucket not found") ErrBucketNotEmpty = errors.New("bucket not empty") ErrObjectNotFound = errors.New("object not found") + ErrMultipartNotFound = errors.New("multipart upload not found") + ErrMultipartNotPending = errors.New("multipart upload is not pending") ) func NewMetadataHandler(dbPath string) (*MetadataHandler, error) { @@ -54,6 +58,14 @@ func NewMetadataHandler(dbPath string) (*MetadataHandler, error) { _ = db.Close() return nil, err } + err = h.db.Update(func(tx *bbolt.Tx) error { + _, err := tx.CreateBucketIfNotExists(multipartUploadPartsIndex) + return err + }) + if err != nil { + _ = db.Close() + return nil, err + } return h, nil } @@ -325,3 +337,219 @@ func (h *MetadataHandler) CreateMultipartUpload(bucket, key string) (*models.Mul return upload, nil } + +func getMultipartUploadBucket(tx *bbolt.Tx) (*bbolt.Bucket, error) { + multipartUploadBucket := tx.Bucket(multipartUploadIndex) + if multipartUploadBucket == nil { + return nil, errors.New("multipart upload index not found") + } + return multipartUploadBucket, nil +} + +func getMultipartPartsBucket(tx *bbolt.Tx) (*bbolt.Bucket, error) { + multipartPartsBucket := tx.Bucket(multipartUploadPartsIndex) + if multipartPartsBucket == nil { + return nil, errors.New("multipart upload parts index not found") + } + return multipartPartsBucket, nil +} + +func getMultipartUploadFromBucket(multipartUploadBucket *bbolt.Bucket, uploadID string) (*models.MultipartUpload, error) { + payload := multipartUploadBucket.Get([]byte(uploadID)) + if payload == nil { + return nil, fmt.Errorf("%w: %s", ErrMultipartNotFound, uploadID) + } + upload := models.MultipartUpload{} + if err := json.Unmarshal(payload, &upload); err != nil { + return nil, err + } + return &upload, nil +} + +func getMultipartUploadFromTx(tx *bbolt.Tx, uploadID string) (*models.MultipartUpload, *bbolt.Bucket, error) { + multipartUploadBucket, err := getMultipartUploadBucket(tx) + if err != nil { + return nil, nil, err + } + upload, err := getMultipartUploadFromBucket(multipartUploadBucket, uploadID) + if err != nil { + return nil, nil, err + } + return upload, multipartUploadBucket, nil +} + +func putMultipartUpload(multipartUploadBucket *bbolt.Bucket, uploadID string, upload *models.MultipartUpload) error { + payload, err := json.Marshal(upload) + if err != nil { + return err + } + return multipartUploadBucket.Put([]byte(uploadID), payload) +} + +func deleteMultipartPartsByUploadID(tx *bbolt.Tx, uploadID string) error { + multipartPartsBucket, err := getMultipartPartsBucket(tx) + if err != nil { + return err + } + + prefix := uploadID + ":" + cursor := multipartPartsBucket.Cursor() + keysToDelete := make([][]byte, 0) + for k, _ := cursor.Seek([]byte(prefix)); k != nil && strings.HasPrefix(string(k), prefix); k, _ = cursor.Next() { + keyCopy := make([]byte, len(k)) + copy(keyCopy, k) + keysToDelete = append(keysToDelete, keyCopy) + } + for _, key := range keysToDelete { + if err := multipartPartsBucket.Delete(key); err != nil { + return err + } + } + return nil +} + +func (h *MetadataHandler) GetMultipartUpload(uploadID string) (*models.MultipartUpload, error) { + var upload *models.MultipartUpload + err := h.db.View(func(tx *bbolt.Tx) error { + var err error + upload, _, err = getMultipartUploadFromTx(tx, uploadID) + if err != nil { + return err + } + return nil + }) + if err != nil { + return nil, err + } + return upload, nil +} +func (h *MetadataHandler) PutMultipartPart(uploadID string, part models.UploadedPart) error { + if part.PartNumber < 1 || part.PartNumber > 10000 { + return fmt.Errorf("invalid part number: %d", part.PartNumber) + } + + err := h.db.Update(func(tx *bbolt.Tx) error { + upload, _, err := getMultipartUploadFromTx(tx, uploadID) + if err != nil { + return err + } + if upload.State != "pending" { + return fmt.Errorf("%w: %s", ErrMultipartNotPending, uploadID) + } + + multipartPartsBucket, err := getMultipartPartsBucket(tx) + if err != nil { + return err + } + + key := fmt.Sprintf("%s:%05d", uploadID, part.PartNumber) + payload, err := json.Marshal(part) + if err != nil { + return err + } + return multipartPartsBucket.Put([]byte(key), payload) + }) + if err != nil { + return err + } + return nil +} +func (h *MetadataHandler) ListMultipartParts(uploadID string) ([]models.UploadedPart, error) { + parts := make([]models.UploadedPart, 0) + + err := h.db.View(func(tx *bbolt.Tx) error { + if _, _, err := getMultipartUploadFromTx(tx, uploadID); err != nil { + return err + } + + multipartPartsBucket, err := getMultipartPartsBucket(tx) + if err != nil { + return err + } + prefix := uploadID + ":" + cursor := multipartPartsBucket.Cursor() + for k, v := cursor.Seek([]byte(prefix)); k != nil && strings.HasPrefix(string(k), prefix); k, v = cursor.Next() { + part := models.UploadedPart{} + if err := json.Unmarshal(v, &part); err != nil { + return err + } + parts = append(parts, part) + } + return nil + }) + if err != nil { + return nil, err + } + + sort.Slice(parts, func(i, j int) bool { + return parts[i].PartNumber < parts[j].PartNumber + }) + return parts, nil +} +func (h *MetadataHandler) CompleteMultipartUpload(uploadID string, final *models.ObjectManifest) error { + if final == nil { + return errors.New("final object manifest is required") + } + + err := h.db.Update(func(tx *bbolt.Tx) error { + upload, multipartUploadBucket, err := getMultipartUploadFromTx(tx, uploadID) + if err != nil { + return err + } + if upload.State != "pending" { + return fmt.Errorf("%w: %s", ErrMultipartNotPending, uploadID) + } + + metadataBucket := tx.Bucket([]byte(upload.Bucket)) + if metadataBucket == nil { + return fmt.Errorf("%w: %s", ErrBucketNotFound, upload.Bucket) + } + final.Bucket = upload.Bucket + final.Key = upload.Key + finalPayload, err := json.Marshal(final) + if err != nil { + return err + } + if err := metadataBucket.Put([]byte(upload.Key), finalPayload); err != nil { + return err + } + + upload.State = "completed" + if err := putMultipartUpload(multipartUploadBucket, uploadID, upload); err != nil { + return err + } + + if err := deleteMultipartPartsByUploadID(tx, uploadID); err != nil { + return err + } + return nil + }) + if err != nil { + return err + } + return nil +} +func (h *MetadataHandler) AbortMultipartUpload(uploadID string) error { + err := h.db.Update(func(tx *bbolt.Tx) error { + upload, multipartUploadBucket, err := getMultipartUploadFromTx(tx, uploadID) + if err != nil { + return err + } + if upload.State == "completed" { + return fmt.Errorf("%w: %s", ErrMultipartNotPending, uploadID) + } + upload.State = "aborted" + if err := putMultipartUpload(multipartUploadBucket, uploadID, upload); err != nil { + return err + } + + if err := deleteMultipartPartsByUploadID(tx, uploadID); err != nil { + return err + } + return nil + }) + if err != nil { + return err + } + return nil +} diff --git a/models/models.go b/models/models.go index 6b11cc1..6cace47 100644 --- a/models/models.go +++ b/models/models.go @@ -74,3 +74,45 @@ type InitiateMultipartUploadResult struct { Key string `xml:"Key"` UploadID string `xml:"UploadId"` } +type UploadedPart struct { + PartNumber int `json:"part_number" xml:"PartNumber"` + ETag string `json:"etag" xml:"ETag"` + Size int64 `json:"size" xml:"Size"` + Chunks []string `json:"chunks"` + CreatedAt int64 `json:"created_at"` +} + +type CompletedPart struct { + PartNumber int `xml:"PartNumber"` + ETag string `xml:"ETag"` +} + +type CompleteMultipartUploadRequest struct { + XMLName xml.Name `xml:"CompleteMultipartUpload"` + Parts []CompletedPart `xml:"Part"` +} + +type CompleteMultipartUploadResult struct { + XMLName xml.Name `xml:"CompleteMultipartUploadResult"` + Xmlns string `xml:"xmlns,attr"` + Bucket string `xml:"Bucket"` + Key string `xml:"Key"` + ETag string `xml:"ETag"` + Location string `xml:"Location,omitempty"` +} + +type ListPartsResult struct { + XMLName xml.Name `xml:"ListPartsResult"` + Xmlns string `xml:"xmlns,attr"` + Bucket string `xml:"Bucket"` + Key string `xml:"Key"` + UploadID string `xml:"UploadId"` + Parts []PartItem `xml:"Part"` +} + +type PartItem struct { + PartNumber int `xml:"PartNumber"` + LastModified string `xml:"LastModified"` + ETag string `xml:"ETag"` + Size int64 `xml:"Size"` +} diff --git a/service/service.go b/service/service.go index 2448823..9d67e5d 100644 --- a/service/service.go +++ b/service/service.go @@ -1,11 +1,15 @@ package service import ( + "crypto/md5" + "encoding/hex" + "errors" "fmt" "fs/metadata" "fs/models" "fs/storage" "io" + "strings" "time" ) @@ -13,6 +17,12 @@ type ObjectService struct { metadataHandler *metadata.MetadataHandler } +var ( + ErrInvalidPart = errors.New("invalid multipart part") + ErrInvalidPartOrder = errors.New("invalid multipart part order") + ErrInvalidCompleteRequest = errors.New("invalid complete multipart request") +) + func NewObjectService(metadataHandler *metadata.MetadataHandler) *ObjectService { return &ObjectService{metadataHandler: metadataHandler} } @@ -102,6 +112,137 @@ func (s *ObjectService) CreateMultipartUpload(bucket, key string) (*models.Multi return s.metadataHandler.CreateMultipartUpload(bucket, key) } -func (s *ObjectService) PutMultipartObject(bucket, key, uploadId string, input io.Reader) (*models.MultipartUpload, error) { - return nil, nil +func (s *ObjectService) UploadPart(bucket, key, uploadId string, partNumber int, input io.Reader) (string, error) { + if partNumber < 1 || partNumber > 10000 { + return "", ErrInvalidPart + } + + upload, err := s.metadataHandler.GetMultipartUpload(uploadId) + if err != nil { + return "", err + } + if upload.Bucket != bucket || upload.Key != key { + return "", metadata.ErrMultipartNotFound + } + + var uploadedPart models.UploadedPart + chunkIds, totalSize, etag, err := storage.IngestStream(input) + if err != nil { + return "", err + } + uploadedPart = models.UploadedPart{ + PartNumber: partNumber, + ETag: etag, + Size: totalSize, + Chunks: chunkIds, + CreatedAt: time.Now().Unix(), + } + err = s.metadataHandler.PutMultipartPart(uploadId, uploadedPart) + if err != nil { + return "", err + } + return etag, nil +} + +func (s *ObjectService) ListMultipartParts(bucket, key, uploadID string) ([]models.UploadedPart, error) { + upload, err := s.metadataHandler.GetMultipartUpload(uploadID) + if err != nil { + return nil, err + } + if upload.Bucket != bucket || upload.Key != key { + return nil, metadata.ErrMultipartNotFound + } + return s.metadataHandler.ListMultipartParts(uploadID) +} + +func (s *ObjectService) CompleteMultipartUpload(bucket, key, uploadID string, completed []models.CompletedPart) (*models.ObjectManifest, error) { + if len(completed) == 0 { + return nil, ErrInvalidCompleteRequest + } + + upload, err := s.metadataHandler.GetMultipartUpload(uploadID) + if err != nil { + return nil, err + } + if upload.Bucket != bucket || upload.Key != key { + return nil, metadata.ErrMultipartNotFound + } + + storedParts, err := s.metadataHandler.ListMultipartParts(uploadID) + if err != nil { + return nil, err + } + partsByNumber := make(map[int]models.UploadedPart, len(storedParts)) + for _, part := range storedParts { + partsByNumber[part.PartNumber] = part + } + + lastPartNumber := 0 + orderedParts := make([]models.UploadedPart, 0, len(completed)) + chunks := make([]string, 0) + var totalSize int64 + + for _, part := range completed { + if part.PartNumber <= lastPartNumber { + return nil, ErrInvalidPartOrder + } + lastPartNumber = part.PartNumber + + storedPart, ok := partsByNumber[part.PartNumber] + if !ok { + return nil, ErrInvalidPart + } + if normalizeETag(part.ETag) != normalizeETag(storedPart.ETag) { + return nil, ErrInvalidPart + } + + orderedParts = append(orderedParts, storedPart) + chunks = append(chunks, storedPart.Chunks...) + totalSize += storedPart.Size + } + + finalETag := buildMultipartETag(orderedParts) + manifest := &models.ObjectManifest{ + Bucket: bucket, + Key: key, + Size: totalSize, + ContentType: "application/octet-stream", + ETag: finalETag, + Chunks: chunks, + CreatedAt: time.Now().Unix(), + } + + if err := s.metadataHandler.CompleteMultipartUpload(uploadID, manifest); err != nil { + return nil, err + } + + return manifest, nil +} + +func (s *ObjectService) AbortMultipartUpload(bucket, key, uploadID string) error { + upload, err := s.metadataHandler.GetMultipartUpload(uploadID) + if err != nil { + return err + } + if upload.Bucket != bucket || upload.Key != key { + return metadata.ErrMultipartNotFound + } + return s.metadataHandler.AbortMultipartUpload(uploadID) +} + +func normalizeETag(etag string) string { + return strings.Trim(etag, "\"") +} + +func buildMultipartETag(parts []models.UploadedPart) string { + hasher := md5.New() + for _, part := range parts { + etagBytes, err := hex.DecodeString(normalizeETag(part.ETag)) + if err == nil { + _, _ = hasher.Write(etagBytes) + continue + } + _, _ = hasher.Write([]byte(normalizeETag(part.ETag))) + } + return fmt.Sprintf("%x-%d", hasher.Sum(nil), len(parts)) } From 5d41ec9e0a161061f33a76dd2630324c1a598dbc Mon Sep 17 00:00:00 2001 From: Andrej Mickov Date: Sun, 22 Feb 2026 15:32:04 +0100 Subject: [PATCH 4/8] Implemented bulk delete from bucket, AWS SigV4 framing problems solved. --- api/api.go | 153 +++++++++++++++++++++++++++++++++++++++++-- api/s3_errors.go | 7 ++ main.go | 2 +- metadata/metadata.go | 28 ++++++++ models/models.go | 20 ++++++ service/service.go | 10 ++- 6 files changed, 214 insertions(+), 6 deletions(-) diff --git a/api/api.go b/api/api.go index 42421db..eff2c9d 100644 --- a/api/api.go +++ b/api/api.go @@ -1,6 +1,7 @@ package api import ( + "bufio" "encoding/xml" "errors" "fmt" @@ -11,6 +12,7 @@ import ( "io" "net/http" "strconv" + "strings" "time" "github.com/go-chi/chi/v5" @@ -42,6 +44,8 @@ func (h *Handler) setupRoutes() { h.router.Get("/{bucket}", h.handleGetBucket) h.router.Put("/{bucket}", h.handlePutBucket) h.router.Put("/{bucket}/", h.handlePutBucket) + h.router.Post("/{bucket}", h.handlePostBucket) + h.router.Post("/{bucket}/", h.handlePostBucket) h.router.Delete("/{bucket}", h.handleDeleteBucket) h.router.Delete("/{bucket}/", h.handleDeleteBucket) h.router.Head("/{bucket}", h.handleHeadBucket) @@ -170,6 +174,11 @@ func (h *Handler) handlePutObject(w http.ResponseWriter, r *http.Request) { } defer r.Body.Close() + bodyReader := io.Reader(r.Body) + if shouldDecodeAWSChunkedPayload(r) { + bodyReader = newAWSChunkedDecodingReader(r.Body) + } + uploadID := r.URL.Query().Get("uploadId") partNumberRaw := r.URL.Query().Get("partNumber") if uploadID != "" || partNumberRaw != "" { @@ -184,7 +193,7 @@ func (h *Handler) handlePutObject(w http.ResponseWriter, r *http.Request) { return } - etag, err := h.svc.UploadPart(bucket, key, uploadID, partNumber, r.Body) + etag, err := h.svc.UploadPart(bucket, key, uploadID, partNumber, bodyReader) if err != nil { writeMappedS3Error(w, r, err) return @@ -200,7 +209,7 @@ func (h *Handler) handlePutObject(w http.ResponseWriter, r *http.Request) { contentType = "application/octet-stream" } - manifest, err := h.svc.PutObject(bucket, key, contentType, r.Body) + manifest, err := h.svc.PutObject(bucket, key, contentType, bodyReader) if err != nil { writeMappedS3Error(w, r, err) @@ -248,6 +257,75 @@ func (h *Handler) handleListMultipartParts(w http.ResponseWriter, r *http.Reques _, _ = w.Write(payload) } +func shouldDecodeAWSChunkedPayload(r *http.Request) bool { + contentEncoding := strings.ToLower(r.Header.Get("Content-Encoding")) + if strings.Contains(contentEncoding, "aws-chunked") { + return true + } + signingMode := strings.ToLower(r.Header.Get("x-amz-content-sha256")) + return strings.HasPrefix(signingMode, "streaming-aws4-hmac-sha256-payload") +} + +func newAWSChunkedDecodingReader(src io.Reader) io.Reader { + pr, pw := io.Pipe() + go func() { + if err := decodeAWSChunkedPayload(src, pw); err != nil { + _ = pw.CloseWithError(err) + return + } + _ = pw.Close() + }() + return pr +} + +func decodeAWSChunkedPayload(src io.Reader, dst io.Writer) error { + reader := bufio.NewReader(src) + for { + headerLine, err := reader.ReadString('\n') + if err != nil { + return err + } + headerLine = strings.TrimRight(headerLine, "\r\n") + chunkSizeToken := headerLine + if idx := strings.IndexByte(chunkSizeToken, ';'); idx >= 0 { + chunkSizeToken = chunkSizeToken[:idx] + } + chunkSizeToken = strings.TrimSpace(chunkSizeToken) + chunkSize, err := strconv.ParseInt(chunkSizeToken, 16, 64) + if err != nil { + return fmt.Errorf("invalid aws-chunked header %q: %w", headerLine, err) + } + if chunkSize < 0 { + return fmt.Errorf("invalid aws-chunked size: %d", chunkSize) + } + if chunkSize > 0 { + if _, err := io.CopyN(dst, reader, chunkSize); err != nil { + return err + } + } + + crlf := make([]byte, 2) + if _, err := io.ReadFull(reader, crlf); err != nil { + return err + } + if crlf[0] != '\r' || crlf[1] != '\n' { + return errors.New("invalid aws-chunked payload terminator") + } + + if chunkSize == 0 { + for { + line, err := reader.ReadString('\n') + if err != nil { + return err + } + if line == "\r\n" || line == "\n" { + return nil + } + } + } + } +} + func (h *Handler) handleHeadObject(w http.ResponseWriter, r *http.Request) { bucket := chi.URLParam(r, "bucket") key := chi.URLParam(r, "*") @@ -261,9 +339,11 @@ func (h *Handler) handleHeadObject(w http.ResponseWriter, r *http.Request) { writeMappedS3Error(w, r, err) return } + etag := manifest.ETag + size := strconv.Itoa(int(manifest.Size)) - w.Header().Set("ETag", `"`+manifest.ETag+`"`) - w.Header().Set("Content-Length", "0") + w.Header().Set("ETag", `"`+etag+`"`) + w.Header().Set("Content-Length", size) w.Header().Set("Last-Modified", time.Unix(manifest.CreatedAt, 0).UTC().Format(http.TimeFormat)) w.WriteHeader(http.StatusOK) } @@ -286,6 +366,61 @@ func (h *Handler) handleDeleteBucket(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusNoContent) } +func (h *Handler) handlePostBucket(w http.ResponseWriter, r *http.Request) { + bucket := chi.URLParam(r, "bucket") + if _, ok := r.URL.Query()["delete"]; !ok { + writeS3Error(w, r, s3ErrNotImplemented, r.URL.Path) + return + } + defer r.Body.Close() + + bodyReader := io.Reader(r.Body) + if shouldDecodeAWSChunkedPayload(r) { + bodyReader = newAWSChunkedDecodingReader(r.Body) + } + + var req models.DeleteObjectsRequest + if err := xml.NewDecoder(bodyReader).Decode(&req); err != nil { + writeS3Error(w, r, s3ErrMalformedXML, r.URL.Path) + return + } + + keys := make([]string, 0, len(req.Objects)) + for _, obj := range req.Objects { + if obj.Key == "" { + continue + } + keys = append(keys, obj.Key) + } + + deleted, err := h.svc.DeleteObjects(bucket, keys) + if err != nil { + writeMappedS3Error(w, r, err) + return + } + + response := models.DeleteObjectsResult{ + Xmlns: "http://s3.amazonaws.com/doc/2006-03-01/", + } + if !req.Quiet { + response.Deleted = make([]models.DeletedEntry, 0, len(deleted)) + for _, key := range deleted { + response.Deleted = append(response.Deleted, models.DeletedEntry{Key: key}) + } + } + + payload, err := xml.MarshalIndent(response, "", " ") + if err != nil { + writeMappedS3Error(w, r, err) + return + } + + w.Header().Set("Content-Type", "application/xml; charset=utf-8") + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte(xml.Header)) + _, _ = w.Write(payload) +} + func (h *Handler) handleDeleteObject(w http.ResponseWriter, r *http.Request) { bucket := chi.URLParam(r, "bucket") key := chi.URLParam(r, "*") @@ -347,6 +482,16 @@ func (h *Handler) handleGetBucket(w http.ResponseWriter, r *http.Request) { h.handleListObjectsV2(w, r, bucket, prefix) return } + if r.URL.Query().Has("location") { + xmlResponse := ` + us-east-1` + + w.Header().Set("Content-Type", "application/xml; charset=utf-8") + w.Header().Set("Content-Length", strconv.Itoa(len(xmlResponse))) + w.WriteHeader(http.StatusOK) + w.Write([]byte(xmlResponse)) + return + } writeS3Error(w, r, s3ErrNotImplemented, r.URL.Path) } diff --git a/api/s3_errors.go b/api/s3_errors.go index 7d65519..c446e4d 100644 --- a/api/s3_errors.go +++ b/api/s3_errors.go @@ -41,6 +41,11 @@ var ( Code: "MalformedXML", Message: "The XML you provided was not well-formed or did not validate against our published schema.", } + s3ErrEntityTooSmall = s3APIError{ + Status: http.StatusBadRequest, + Code: "EntityTooSmall", + Message: "Your proposed upload is smaller than the minimum allowed object size.", + } s3ErrInternal = s3APIError{ Status: http.StatusInternalServerError, Code: "InternalError", @@ -98,6 +103,8 @@ func mapToS3Error(err error) s3APIError { return s3ErrInvalidPartOrder case errors.Is(err, service.ErrInvalidCompleteRequest): return s3ErrMalformedXML + case errors.Is(err, service.ErrEntityTooSmall): + return s3ErrEntityTooSmall default: return s3ErrInternal } diff --git a/main.go b/main.go index 74d37c7..39d3e65 100644 --- a/main.go +++ b/main.go @@ -17,7 +17,7 @@ func main() { objectService := service.NewObjectService(metadataHandler) handler := api.NewHandler(objectService) - err = handler.Start("localhost:3000") + err = handler.Start("0.0.0.0:3000") if err != nil { return } diff --git a/metadata/metadata.go b/metadata/metadata.go index 1365f13..ce346ac 100644 --- a/metadata/metadata.go +++ b/metadata/metadata.go @@ -289,6 +289,34 @@ func (h *MetadataHandler) DeleteManifest(bucket, key string) error { } +func (h *MetadataHandler) DeleteManifests(bucket string, keys []string) ([]string, error) { + deleted := make([]string, 0, len(keys)) + + err := h.db.Update(func(tx *bbolt.Tx) error { + metadataBucket := tx.Bucket([]byte(bucket)) + if metadataBucket == nil { + return fmt.Errorf("%w: %s", ErrBucketNotFound, bucket) + } + + for _, key := range keys { + if key == "" { + continue + } + if metadataBucket.Get([]byte(key)) != nil { + if err := metadataBucket.Delete([]byte(key)); err != nil { + return err + } + } + deleted = append(deleted, key) + } + return nil + }) + if err != nil { + return nil, err + } + return deleted, nil +} + func (h *MetadataHandler) CreateMultipartUpload(bucket, key string) (*models.MultipartUpload, error) { var upload *models.MultipartUpload diff --git a/models/models.go b/models/models.go index 6cace47..fe6d74f 100644 --- a/models/models.go +++ b/models/models.go @@ -116,3 +116,23 @@ type PartItem struct { ETag string `xml:"ETag"` Size int64 `xml:"Size"` } + +type DeleteObjectsRequest struct { + XMLName xml.Name `xml:"Delete"` + Objects []DeleteObjectIdentity `xml:"Object"` + Quiet bool `xml:"Quiet"` +} + +type DeleteObjectIdentity struct { + Key string `xml:"Key"` +} + +type DeleteObjectsResult struct { + XMLName xml.Name `xml:"DeleteResult"` + Xmlns string `xml:"xmlns,attr"` + Deleted []DeletedEntry `xml:"Deleted,omitempty"` +} + +type DeletedEntry struct { + Key string `xml:"Key"` +} diff --git a/service/service.go b/service/service.go index 9d67e5d..755aea1 100644 --- a/service/service.go +++ b/service/service.go @@ -21,6 +21,7 @@ var ( ErrInvalidPart = errors.New("invalid multipart part") ErrInvalidPartOrder = errors.New("invalid multipart part order") ErrInvalidCompleteRequest = errors.New("invalid complete multipart request") + ErrEntityTooSmall = errors.New("multipart entity too small") ) func NewObjectService(metadataHandler *metadata.MetadataHandler) *ObjectService { @@ -108,6 +109,10 @@ func (s *ObjectService) ListBuckets() ([]string, error) { return s.metadataHandler.ListBuckets() } +func (s *ObjectService) DeleteObjects(bucket string, keys []string) ([]string, error) { + return s.metadataHandler.DeleteManifests(bucket, keys) +} + func (s *ObjectService) CreateMultipartUpload(bucket, key string) (*models.MultipartUpload, error) { return s.metadataHandler.CreateMultipartUpload(bucket, key) } @@ -182,7 +187,7 @@ func (s *ObjectService) CompleteMultipartUpload(bucket, key, uploadID string, co chunks := make([]string, 0) var totalSize int64 - for _, part := range completed { + for i, part := range completed { if part.PartNumber <= lastPartNumber { return nil, ErrInvalidPartOrder } @@ -195,6 +200,9 @@ func (s *ObjectService) CompleteMultipartUpload(bucket, key, uploadID string, co if normalizeETag(part.ETag) != normalizeETag(storedPart.ETag) { return nil, ErrInvalidPart } + if i < len(completed)-1 && storedPart.Size < 5*1024*1024 { + return nil, ErrEntityTooSmall + } orderedParts = append(orderedParts, storedPart) chunks = append(chunks, storedPart.Chunks...) From c989037160affa61dd612abd0348a26160cd09b8 Mon Sep 17 00:00:00 2001 From: Andrej Mickov Date: Sun, 22 Feb 2026 23:00:33 +0100 Subject: [PATCH 5/8] Finialized multipart upload and graceful shutdown. Added Dockerfile. --- .dockerignore | 3 +++ Dockerfile | 16 +++++++++++ README.md | 33 ++++++++++++++++++++++- api/api.go | 63 +++++++++++++++++++++++++++++++++++++++----- metadata/metadata.go | 4 +++ service/service.go | 4 +++ 6 files changed, 115 insertions(+), 8 deletions(-) create mode 100644 .dockerignore create mode 100644 Dockerfile diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000..ad0a5f5 --- /dev/null +++ b/.dockerignore @@ -0,0 +1,3 @@ +*.md +.gocache/ +blobs/ \ No newline at end of file diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..843bf24 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,16 @@ +FROM golang:1.25-alpine AS build + +WORKDIR /app + +COPY go.mod go.sum ./ +RUN go mod download + +COPY . . +RUN CGO_ENABLED=0 GOOS=linux go build -o /app/fs . + +FROM scratch AS runner + +COPY --from=build /app/fs /app/fs + +WORKDIR /app +CMD ["/app/fs"] diff --git a/README.md b/README.md index cca2a29..6512bc1 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,34 @@ # fs -An experimental Object Storage written in Go that should be compatible with S3 \ No newline at end of file +An experimental Object Storage written in Go that should be partially compatible with S3 + +## Features + +- Bucket operations: +- `PUT /{bucket}` +- `HEAD /{bucket}` +- `DELETE /{bucket}` +- `GET /` (list buckets) +- Object operations: +- `PUT /{bucket}/{key}` +- `GET /{bucket}/{key}` +- `HEAD /{bucket}/{key}` +- `DELETE /{bucket}/{key}` +- `GET /{bucket}?list-type=2&prefix=...` (ListObjectsV2-style) +- Multipart upload: +- `POST /{bucket}/{key}?uploads` (initiate) +- `PUT /{bucket}/{key}?uploadId=...&partNumber=N` (upload part) +- `GET /{bucket}/{key}?uploadId=...` (list parts) +- `POST /{bucket}/{key}?uploadId=...` (complete) +- `DELETE /{bucket}/{key}?uploadId=...` (abort) +- Multi-object delete: +- `POST /{bucket}?delete` with S3-style XML body +- AWS SigV4 streaming payload decoding for uploads (`aws-chunked` request bodies) + +## Limitations + +- No authentication/authorization yet. +- Not full S3 API coverage. +- No garbage collection of unreferenced blob chunks yet. +- No versioning or lifecycle policies. +- Error and edge-case behavior is still being refined for client compatibility. \ No newline at end of file diff --git a/api/api.go b/api/api.go index eff2c9d..e8b2c8c 100644 --- a/api/api.go +++ b/api/api.go @@ -2,6 +2,7 @@ package api import ( "bufio" + "context" "encoding/xml" "errors" "fmt" @@ -10,9 +11,13 @@ import ( "fs/service" "fs/utils" "io" + "log" "net/http" + "os" + "os/signal" "strconv" "strings" + "syscall" "time" "github.com/go-chi/chi/v5" @@ -58,7 +63,7 @@ func (h *Handler) setupRoutes() { h.router.Delete("/{bucket}/*", h.handleDeleteObject) } -func (h *Handler) handleWelcome(w http.ResponseWriter, r *http.Request) { +func (h *Handler) handleWelcome(w http.ResponseWriter) { w.WriteHeader(http.StatusOK) _, err := w.Write([]byte("Welcome to the Object Storage API!")) if err != nil { @@ -103,7 +108,12 @@ func (h *Handler) handlePostObject(w http.ResponseWriter, r *http.Request) { writeS3Error(w, r, s3ErrInvalidObjectKey, r.URL.Path) return } - defer r.Body.Close() + defer func(Body io.ReadCloser) { + err := Body.Close() + if err != nil { + + } + }(r.Body) if _, ok := r.URL.Query()["uploads"]; ok { upload, err := h.svc.CreateMultipartUpload(bucket, key) @@ -172,7 +182,12 @@ func (h *Handler) handlePutObject(w http.ResponseWriter, r *http.Request) { writeS3Error(w, r, s3ErrInvalidObjectKey, r.URL.Path) return } - defer r.Body.Close() + defer func(Body io.ReadCloser) { + err := Body.Close() + if err != nil { + + } + }(r.Body) bodyReader := io.Reader(r.Body) if shouldDecodeAWSChunkedPayload(r) { @@ -372,7 +387,12 @@ func (h *Handler) handlePostBucket(w http.ResponseWriter, r *http.Request) { writeS3Error(w, r, s3ErrNotImplemented, r.URL.Path) return } - defer r.Body.Close() + defer func(Body io.ReadCloser) { + err := Body.Close() + if err != nil { + + } + }(r.Body) bodyReader := io.Reader(r.Body) if shouldDecodeAWSChunkedPayload(r) { @@ -467,7 +487,10 @@ func (h *Handler) handleGetBuckets(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/xml") w.WriteHeader(http.StatusOK) for _, bucket := range buckets { - w.Write([]byte(bucket)) + _, err := w.Write([]byte(bucket)) + if err != nil { + return + } } } @@ -489,7 +512,10 @@ func (h *Handler) handleGetBucket(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/xml; charset=utf-8") w.Header().Set("Content-Length", strconv.Itoa(len(xmlResponse))) w.WriteHeader(http.StatusOK) - w.Write([]byte(xmlResponse)) + _, err := w.Write([]byte(xmlResponse)) + if err != nil { + return + } return } writeS3Error(w, r, s3ErrNotImplemented, r.URL.Path) @@ -522,5 +548,28 @@ func (h *Handler) handleListObjectsV2(w http.ResponseWriter, r *http.Request, bu func (h *Handler) Start(address string) error { fmt.Printf("Starting API server on %s\n", address) h.setupRoutes() - return http.ListenAndServe(address, h.router) + stop := make(chan os.Signal, 1) + signal.Notify(stop, os.Interrupt, syscall.SIGTERM) + server := http.Server{ + Addr: address, + Handler: h.router, + } + + go func() { + if err := server.ListenAndServe(); err != nil { + log.Fatal(err) + } + }() + <-stop + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + if err := server.Shutdown(ctx); err != nil { + return err + } + if err := h.svc.Close(); err != nil { + return err + } + + return nil } diff --git a/metadata/metadata.go b/metadata/metadata.go index ce346ac..d6b1719 100644 --- a/metadata/metadata.go +++ b/metadata/metadata.go @@ -70,6 +70,10 @@ func NewMetadataHandler(dbPath string) (*MetadataHandler, error) { return h, nil } +func (h *MetadataHandler) Close() error { + return h.db.Close() +} + func (h *MetadataHandler) CreateBucket(bucketName string) error { if !validBucketName.MatchString(bucketName) { return fmt.Errorf("%w: %s", ErrInvalidBucketName, bucketName) diff --git a/service/service.go b/service/service.go index 755aea1..26eee1b 100644 --- a/service/service.go +++ b/service/service.go @@ -254,3 +254,7 @@ func buildMultipartETag(parts []models.UploadedPart) string { } return fmt.Sprintf("%x-%d", hasher.Sum(nil), len(parts)) } + +func (s *ObjectService) Close() error { + return s.metadataHandler.Close() +} From d7bdb3177bb242a44a9aff015f5f41061c42ac80 Mon Sep 17 00:00:00 2001 From: Andrej Mickov Date: Mon, 23 Feb 2026 00:42:38 +0100 Subject: [PATCH 6/8] Added logging --- api/api.go | 54 ++++++++++++--- go.mod | 11 ++- go.sum | 16 +++-- logging/logging.go | 162 +++++++++++++++++++++++++++++++++++++++++++ main.go | 17 +++-- metadata/metadata.go | 2 +- service/service.go | 9 ++- 7 files changed, 242 insertions(+), 29 deletions(-) create mode 100644 logging/logging.go diff --git a/api/api.go b/api/api.go index e8b2c8c..3bdbd51 100644 --- a/api/api.go +++ b/api/api.go @@ -6,12 +6,13 @@ import ( "encoding/xml" "errors" "fmt" + "fs/logging" "fs/metadata" "fs/models" "fs/service" "fs/utils" "io" - "log" + "log/slog" "net/http" "os" "os/signal" @@ -25,23 +26,36 @@ import ( ) type Handler struct { - router *chi.Mux - svc *service.ObjectService + router *chi.Mux + svc *service.ObjectService + logger *slog.Logger + logConfig logging.Config } -func NewHandler(svc *service.ObjectService) *Handler { +func NewHandler(svc *service.ObjectService, logger *slog.Logger, logConfig logging.Config) *Handler { r := chi.NewRouter() r.Use(middleware.Recoverer) + if logger == nil { + logger = slog.Default() + } h := &Handler{ - router: r, - svc: svc, + router: r, + svc: svc, + logger: logger, + logConfig: logConfig, } return h } func (h *Handler) setupRoutes() { - h.router.Use(middleware.Logger) + if h.logConfig.Format == "text" { + if h.logConfig.Audit || h.logConfig.DebugMode { + h.router.Use(middleware.Logger) + } + } else { + h.router.Use(logging.HTTPMiddleware(h.logger, h.logConfig)) + } h.router.Get("/", h.handleGetBuckets) @@ -546,30 +560,50 @@ func (h *Handler) handleListObjectsV2(w http.ResponseWriter, r *http.Request, bu } func (h *Handler) Start(address string) error { - fmt.Printf("Starting API server on %s\n", address) + h.logger.Info("server_starting", + "address", address, + "log_format", h.logConfig.Format, + "log_level", h.logConfig.LevelName, + "audit_log", h.logConfig.Audit, + ) h.setupRoutes() stop := make(chan os.Signal, 1) signal.Notify(stop, os.Interrupt, syscall.SIGTERM) + defer signal.Stop(stop) server := http.Server{ Addr: address, Handler: h.router, } + errCh := make(chan error, 1) go func() { if err := server.ListenAndServe(); err != nil { - log.Fatal(err) + if !errors.Is(err, http.ErrServerClosed) { + errCh <- err + } } }() - <-stop + + select { + case <-stop: + h.logger.Info("shutdown_signal_received") + case err := <-errCh: + h.logger.Error("server_listen_failed", "error", err) + return err + } + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() if err := server.Shutdown(ctx); err != nil { + h.logger.Error("server_shutdown_failed", "error", err) return err } if err := h.svc.Close(); err != nil { + h.logger.Error("service_close_failed", "error", err) return err } + h.logger.Info("server_stopped") return nil } diff --git a/go.mod b/go.mod index 8038354..097ed4b 100644 --- a/go.mod +++ b/go.mod @@ -3,10 +3,9 @@ module fs go 1.25.7 require ( - github.com/go-chi/chi/v5 v5.2.5 // indirect - github.com/google/uuid v1.6.0 // indirect - github.com/klauspost/cpuid/v2 v2.3.0 // indirect - github.com/klauspost/reedsolomon v1.13.2 // indirect - go.etcd.io/bbolt v1.4.3 // indirect - golang.org/x/sys v0.41.0 // indirect + github.com/go-chi/chi/v5 v5.2.5 + github.com/google/uuid v1.6.0 + go.etcd.io/bbolt v1.4.3 ) + +require golang.org/x/sys v0.41.0 // indirect diff --git a/go.sum b/go.sum index f2ff379..a3b895f 100644 --- a/go.sum +++ b/go.sum @@ -1,14 +1,18 @@ +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/go-chi/chi/v5 v5.2.5 h1:Eg4myHZBjyvJmAFjFvWgrqDTXFyOzjj7YIm3L3mu6Ug= github.com/go-chi/chi/v5 v5.2.5/go.mod h1:X7Gx4mteadT3eDOMTsXzmI4/rwUpOwBHLpAfupzFJP0= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= -github.com/klauspost/cpuid/v2 v2.3.0 h1:S4CRMLnYUhGeDFDqkGriYKdfoFlDnMtqTiI/sFzhA9Y= -github.com/klauspost/cpuid/v2 v2.3.0/go.mod h1:hqwkgyIinND0mEev00jJYCxPNVRVXFQeu1XKlok6oO0= -github.com/klauspost/reedsolomon v1.13.2 h1:9qtQy2tKEVpVB8Pfq87ZljHZb60/LbeTQ1OxV8EGzdE= -github.com/klauspost/reedsolomon v1.13.2/go.mod h1:ggJT9lc71Vu+cSOPBlxGvBN6TfAS77qB4fp8vJ05NSA= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= +github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= go.etcd.io/bbolt v1.4.3 h1:dEadXpI6G79deX5prL3QRNP6JB8UxVkqo4UPnHaNXJo= go.etcd.io/bbolt v1.4.3/go.mod h1:tKQlpPaYCVFctUIgFKFnAlvbmB3tpy1vkTnDWohtc0E= -golang.org/x/sys v0.29.0 h1:TPYlXGxvx1MGTn2GiZDhnjPA9wZzZeGKHHmKhHYvgaU= -golang.org/x/sys v0.29.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ= +golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.41.0 h1:Ivj+2Cp/ylzLiEU89QhWblYnOE9zerudt9Ftecq2C6k= golang.org/x/sys v0.41.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/logging/logging.go b/logging/logging.go new file mode 100644 index 0000000..0562c0f --- /dev/null +++ b/logging/logging.go @@ -0,0 +1,162 @@ +package logging + +import ( + "log/slog" + "net/http" + "os" + "path/filepath" + "strconv" + "strings" + "time" +) + +type Config struct { + Level slog.Level + LevelName string + Format string + Audit bool + AddSource bool + DebugMode bool +} + +func ConfigFromEnv() Config { + levelName := strings.ToLower(strings.TrimSpace(os.Getenv("LOG_LEVEL"))) + if levelName == "" { + levelName = "info" + } + level := parseLevel(levelName) + levelName = level.String() + + format := strings.ToLower(strings.TrimSpace(os.Getenv("LOG_FORMAT"))) + if format == "" { + format = "text" + } + if format != "json" && format != "text" { + format = "text" + } + + debugMode := level <= slog.LevelDebug + return Config{ + Level: level, + LevelName: levelName, + Format: format, + Audit: envBool("AUDIT_LOG", true), + AddSource: debugMode, + DebugMode: debugMode, + } +} + +func NewLogger(cfg Config) *slog.Logger { + opts := &slog.HandlerOptions{ + Level: cfg.Level, + AddSource: cfg.AddSource, + } + opts.ReplaceAttr = func(_ []string, attr slog.Attr) slog.Attr { + if attr.Key == slog.SourceKey { + if src, ok := attr.Value.Any().(*slog.Source); ok && src != nil { + attr.Key = "src" + attr.Value = slog.StringValue(filepath.Base(src.File) + ":" + strconv.Itoa(src.Line)) + } + } + return attr + } + + var handler slog.Handler + if cfg.Format == "json" { + handler = slog.NewJSONHandler(os.Stdout, opts) + } else { + handler = slog.NewTextHandler(os.Stdout, opts) + } + + logger := slog.New(handler) + slog.SetDefault(logger) + return logger +} + +func HTTPMiddleware(logger *slog.Logger, cfg Config) func(http.Handler) http.Handler { + return func(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + start := time.Now() + ww := &responseWriter{ResponseWriter: w, status: http.StatusOK} + + next.ServeHTTP(ww, r) + + if !cfg.Audit && !cfg.DebugMode { + return + } + + elapsed := time.Since(start) + attrs := []any{ + "method", r.Method, + "path", r.URL.Path, + "status", ww.status, + "bytes", ww.bytes, + "remote_addr", r.RemoteAddr, + } + switch { + case elapsed < time.Microsecond: + attrs = append(attrs, "duration_ns", elapsed.Nanoseconds()) + case elapsed < time.Millisecond: + attrs = append(attrs, "duration_us", elapsed.Microseconds()) + default: + attrs = append(attrs, "duration_ms", elapsed.Milliseconds()) + } + + if cfg.DebugMode { + attrs = append(attrs, + "query", r.URL.RawQuery, + "user_agent", r.UserAgent(), + "content_length", r.ContentLength, + "content_type", r.Header.Get("Content-Type"), + "x_amz_sha256", r.Header.Get("x-amz-content-sha256"), + ) + logger.Debug("http_request", attrs...) + return + } + + logger.Info("http_request", attrs...) + }) + } +} + +type responseWriter struct { + http.ResponseWriter + status int + bytes int +} + +func (w *responseWriter) WriteHeader(statusCode int) { + w.status = statusCode + w.ResponseWriter.WriteHeader(statusCode) +} + +func (w *responseWriter) Write(p []byte) (int, error) { + n, err := w.ResponseWriter.Write(p) + w.bytes += n + return n, err +} + +func envBool(key string, defaultValue bool) bool { + raw := os.Getenv(key) + if raw == "" { + return defaultValue + } + value, err := strconv.ParseBool(raw) + if err != nil { + return defaultValue + } + return value +} + +func parseLevel(levelName string) slog.Level { + switch levelName { + case "debug": + return slog.LevelDebug + case "warn", "warning": + return slog.LevelWarn + case "error": + return slog.LevelError + default: + return slog.LevelInfo + } +} diff --git a/main.go b/main.go index 39d3e65..c11ddaf 100644 --- a/main.go +++ b/main.go @@ -1,24 +1,31 @@ package main import ( - "fmt" "fs/api" + "fs/logging" "fs/metadata" "fs/service" ) func main() { + logConfig := logging.ConfigFromEnv() + logger := logging.NewLogger(logConfig) + logger.Info("boot", + "log_level", logConfig.LevelName, + "log_format", logConfig.Format, + "audit_log", logConfig.Audit, + ) metadataHandler, err := metadata.NewMetadataHandler("metadata.db") if err != nil { - fmt.Printf("Error initializing metadata handler: %v\n", err) + logger.Error("failed_to_initialize_metadata_handler", "error", err) return } objectService := service.NewObjectService(metadataHandler) - handler := api.NewHandler(objectService) - err = handler.Start("0.0.0.0:3000") - if err != nil { + handler := api.NewHandler(objectService, logger, logConfig) + if err = handler.Start("0.0.0.0:3000"); err != nil { + logger.Error("server_stopped_with_error", "error", err) return } } diff --git a/metadata/metadata.go b/metadata/metadata.go index d6b1719..6a2df20 100644 --- a/metadata/metadata.go +++ b/metadata/metadata.go @@ -36,7 +36,7 @@ var ( ) func NewMetadataHandler(dbPath string) (*MetadataHandler, error) { - db, err := bbolt.Open(dbPath, 0600, nil) + db, err := bbolt.Open(dbPath, 0600, &bbolt.Options{Timeout: 2 * time.Second}) if err != nil { return nil, err } diff --git a/service/service.go b/service/service.go index 26eee1b..aa5c6d6 100644 --- a/service/service.go +++ b/service/service.go @@ -9,6 +9,7 @@ import ( "fs/models" "fs/storage" "io" + "log/slog" "strings" "time" ) @@ -45,7 +46,13 @@ func (s *ObjectService) PutObject(bucket, key, contentType string, input io.Read Chunks: chunks, CreatedAt: timestamp, } - fmt.Println(manifest) + slog.Debug("object_written_manifest", + "bucket", manifest.Bucket, + "key", manifest.Key, + "size", manifest.Size, + "chunk_count", len(manifest.Chunks), + "etag", manifest.ETag, + ) if err = s.metadataHandler.PutManifest(manifest); err != nil { return nil, err } From a8204de914492e0e27ea61d4fb0cef8f9bdad76e Mon Sep 17 00:00:00 2001 From: Andrej Mickov Date: Mon, 23 Feb 2026 21:52:45 +0100 Subject: [PATCH 7/8] Fixed logging, added config and .env example --- .dockerignore | 3 +- .env.example | 6 ++++ .gitignore | 6 ++-- api/api.go | 8 +---- go.mod | 5 ++- go.sum | 2 ++ logging/logging.go | 21 ++++++----- main.go | 27 ++++++++++++--- service/service.go | 55 ++++++++++++++--------------- storage/blob.go | 31 +++++++++++------ utils/config.go | 86 ++++++++++++++++++++++++++++++++++++++++++++++ 11 files changed, 187 insertions(+), 63 deletions(-) create mode 100644 .env.example create mode 100644 utils/config.go diff --git a/.dockerignore b/.dockerignore index ad0a5f5..08dc479 100644 --- a/.dockerignore +++ b/.dockerignore @@ -1,3 +1,4 @@ *.md .gocache/ -blobs/ \ No newline at end of file +blobs/ +data/ \ No newline at end of file diff --git a/.env.example b/.env.example new file mode 100644 index 0000000..e6c7567 --- /dev/null +++ b/.env.example @@ -0,0 +1,6 @@ +LOG_LEVEL=debug +LOG_FORMAT=text +DATA_PATH=data/ +PORT=2600 +AUDIT_LOG=true +ADDRESS=0.0.0.0 diff --git a/.gitignore b/.gitignore index 68f407a..445f71b 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,8 @@ .env +*.db .vscode/ blobs/ -*.db +data/ .idea/ -.gocache/ \ No newline at end of file +.gocache/ +.gomodcache/ diff --git a/api/api.go b/api/api.go index 3bdbd51..3de2450 100644 --- a/api/api.go +++ b/api/api.go @@ -49,13 +49,7 @@ func NewHandler(svc *service.ObjectService, logger *slog.Logger, logConfig loggi } func (h *Handler) setupRoutes() { - if h.logConfig.Format == "text" { - if h.logConfig.Audit || h.logConfig.DebugMode { - h.router.Use(middleware.Logger) - } - } else { - h.router.Use(logging.HTTPMiddleware(h.logger, h.logConfig)) - } + h.router.Use(logging.HTTPMiddleware(h.logger, h.logConfig)) h.router.Get("/", h.handleGetBuckets) diff --git a/go.mod b/go.mod index 097ed4b..2b9a16e 100644 --- a/go.mod +++ b/go.mod @@ -8,4 +8,7 @@ require ( go.etcd.io/bbolt v1.4.3 ) -require golang.org/x/sys v0.41.0 // indirect +require ( + github.com/joho/godotenv v1.5.1 // indirect + golang.org/x/sys v0.41.0 // indirect +) diff --git a/go.sum b/go.sum index a3b895f..eb8ecee 100644 --- a/go.sum +++ b/go.sum @@ -4,6 +4,8 @@ github.com/go-chi/chi/v5 v5.2.5 h1:Eg4myHZBjyvJmAFjFvWgrqDTXFyOzjj7YIm3L3mu6Ug= github.com/go-chi/chi/v5 v5.2.5/go.mod h1:X7Gx4mteadT3eDOMTsXzmI4/rwUpOwBHLpAfupzFJP0= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0= +github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= diff --git a/logging/logging.go b/logging/logging.go index 0562c0f..1edadf2 100644 --- a/logging/logging.go +++ b/logging/logging.go @@ -21,13 +21,19 @@ type Config struct { func ConfigFromEnv() Config { levelName := strings.ToLower(strings.TrimSpace(os.Getenv("LOG_LEVEL"))) + format := strings.ToLower(strings.TrimSpace(os.Getenv("LOG_FORMAT"))) + return ConfigFromValues(levelName, format, envBool("AUDIT_LOG", true)) +} + +func ConfigFromValues(levelName, format string, audit bool) Config { + levelName = strings.ToLower(strings.TrimSpace(levelName)) if levelName == "" { levelName = "info" } level := parseLevel(levelName) - levelName = level.String() + levelName = strings.ToUpper(level.String()) - format := strings.ToLower(strings.TrimSpace(os.Getenv("LOG_FORMAT"))) + format = strings.ToLower(strings.TrimSpace(format)) if format == "" { format = "text" } @@ -40,7 +46,7 @@ func ConfigFromEnv() Config { Level: level, LevelName: levelName, Format: format, - Audit: envBool("AUDIT_LOG", true), + Audit: audit, AddSource: debugMode, DebugMode: debugMode, } @@ -91,16 +97,9 @@ func HTTPMiddleware(logger *slog.Logger, cfg Config) func(http.Handler) http.Han "path", r.URL.Path, "status", ww.status, "bytes", ww.bytes, + "duration_ms", float64(elapsed.Nanoseconds()) / 1_000_000.0, "remote_addr", r.RemoteAddr, } - switch { - case elapsed < time.Microsecond: - attrs = append(attrs, "duration_ns", elapsed.Nanoseconds()) - case elapsed < time.Millisecond: - attrs = append(attrs, "duration_us", elapsed.Microseconds()) - default: - attrs = append(attrs, "duration_ms", elapsed.Milliseconds()) - } if cfg.DebugMode { attrs = append(attrs, diff --git a/main.go b/main.go index c11ddaf..251fd00 100644 --- a/main.go +++ b/main.go @@ -5,26 +5,45 @@ import ( "fs/logging" "fs/metadata" "fs/service" + "fs/storage" + "fs/utils" + "os" + "path/filepath" + "strconv" ) func main() { - logConfig := logging.ConfigFromEnv() + config := utils.NewConfig() + logConfig := logging.ConfigFromValues(config.LogLevel, config.LogFormat, config.AuditLog) logger := logging.NewLogger(logConfig) logger.Info("boot", "log_level", logConfig.LevelName, "log_format", logConfig.Format, "audit_log", logConfig.Audit, + "data_path", config.DataPath, ) - metadataHandler, err := metadata.NewMetadataHandler("metadata.db") + if err := os.MkdirAll(config.DataPath, 0o755); err != nil { + logger.Error("failed_to_prepare_data_path", "path", config.DataPath, "error", err) + return + } + + dbPath := filepath.Join(config.DataPath, "metadata.db") + metadataHandler, err := metadata.NewMetadataHandler(dbPath) if err != nil { logger.Error("failed_to_initialize_metadata_handler", "error", err) return } + blobHandler, err := storage.NewBlobStore(config.DataPath, config.ChunkSize) + if err != nil { + logger.Error("failed_to_initialize_blob_store", "error", err) + return + } - objectService := service.NewObjectService(metadataHandler) + objectService := service.NewObjectService(metadataHandler, blobHandler) handler := api.NewHandler(objectService, logger, logConfig) - if err = handler.Start("0.0.0.0:3000"); err != nil { + addr := config.Address + ":" + strconv.Itoa(config.Port) + if err = handler.Start(addr); err != nil { logger.Error("server_stopped_with_error", "error", err) return } diff --git a/service/service.go b/service/service.go index aa5c6d6..560816f 100644 --- a/service/service.go +++ b/service/service.go @@ -15,7 +15,8 @@ import ( ) type ObjectService struct { - metadataHandler *metadata.MetadataHandler + metadata *metadata.MetadataHandler + blob *storage.BlobStore } var ( @@ -25,13 +26,13 @@ var ( ErrEntityTooSmall = errors.New("multipart entity too small") ) -func NewObjectService(metadataHandler *metadata.MetadataHandler) *ObjectService { - return &ObjectService{metadataHandler: metadataHandler} +func NewObjectService(metadataHandler *metadata.MetadataHandler, blobHandler *storage.BlobStore) *ObjectService { + return &ObjectService{metadata: metadataHandler, blob: blobHandler} } func (s *ObjectService) PutObject(bucket, key, contentType string, input io.Reader) (*models.ObjectManifest, error) { - chunks, size, etag, err := storage.IngestStream(input) + chunks, size, etag, err := s.blob.IngestStream(input) if err != nil { return nil, err } @@ -53,7 +54,7 @@ func (s *ObjectService) PutObject(bucket, key, contentType string, input io.Read "chunk_count", len(manifest.Chunks), "etag", manifest.ETag, ) - if err = s.metadataHandler.PutManifest(manifest); err != nil { + if err = s.metadata.PutManifest(manifest); err != nil { return nil, err } @@ -61,7 +62,7 @@ func (s *ObjectService) PutObject(bucket, key, contentType string, input io.Read } func (s *ObjectService) GetObject(bucket, key string) (io.ReadCloser, *models.ObjectManifest, error) { - manifest, err := s.metadataHandler.GetManifest(bucket, key) + manifest, err := s.metadata.GetManifest(bucket, key) if err != nil { return nil, nil, err } @@ -75,7 +76,7 @@ func (s *ObjectService) GetObject(bucket, key string) (io.ReadCloser, *models.Ob } }(pw) - err := storage.AssembleStream(manifest.Chunks, pw) + err := s.blob.AssembleStream(manifest.Chunks, pw) if err != nil { return } @@ -84,7 +85,7 @@ func (s *ObjectService) GetObject(bucket, key string) (io.ReadCloser, *models.Ob } func (s *ObjectService) HeadObject(bucket, key string) (models.ObjectManifest, error) { - manifest, err := s.metadataHandler.GetManifest(bucket, key) + manifest, err := s.metadata.GetManifest(bucket, key) if err != nil { return models.ObjectManifest{}, err } @@ -92,36 +93,36 @@ func (s *ObjectService) HeadObject(bucket, key string) (models.ObjectManifest, e } func (s *ObjectService) DeleteObject(bucket, key string) error { - return s.metadataHandler.DeleteManifest(bucket, key) + return s.metadata.DeleteManifest(bucket, key) } func (s *ObjectService) ListObjects(bucket, prefix string) ([]*models.ObjectManifest, error) { - return s.metadataHandler.ListObjects(bucket, prefix) + return s.metadata.ListObjects(bucket, prefix) } func (s *ObjectService) CreateBucket(bucket string) error { - return s.metadataHandler.CreateBucket(bucket) + return s.metadata.CreateBucket(bucket) } func (s *ObjectService) HeadBucket(bucket string) error { - _, err := s.metadataHandler.GetBucketManifest(bucket) + _, err := s.metadata.GetBucketManifest(bucket) return err } func (s *ObjectService) DeleteBucket(bucket string) error { - return s.metadataHandler.DeleteBucket(bucket) + return s.metadata.DeleteBucket(bucket) } func (s *ObjectService) ListBuckets() ([]string, error) { - return s.metadataHandler.ListBuckets() + return s.metadata.ListBuckets() } func (s *ObjectService) DeleteObjects(bucket string, keys []string) ([]string, error) { - return s.metadataHandler.DeleteManifests(bucket, keys) + return s.metadata.DeleteManifests(bucket, keys) } func (s *ObjectService) CreateMultipartUpload(bucket, key string) (*models.MultipartUpload, error) { - return s.metadataHandler.CreateMultipartUpload(bucket, key) + return s.metadata.CreateMultipartUpload(bucket, key) } func (s *ObjectService) UploadPart(bucket, key, uploadId string, partNumber int, input io.Reader) (string, error) { @@ -129,7 +130,7 @@ func (s *ObjectService) UploadPart(bucket, key, uploadId string, partNumber int, return "", ErrInvalidPart } - upload, err := s.metadataHandler.GetMultipartUpload(uploadId) + upload, err := s.metadata.GetMultipartUpload(uploadId) if err != nil { return "", err } @@ -138,7 +139,7 @@ func (s *ObjectService) UploadPart(bucket, key, uploadId string, partNumber int, } var uploadedPart models.UploadedPart - chunkIds, totalSize, etag, err := storage.IngestStream(input) + chunkIds, totalSize, etag, err := s.blob.IngestStream(input) if err != nil { return "", err } @@ -149,7 +150,7 @@ func (s *ObjectService) UploadPart(bucket, key, uploadId string, partNumber int, Chunks: chunkIds, CreatedAt: time.Now().Unix(), } - err = s.metadataHandler.PutMultipartPart(uploadId, uploadedPart) + err = s.metadata.PutMultipartPart(uploadId, uploadedPart) if err != nil { return "", err } @@ -157,14 +158,14 @@ func (s *ObjectService) UploadPart(bucket, key, uploadId string, partNumber int, } func (s *ObjectService) ListMultipartParts(bucket, key, uploadID string) ([]models.UploadedPart, error) { - upload, err := s.metadataHandler.GetMultipartUpload(uploadID) + upload, err := s.metadata.GetMultipartUpload(uploadID) if err != nil { return nil, err } if upload.Bucket != bucket || upload.Key != key { return nil, metadata.ErrMultipartNotFound } - return s.metadataHandler.ListMultipartParts(uploadID) + return s.metadata.ListMultipartParts(uploadID) } func (s *ObjectService) CompleteMultipartUpload(bucket, key, uploadID string, completed []models.CompletedPart) (*models.ObjectManifest, error) { @@ -172,7 +173,7 @@ func (s *ObjectService) CompleteMultipartUpload(bucket, key, uploadID string, co return nil, ErrInvalidCompleteRequest } - upload, err := s.metadataHandler.GetMultipartUpload(uploadID) + upload, err := s.metadata.GetMultipartUpload(uploadID) if err != nil { return nil, err } @@ -180,7 +181,7 @@ func (s *ObjectService) CompleteMultipartUpload(bucket, key, uploadID string, co return nil, metadata.ErrMultipartNotFound } - storedParts, err := s.metadataHandler.ListMultipartParts(uploadID) + storedParts, err := s.metadata.ListMultipartParts(uploadID) if err != nil { return nil, err } @@ -227,7 +228,7 @@ func (s *ObjectService) CompleteMultipartUpload(bucket, key, uploadID string, co CreatedAt: time.Now().Unix(), } - if err := s.metadataHandler.CompleteMultipartUpload(uploadID, manifest); err != nil { + if err := s.metadata.CompleteMultipartUpload(uploadID, manifest); err != nil { return nil, err } @@ -235,14 +236,14 @@ func (s *ObjectService) CompleteMultipartUpload(bucket, key, uploadID string, co } func (s *ObjectService) AbortMultipartUpload(bucket, key, uploadID string) error { - upload, err := s.metadataHandler.GetMultipartUpload(uploadID) + upload, err := s.metadata.GetMultipartUpload(uploadID) if err != nil { return err } if upload.Bucket != bucket || upload.Key != key { return metadata.ErrMultipartNotFound } - return s.metadataHandler.AbortMultipartUpload(uploadID) + return s.metadata.AbortMultipartUpload(uploadID) } func normalizeETag(etag string) string { @@ -263,5 +264,5 @@ func buildMultipartETag(parts []models.UploadedPart) string { } func (s *ObjectService) Close() error { - return s.metadataHandler.Close() + return s.metadata.Close() } diff --git a/storage/blob.go b/storage/blob.go index 268f762..4fae764 100644 --- a/storage/blob.go +++ b/storage/blob.go @@ -13,10 +13,22 @@ import ( const chunkSize = 64 * 1024 const blobRoot = "blobs/" -func IngestStream(stream io.Reader) ([]string, int64, string, error) { +type BlobStore struct { + dataRoot string + chunkSize int +} + +func NewBlobStore(root string, chunkSize int) (*BlobStore, error) { + if err := os.MkdirAll(filepath.Join(root, blobRoot), 0o755); err != nil { + return nil, err + } + return &BlobStore{chunkSize: chunkSize, dataRoot: root}, nil +} + +func (bs *BlobStore) IngestStream(stream io.Reader) ([]string, int64, string, error) { fullFileHasher := md5.New() - buffer := make([]byte, chunkSize) + buffer := make([]byte, bs.chunkSize) var totalSize int64 var chunkIDs []string @@ -35,7 +47,7 @@ func IngestStream(stream io.Reader) ([]string, int64, string, error) { chunkHash := sha256.Sum256(chunkData) chunkID := hex.EncodeToString(chunkHash[:]) - err := saveBlob(chunkID, chunkData) + err := bs.saveBlob(chunkID, chunkData) if err != nil { return nil, 0, "", err } @@ -54,8 +66,8 @@ func IngestStream(stream io.Reader) ([]string, int64, string, error) { return chunkIDs, totalSize, etag, nil } -func saveBlob(chunkID string, data []byte) error { - dir := filepath.Join(blobRoot, chunkID[:2], chunkID[2:4]) +func (bs *BlobStore) saveBlob(chunkID string, data []byte) error { + dir := filepath.Join(bs.dataRoot, blobRoot, chunkID[:2], chunkID[2:4]) if err := os.MkdirAll(dir, 0755); err != nil { return err } @@ -69,9 +81,9 @@ func saveBlob(chunkID string, data []byte) error { return nil } -func AssembleStream(chunkIDs []string, w *io.PipeWriter) error { +func (bs *BlobStore) AssembleStream(chunkIDs []string, w *io.PipeWriter) error { for _, chunkID := range chunkIDs { - chunkData, err := GetBlob(chunkID) + chunkData, err := bs.GetBlob(chunkID) if err != nil { return err } @@ -82,7 +94,6 @@ func AssembleStream(chunkIDs []string, w *io.PipeWriter) error { return nil } -func GetBlob(chunkID string) ([]byte, error) { - - return os.ReadFile(filepath.Join(blobRoot, chunkID[:2], chunkID[2:4], chunkID)) +func (bs *BlobStore) GetBlob(chunkID string) ([]byte, error) { + return os.ReadFile(filepath.Join(bs.dataRoot, blobRoot, chunkID[:2], chunkID[2:4], chunkID)) } diff --git a/utils/config.go b/utils/config.go new file mode 100644 index 0000000..f9f773c --- /dev/null +++ b/utils/config.go @@ -0,0 +1,86 @@ +package utils + +import ( + "os" + "path/filepath" + "strconv" + "strings" + + "github.com/joho/godotenv" +) + +type Config struct { + DataPath string + Address string + Port int + ChunkSize int + LogLevel string + LogFormat string + AuditLog bool +} + +func NewConfig() *Config { + _ = godotenv.Load() + + config := &Config{ + DataPath: sanitizeDataPath(os.Getenv("DATA_PATH")), + Address: firstNonEmpty(strings.TrimSpace(os.Getenv("ADDRESS")), "0.0.0.0"), + Port: envInt("PORT", 3000), + ChunkSize: envInt("CHUNK_SIZE", 8192000), + LogLevel: strings.ToLower(firstNonEmpty(strings.TrimSpace(os.Getenv("LOG_LEVEL")), "info")), + LogFormat: strings.ToLower(firstNonEmpty(strings.TrimSpace(os.Getenv("LOG_FORMAT")), strings.TrimSpace(os.Getenv("LOG_TYPE")), "text")), + AuditLog: envBool("AUDIT_LOG", true), + } + + if config.LogFormat != "json" && config.LogFormat != "text" { + config.LogFormat = "text" + } + + return config + +} + +func envInt(key string, defaultValue int) int { + raw := strings.TrimSpace(os.Getenv(key)) + if raw == "" { + return defaultValue + } + value, err := strconv.Atoi(raw) + if err != nil { + return defaultValue + } + return value +} + +func envBool(key string, defaultValue bool) bool { + raw := strings.TrimSpace(os.Getenv(key)) + if raw == "" { + return defaultValue + } + value, err := strconv.ParseBool(raw) + if err != nil { + return defaultValue + } + return value +} + +func firstNonEmpty(values ...string) string { + for _, v := range values { + if v != "" { + return v + } + } + return "" +} + +func sanitizeDataPath(raw string) string { + cleaned := strings.TrimSpace(raw) + if cleaned == "" { + cleaned = "." + } + cleaned = filepath.Clean(cleaned) + if abs, err := filepath.Abs(cleaned); err == nil { + return abs + } + return cleaned +} From d9a1bd9001e93c1dbb39f303a45a55ceb881f301 Mon Sep 17 00:00:00 2001 From: Andrej Mickov Date: Mon, 23 Feb 2026 22:35:42 +0100 Subject: [PATCH 8/8] Applied Copilot review suggestions --- api/api.go | 56 +++++++++++++++++++++++++--------------------- logging/logging.go | 29 ++++++++---------------- main.go | 1 + storage/blob.go | 37 ++++++++++++++++++++++++++---- utils/config.go | 9 +++++--- 5 files changed, 79 insertions(+), 53 deletions(-) diff --git a/api/api.go b/api/api.go index 3de2450..ee70fc7 100644 --- a/api/api.go +++ b/api/api.go @@ -98,6 +98,7 @@ func (h *Handler) handleGetObject(w http.ResponseWriter, r *http.Request) { writeMappedS3Error(w, r, err) return } + defer stream.Close() w.Header().Set("Content-Type", manifest.ContentType) w.Header().Set("Content-Length", strconv.FormatInt(manifest.Size, 10)) @@ -116,12 +117,7 @@ func (h *Handler) handlePostObject(w http.ResponseWriter, r *http.Request) { writeS3Error(w, r, s3ErrInvalidObjectKey, r.URL.Path) return } - defer func(Body io.ReadCloser) { - err := Body.Close() - if err != nil { - - } - }(r.Body) + defer r.Body.Close() if _, ok := r.URL.Query()["uploads"]; ok { upload, err := h.svc.CreateMultipartUpload(bucket, key) @@ -190,17 +186,7 @@ func (h *Handler) handlePutObject(w http.ResponseWriter, r *http.Request) { writeS3Error(w, r, s3ErrInvalidObjectKey, r.URL.Path) return } - defer func(Body io.ReadCloser) { - err := Body.Close() - if err != nil { - - } - }(r.Body) - - bodyReader := io.Reader(r.Body) - if shouldDecodeAWSChunkedPayload(r) { - bodyReader = newAWSChunkedDecodingReader(r.Body) - } + defer r.Body.Close() uploadID := r.URL.Query().Get("uploadId") partNumberRaw := r.URL.Query().Get("partNumber") @@ -215,6 +201,18 @@ func (h *Handler) handlePutObject(w http.ResponseWriter, r *http.Request) { writeS3Error(w, r, s3ErrInvalidPart, r.URL.Path) return } + if partNumber < 1 || partNumber > 10000 { + writeS3Error(w, r, s3ErrInvalidPart, r.URL.Path) + return + } + + bodyReader := io.Reader(r.Body) + var decodeStream io.ReadCloser + if shouldDecodeAWSChunkedPayload(r) { + decodeStream = newAWSChunkedDecodingReader(r.Body) + defer decodeStream.Close() + bodyReader = decodeStream + } etag, err := h.svc.UploadPart(bucket, key, uploadID, partNumber, bodyReader) if err != nil { @@ -232,6 +230,14 @@ func (h *Handler) handlePutObject(w http.ResponseWriter, r *http.Request) { contentType = "application/octet-stream" } + bodyReader := io.Reader(r.Body) + var decodeStream io.ReadCloser + if shouldDecodeAWSChunkedPayload(r) { + decodeStream = newAWSChunkedDecodingReader(r.Body) + defer decodeStream.Close() + bodyReader = decodeStream + } + manifest, err := h.svc.PutObject(bucket, key, contentType, bodyReader) if err != nil { @@ -289,7 +295,7 @@ func shouldDecodeAWSChunkedPayload(r *http.Request) bool { return strings.HasPrefix(signingMode, "streaming-aws4-hmac-sha256-payload") } -func newAWSChunkedDecodingReader(src io.Reader) io.Reader { +func newAWSChunkedDecodingReader(src io.Reader) io.ReadCloser { pr, pw := io.Pipe() go func() { if err := decodeAWSChunkedPayload(src, pw); err != nil { @@ -363,7 +369,7 @@ func (h *Handler) handleHeadObject(w http.ResponseWriter, r *http.Request) { return } etag := manifest.ETag - size := strconv.Itoa(int(manifest.Size)) + size := strconv.FormatInt(manifest.Size, 10) w.Header().Set("ETag", `"`+etag+`"`) w.Header().Set("Content-Length", size) @@ -395,16 +401,14 @@ func (h *Handler) handlePostBucket(w http.ResponseWriter, r *http.Request) { writeS3Error(w, r, s3ErrNotImplemented, r.URL.Path) return } - defer func(Body io.ReadCloser) { - err := Body.Close() - if err != nil { - - } - }(r.Body) + defer r.Body.Close() bodyReader := io.Reader(r.Body) + var decodeStream io.ReadCloser if shouldDecodeAWSChunkedPayload(r) { - bodyReader = newAWSChunkedDecodingReader(r.Body) + decodeStream = newAWSChunkedDecodingReader(r.Body) + defer decodeStream.Close() + bodyReader = decodeStream } var req models.DeleteObjectsRequest diff --git a/logging/logging.go b/logging/logging.go index 1edadf2..7290bf2 100644 --- a/logging/logging.go +++ b/logging/logging.go @@ -8,6 +8,8 @@ import ( "strconv" "strings" "time" + + "github.com/go-chi/chi/v5/middleware" ) type Config struct { @@ -83,7 +85,7 @@ func HTTPMiddleware(logger *slog.Logger, cfg Config) func(http.Handler) http.Han return func(next http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { start := time.Now() - ww := &responseWriter{ResponseWriter: w, status: http.StatusOK} + ww := middleware.NewWrapResponseWriter(w, r.ProtoMajor) next.ServeHTTP(ww, r) @@ -92,11 +94,15 @@ func HTTPMiddleware(logger *slog.Logger, cfg Config) func(http.Handler) http.Han } elapsed := time.Since(start) + status := ww.Status() + if status == 0 { + status = http.StatusOK + } attrs := []any{ "method", r.Method, "path", r.URL.Path, - "status", ww.status, - "bytes", ww.bytes, + "status", status, + "bytes", ww.BytesWritten(), "duration_ms", float64(elapsed.Nanoseconds()) / 1_000_000.0, "remote_addr", r.RemoteAddr, } @@ -118,23 +124,6 @@ func HTTPMiddleware(logger *slog.Logger, cfg Config) func(http.Handler) http.Han } } -type responseWriter struct { - http.ResponseWriter - status int - bytes int -} - -func (w *responseWriter) WriteHeader(statusCode int) { - w.status = statusCode - w.ResponseWriter.WriteHeader(statusCode) -} - -func (w *responseWriter) Write(p []byte) (int, error) { - n, err := w.ResponseWriter.Write(p) - w.bytes += n - return n, err -} - func envBool(key string, defaultValue bool) bool { raw := os.Getenv(key) if raw == "" { diff --git a/main.go b/main.go index 251fd00..53a28ed 100644 --- a/main.go +++ b/main.go @@ -36,6 +36,7 @@ func main() { } blobHandler, err := storage.NewBlobStore(config.DataPath, config.ChunkSize) if err != nil { + _ = metadataHandler.Close() logger.Error("failed_to_initialize_blob_store", "error", err) return } diff --git a/storage/blob.go b/storage/blob.go index 4fae764..23d21f6 100644 --- a/storage/blob.go +++ b/storage/blob.go @@ -5,13 +5,15 @@ import ( "crypto/sha256" "encoding/hex" "errors" + "fmt" "io" "os" "path/filepath" + "strings" ) -const chunkSize = 64 * 1024 -const blobRoot = "blobs/" +const blobRoot = "blobs" +const maxChunkSize = 64 * 1024 * 1024 type BlobStore struct { dataRoot string @@ -19,10 +21,19 @@ type BlobStore struct { } func NewBlobStore(root string, chunkSize int) (*BlobStore, error) { - if err := os.MkdirAll(filepath.Join(root, blobRoot), 0o755); err != nil { + root = strings.TrimSpace(root) + if root == "" { + return nil, errors.New("blob root is required") + } + if chunkSize <= 0 || chunkSize > maxChunkSize { + return nil, fmt.Errorf("chunk size must be between 1 and %d bytes", maxChunkSize) + } + + cleanRoot := filepath.Clean(root) + if err := os.MkdirAll(filepath.Join(cleanRoot, blobRoot), 0o755); err != nil { return nil, err } - return &BlobStore{chunkSize: chunkSize, dataRoot: root}, nil + return &BlobStore{chunkSize: chunkSize, dataRoot: cleanRoot}, nil } func (bs *BlobStore) IngestStream(stream io.Reader) ([]string, int64, string, error) { @@ -67,6 +78,9 @@ func (bs *BlobStore) IngestStream(stream io.Reader) ([]string, int64, string, er } func (bs *BlobStore) saveBlob(chunkID string, data []byte) error { + if !isValidChunkID(chunkID) { + return fmt.Errorf("invalid chunk id: %q", chunkID) + } dir := filepath.Join(bs.dataRoot, blobRoot, chunkID[:2], chunkID[2:4]) if err := os.MkdirAll(dir, 0755); err != nil { return err @@ -95,5 +109,20 @@ func (bs *BlobStore) AssembleStream(chunkIDs []string, w *io.PipeWriter) error { } func (bs *BlobStore) GetBlob(chunkID string) ([]byte, error) { + if !isValidChunkID(chunkID) { + return nil, fmt.Errorf("invalid chunk id: %q", chunkID) + } return os.ReadFile(filepath.Join(bs.dataRoot, blobRoot, chunkID[:2], chunkID[2:4], chunkID)) } + +func isValidChunkID(chunkID string) bool { + if len(chunkID) != sha256.Size*2 { + return false + } + for _, ch := range chunkID { + if (ch < '0' || ch > '9') && (ch < 'a' || ch > 'f') { + return false + } + } + return true +} diff --git a/utils/config.go b/utils/config.go index f9f773c..a73e195 100644 --- a/utils/config.go +++ b/utils/config.go @@ -25,8 +25,8 @@ func NewConfig() *Config { config := &Config{ DataPath: sanitizeDataPath(os.Getenv("DATA_PATH")), Address: firstNonEmpty(strings.TrimSpace(os.Getenv("ADDRESS")), "0.0.0.0"), - Port: envInt("PORT", 3000), - ChunkSize: envInt("CHUNK_SIZE", 8192000), + Port: envIntRange("PORT", 3000, 1, 65535), + ChunkSize: envIntRange("CHUNK_SIZE", 8192000, 1, 64*1024*1024), LogLevel: strings.ToLower(firstNonEmpty(strings.TrimSpace(os.Getenv("LOG_LEVEL")), "info")), LogFormat: strings.ToLower(firstNonEmpty(strings.TrimSpace(os.Getenv("LOG_FORMAT")), strings.TrimSpace(os.Getenv("LOG_TYPE")), "text")), AuditLog: envBool("AUDIT_LOG", true), @@ -40,7 +40,7 @@ func NewConfig() *Config { } -func envInt(key string, defaultValue int) int { +func envIntRange(key string, defaultValue, minValue, maxValue int) int { raw := strings.TrimSpace(os.Getenv(key)) if raw == "" { return defaultValue @@ -49,6 +49,9 @@ func envInt(key string, defaultValue int) int { if err != nil { return defaultValue } + if value < minValue || value > maxValue { + return defaultValue + } return value }