Merge pull request #1 from ferdzo/feature/multipart-upload

Multipart Upload support
This commit is contained in:
2026-02-23 22:37:33 +01:00
committed by GitHub
16 changed files with 1416 additions and 64 deletions

4
.dockerignore Normal file
View File

@@ -0,0 +1,4 @@
*.md
.gocache/
blobs/
data/

6
.env.example Normal file
View File

@@ -0,0 +1,6 @@
LOG_LEVEL=debug
LOG_FORMAT=text
DATA_PATH=data/
PORT=2600
AUDIT_LOG=true
ADDRESS=0.0.0.0

5
.gitignore vendored
View File

@@ -1,5 +1,8 @@
.env
*.db
.vscode/
blobs/
*.db
data/
.idea/
.gocache/
.gomodcache/

16
Dockerfile Normal file
View File

@@ -0,0 +1,16 @@
FROM golang:1.25-alpine AS build
WORKDIR /app
COPY go.mod go.sum ./
RUN go mod download
COPY . .
RUN CGO_ENABLED=0 GOOS=linux go build -o /app/fs .
FROM scratch AS runner
COPY --from=build /app/fs /app/fs
WORKDIR /app
CMD ["/app/fs"]

View File

@@ -1,3 +1,34 @@
# fs
An experimental Object Storage written in Go that should be compatible with S3
An experimental Object Storage written in Go that should be partially compatible with S3
## Features
- Bucket operations:
- `PUT /{bucket}`
- `HEAD /{bucket}`
- `DELETE /{bucket}`
- `GET /` (list buckets)
- Object operations:
- `PUT /{bucket}/{key}`
- `GET /{bucket}/{key}`
- `HEAD /{bucket}/{key}`
- `DELETE /{bucket}/{key}`
- `GET /{bucket}?list-type=2&prefix=...` (ListObjectsV2-style)
- Multipart upload:
- `POST /{bucket}/{key}?uploads` (initiate)
- `PUT /{bucket}/{key}?uploadId=...&partNumber=N` (upload part)
- `GET /{bucket}/{key}?uploadId=...` (list parts)
- `POST /{bucket}/{key}?uploadId=...` (complete)
- `DELETE /{bucket}/{key}?uploadId=...` (abort)
- Multi-object delete:
- `POST /{bucket}?delete` with S3-style XML body
- AWS SigV4 streaming payload decoding for uploads (`aws-chunked` request bodies)
## Limitations
- No authentication/authorization yet.
- Not full S3 API coverage.
- No garbage collection of unreferenced blob chunks yet.
- No versioning or lifecycle policies.
- Error and edge-case behavior is still being refined for client compatibility.

View File

@@ -1,14 +1,24 @@
package api
import (
"bufio"
"context"
"encoding/xml"
"errors"
"fmt"
"fs/logging"
"fs/metadata"
"fs/models"
"fs/service"
"fs/utils"
"io"
"log/slog"
"net/http"
"os"
"os/signal"
"strconv"
"strings"
"syscall"
"time"
"github.com/go-chi/chi/v5"
@@ -16,23 +26,30 @@ import (
)
type Handler struct {
router *chi.Mux
svc *service.ObjectService
router *chi.Mux
svc *service.ObjectService
logger *slog.Logger
logConfig logging.Config
}
func NewHandler(svc *service.ObjectService) *Handler {
func NewHandler(svc *service.ObjectService, logger *slog.Logger, logConfig logging.Config) *Handler {
r := chi.NewRouter()
r.Use(middleware.Recoverer)
if logger == nil {
logger = slog.Default()
}
h := &Handler{
router: r,
svc: svc,
router: r,
svc: svc,
logger: logger,
logConfig: logConfig,
}
return h
}
func (h *Handler) setupRoutes() {
h.router.Use(middleware.Logger)
h.router.Use(logging.HTTPMiddleware(h.logger, h.logConfig))
h.router.Get("/", h.handleGetBuckets)
@@ -40,6 +57,8 @@ func (h *Handler) setupRoutes() {
h.router.Get("/{bucket}", h.handleGetBucket)
h.router.Put("/{bucket}", h.handlePutBucket)
h.router.Put("/{bucket}/", h.handlePutBucket)
h.router.Post("/{bucket}", h.handlePostBucket)
h.router.Post("/{bucket}/", h.handlePostBucket)
h.router.Delete("/{bucket}", h.handleDeleteBucket)
h.router.Delete("/{bucket}/", h.handleDeleteBucket)
h.router.Head("/{bucket}", h.handleHeadBucket)
@@ -47,11 +66,12 @@ func (h *Handler) setupRoutes() {
h.router.Get("/{bucket}/*", h.handleGetObject)
h.router.Put("/{bucket}/*", h.handlePutObject)
h.router.Post("/{bucket}/*", h.handlePostObject)
h.router.Head("/{bucket}/*", h.handleHeadObject)
h.router.Delete("/{bucket}/*", h.handleDeleteObject)
}
func (h *Handler) handleWelcome(w http.ResponseWriter, r *http.Request) {
func (h *Handler) handleWelcome(w http.ResponseWriter) {
w.WriteHeader(http.StatusOK)
_, err := w.Write([]byte("Welcome to the Object Storage API!"))
if err != nil {
@@ -68,8 +88,9 @@ func (h *Handler) handleGetObject(w http.ResponseWriter, r *http.Request) {
return
}
if r.URL.Query().Get("uploadId") != "" {
if uploadID := r.URL.Query().Get("uploadId"); uploadID != "" {
h.handleListMultipartParts(w, r, bucket, key, uploadID)
return
}
stream, manifest, err := h.svc.GetObject(bucket, key)
@@ -77,6 +98,7 @@ func (h *Handler) handleGetObject(w http.ResponseWriter, r *http.Request) {
writeMappedS3Error(w, r, err)
return
}
defer stream.Close()
w.Header().Set("Content-Type", manifest.ContentType)
w.Header().Set("Content-Length", strconv.FormatInt(manifest.Size, 10))
@@ -88,6 +110,75 @@ func (h *Handler) handleGetObject(w http.ResponseWriter, r *http.Request) {
}
// handlePostObject dispatches POST /{bucket}/{key} by query string:
//   - ?uploads      initiates a multipart upload and returns its UploadId
//   - ?uploadId=... completes a multipart upload from the XML part list
//
// Any other POST on an object path is answered with NotImplemented so
// unsupported S3 operations fail loudly instead of silently succeeding.
func (h *Handler) handlePostObject(w http.ResponseWriter, r *http.Request) {
	bucket := chi.URLParam(r, "bucket")
	key := chi.URLParam(r, "*")
	if key == "" {
		writeS3Error(w, r, s3ErrInvalidObjectKey, r.URL.Path)
		return
	}
	defer r.Body.Close()
	// Initiate: clients send "?uploads" as a value-less flag, so only the
	// presence of the key is checked, not its value.
	if _, ok := r.URL.Query()["uploads"]; ok {
		upload, err := h.svc.CreateMultipartUpload(bucket, key)
		if err != nil {
			writeMappedS3Error(w, r, err)
			return
		}
		response := models.InitiateMultipartUploadResult{
			Xmlns:    "http://s3.amazonaws.com/doc/2006-03-01/",
			Bucket:   upload.Bucket,
			Key:      upload.Key,
			UploadID: upload.UploadID,
		}
		payload, err := xml.MarshalIndent(response, "", " ")
		if err != nil {
			writeMappedS3Error(w, r, err)
			return
		}
		w.Header().Set("Content-Type", "application/xml; charset=utf-8")
		w.WriteHeader(http.StatusOK)
		// Write errors past this point cannot change the already-sent
		// status code, so they are deliberately ignored.
		_, _ = w.Write([]byte(xml.Header))
		_, _ = w.Write(payload)
		return
	}
	// Complete: the body is a CompleteMultipartUpload XML document listing
	// the parts (number + ETag) to assemble.
	if uploadID := r.URL.Query().Get("uploadId"); uploadID != "" {
		var req models.CompleteMultipartUploadRequest
		if err := xml.NewDecoder(r.Body).Decode(&req); err != nil {
			writeS3Error(w, r, s3ErrMalformedXML, r.URL.Path)
			return
		}
		manifest, err := h.svc.CompleteMultipartUpload(bucket, key, uploadID, req.Parts)
		if err != nil {
			writeMappedS3Error(w, r, err)
			return
		}
		response := models.CompleteMultipartUploadResult{
			Xmlns:    "http://s3.amazonaws.com/doc/2006-03-01/",
			Bucket:   bucket,
			Key:      key,
			ETag:     `"` + manifest.ETag + `"`, // S3 ETags are quoted on the wire
			Location: r.URL.Path,
		}
		payload, err := xml.MarshalIndent(response, "", " ")
		if err != nil {
			writeMappedS3Error(w, r, err)
			return
		}
		w.Header().Set("Content-Type", "application/xml; charset=utf-8")
		w.WriteHeader(http.StatusOK)
		_, _ = w.Write([]byte(xml.Header))
		_, _ = w.Write(payload)
		return
	}
	writeS3Error(w, r, s3ErrNotImplemented, r.URL.Path)
}
func (h *Handler) handlePutObject(w http.ResponseWriter, r *http.Request) {
bucket := chi.URLParam(r, "bucket")
key := chi.URLParam(r, "*")
@@ -95,14 +186,59 @@ func (h *Handler) handlePutObject(w http.ResponseWriter, r *http.Request) {
writeS3Error(w, r, s3ErrInvalidObjectKey, r.URL.Path)
return
}
defer r.Body.Close()
uploadID := r.URL.Query().Get("uploadId")
partNumberRaw := r.URL.Query().Get("partNumber")
if uploadID != "" || partNumberRaw != "" {
if uploadID == "" || partNumberRaw == "" {
writeS3Error(w, r, s3ErrInvalidPart, r.URL.Path)
return
}
partNumber, err := strconv.Atoi(partNumberRaw)
if err != nil {
writeS3Error(w, r, s3ErrInvalidPart, r.URL.Path)
return
}
if partNumber < 1 || partNumber > 10000 {
writeS3Error(w, r, s3ErrInvalidPart, r.URL.Path)
return
}
bodyReader := io.Reader(r.Body)
var decodeStream io.ReadCloser
if shouldDecodeAWSChunkedPayload(r) {
decodeStream = newAWSChunkedDecodingReader(r.Body)
defer decodeStream.Close()
bodyReader = decodeStream
}
etag, err := h.svc.UploadPart(bucket, key, uploadID, partNumber, bodyReader)
if err != nil {
writeMappedS3Error(w, r, err)
return
}
w.Header().Set("ETag", `"`+etag+`"`)
w.Header().Set("Content-Length", "0")
w.WriteHeader(http.StatusOK)
return
}
contentType := r.Header.Get("Content-Type")
if contentType == "" {
contentType = "application/octet-stream"
}
manifest, err := h.svc.PutObject(bucket, key, contentType, r.Body)
defer r.Body.Close()
bodyReader := io.Reader(r.Body)
var decodeStream io.ReadCloser
if shouldDecodeAWSChunkedPayload(r) {
decodeStream = newAWSChunkedDecodingReader(r.Body)
defer decodeStream.Close()
bodyReader = decodeStream
}
manifest, err := h.svc.PutObject(bucket, key, contentType, bodyReader)
if err != nil {
writeMappedS3Error(w, r, err)
@@ -115,6 +251,110 @@ func (h *Handler) handlePutObject(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
}
// handleListMultipartParts answers GET /{bucket}/{key}?uploadId=... with an
// S3 ListParts-style XML document describing every part recorded so far.
func (h *Handler) handleListMultipartParts(w http.ResponseWriter, r *http.Request, bucket, key, uploadID string) {
	parts, err := h.svc.ListMultipartParts(bucket, key, uploadID)
	if err != nil {
		writeMappedS3Error(w, r, err)
		return
	}
	items := make([]models.PartItem, 0, len(parts))
	for _, p := range parts {
		items = append(items, models.PartItem{
			PartNumber:   p.PartNumber,
			LastModified: time.Unix(p.CreatedAt, 0).UTC().Format("2006-01-02T15:04:05.000Z"),
			ETag:         `"` + p.ETag + `"`, // S3 ETags are quoted on the wire
			Size:         p.Size,
		})
	}
	result := models.ListPartsResult{
		Xmlns:    "http://s3.amazonaws.com/doc/2006-03-01/",
		Bucket:   bucket,
		Key:      key,
		UploadID: uploadID,
		Parts:    items,
	}
	payload, err := xml.MarshalIndent(result, "", " ")
	if err != nil {
		writeMappedS3Error(w, r, err)
		return
	}
	w.Header().Set("Content-Type", "application/xml; charset=utf-8")
	w.WriteHeader(http.StatusOK)
	_, _ = w.Write([]byte(xml.Header))
	_, _ = w.Write(payload)
}
func shouldDecodeAWSChunkedPayload(r *http.Request) bool {
contentEncoding := strings.ToLower(r.Header.Get("Content-Encoding"))
if strings.Contains(contentEncoding, "aws-chunked") {
return true
}
signingMode := strings.ToLower(r.Header.Get("x-amz-content-sha256"))
return strings.HasPrefix(signingMode, "streaming-aws4-hmac-sha256-payload")
}
// newAWSChunkedDecodingReader wraps src in a pipe whose read side yields the
// decoded aws-chunked payload. Decoding runs in a background goroutine; a
// decode failure is surfaced to the reader through CloseWithError, and a nil
// error closes the pipe normally (CloseWithError(nil) is equivalent to Close).
func newAWSChunkedDecodingReader(src io.Reader) io.ReadCloser {
	pr, pw := io.Pipe()
	go func() {
		err := decodeAWSChunkedPayload(src, pw)
		_ = pw.CloseWithError(err)
	}()
	return pr
}
func decodeAWSChunkedPayload(src io.Reader, dst io.Writer) error {
reader := bufio.NewReader(src)
for {
headerLine, err := reader.ReadString('\n')
if err != nil {
return err
}
headerLine = strings.TrimRight(headerLine, "\r\n")
chunkSizeToken := headerLine
if idx := strings.IndexByte(chunkSizeToken, ';'); idx >= 0 {
chunkSizeToken = chunkSizeToken[:idx]
}
chunkSizeToken = strings.TrimSpace(chunkSizeToken)
chunkSize, err := strconv.ParseInt(chunkSizeToken, 16, 64)
if err != nil {
return fmt.Errorf("invalid aws-chunked header %q: %w", headerLine, err)
}
if chunkSize < 0 {
return fmt.Errorf("invalid aws-chunked size: %d", chunkSize)
}
if chunkSize > 0 {
if _, err := io.CopyN(dst, reader, chunkSize); err != nil {
return err
}
}
crlf := make([]byte, 2)
if _, err := io.ReadFull(reader, crlf); err != nil {
return err
}
if crlf[0] != '\r' || crlf[1] != '\n' {
return errors.New("invalid aws-chunked payload terminator")
}
if chunkSize == 0 {
for {
line, err := reader.ReadString('\n')
if err != nil {
return err
}
if line == "\r\n" || line == "\n" {
return nil
}
}
}
}
}
func (h *Handler) handleHeadObject(w http.ResponseWriter, r *http.Request) {
bucket := chi.URLParam(r, "bucket")
key := chi.URLParam(r, "*")
@@ -128,9 +368,11 @@ func (h *Handler) handleHeadObject(w http.ResponseWriter, r *http.Request) {
writeMappedS3Error(w, r, err)
return
}
etag := manifest.ETag
size := strconv.FormatInt(manifest.Size, 10)
w.Header().Set("ETag", `"`+manifest.ETag+`"`)
w.Header().Set("Content-Length", "0")
w.Header().Set("ETag", `"`+etag+`"`)
w.Header().Set("Content-Length", size)
w.Header().Set("Last-Modified", time.Unix(manifest.CreatedAt, 0).UTC().Format(http.TimeFormat))
w.WriteHeader(http.StatusOK)
}
@@ -153,6 +395,64 @@ func (h *Handler) handleDeleteBucket(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusNoContent)
}
// handlePostBucket handles POST /{bucket}. Only the S3 multi-object delete
// operation (POST /{bucket}?delete with a <Delete> XML body) is supported;
// any other POST is answered with NotImplemented.
func (h *Handler) handlePostBucket(w http.ResponseWriter, r *http.Request) {
	bucket := chi.URLParam(r, "bucket")
	if _, ok := r.URL.Query()["delete"]; !ok {
		writeS3Error(w, r, s3ErrNotImplemented, r.URL.Path)
		return
	}
	defer r.Body.Close()
	// SigV4 streaming clients wrap even small XML bodies in aws-chunked
	// framing, which must be stripped before the XML decoder sees it.
	bodyReader := io.Reader(r.Body)
	var decodeStream io.ReadCloser
	if shouldDecodeAWSChunkedPayload(r) {
		decodeStream = newAWSChunkedDecodingReader(r.Body)
		defer decodeStream.Close()
		bodyReader = decodeStream
	}
	var req models.DeleteObjectsRequest
	if err := xml.NewDecoder(bodyReader).Decode(&req); err != nil {
		writeS3Error(w, r, s3ErrMalformedXML, r.URL.Path)
		return
	}
	// Collect the non-empty keys requested for deletion.
	keys := make([]string, 0, len(req.Objects))
	for _, obj := range req.Objects {
		if obj.Key == "" {
			continue
		}
		keys = append(keys, obj.Key)
	}
	deleted, err := h.svc.DeleteObjects(bucket, keys)
	if err != nil {
		writeMappedS3Error(w, r, err)
		return
	}
	response := models.DeleteObjectsResult{
		Xmlns: "http://s3.amazonaws.com/doc/2006-03-01/",
	}
	// Quiet mode suppresses the per-key <Deleted> entries in the response,
	// so the list is populated only when Quiet is false.
	if !req.Quiet {
		response.Deleted = make([]models.DeletedEntry, 0, len(deleted))
		for _, key := range deleted {
			response.Deleted = append(response.Deleted, models.DeletedEntry{Key: key})
		}
	}
	payload, err := xml.MarshalIndent(response, "", " ")
	if err != nil {
		writeMappedS3Error(w, r, err)
		return
	}
	w.Header().Set("Content-Type", "application/xml; charset=utf-8")
	w.WriteHeader(http.StatusOK)
	_, _ = w.Write([]byte(xml.Header))
	_, _ = w.Write(payload)
}
func (h *Handler) handleDeleteObject(w http.ResponseWriter, r *http.Request) {
bucket := chi.URLParam(r, "bucket")
key := chi.URLParam(r, "*")
@@ -160,7 +460,15 @@ func (h *Handler) handleDeleteObject(w http.ResponseWriter, r *http.Request) {
writeS3Error(w, r, s3ErrInvalidObjectKey, r.URL.Path)
return
}
if uploadId := r.URL.Query().Get("uploadId"); uploadId != "" {
err := h.svc.AbortMultipartUpload(bucket, key, uploadId)
if err != nil {
writeMappedS3Error(w, r, err)
return
}
w.WriteHeader(http.StatusNoContent)
return
}
err := h.svc.DeleteObject(bucket, key)
if err != nil {
if errors.Is(err, metadata.ErrObjectNotFound) {
@@ -191,7 +499,10 @@ func (h *Handler) handleGetBuckets(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/xml")
w.WriteHeader(http.StatusOK)
for _, bucket := range buckets {
w.Write([]byte(bucket))
_, err := w.Write([]byte(bucket))
if err != nil {
return
}
}
}
@@ -206,6 +517,19 @@ func (h *Handler) handleGetBucket(w http.ResponseWriter, r *http.Request) {
h.handleListObjectsV2(w, r, bucket, prefix)
return
}
if r.URL.Query().Has("location") {
xmlResponse := `<?xml version="1.0" encoding="UTF-8"?>
<LocationConstraint xmlns="http://s3.amazonaws.com/doc/2006-03-01/">us-east-1</LocationConstraint>`
w.Header().Set("Content-Type", "application/xml; charset=utf-8")
w.Header().Set("Content-Length", strconv.Itoa(len(xmlResponse)))
w.WriteHeader(http.StatusOK)
_, err := w.Write([]byte(xmlResponse))
if err != nil {
return
}
return
}
writeS3Error(w, r, s3ErrNotImplemented, r.URL.Path)
}
@@ -234,7 +558,50 @@ func (h *Handler) handleListObjectsV2(w http.ResponseWriter, r *http.Request, bu
}
func (h *Handler) Start(address string) error {
fmt.Printf("Starting API server on %s\n", address)
h.logger.Info("server_starting",
"address", address,
"log_format", h.logConfig.Format,
"log_level", h.logConfig.LevelName,
"audit_log", h.logConfig.Audit,
)
h.setupRoutes()
return http.ListenAndServe(address, h.router)
stop := make(chan os.Signal, 1)
signal.Notify(stop, os.Interrupt, syscall.SIGTERM)
defer signal.Stop(stop)
server := http.Server{
Addr: address,
Handler: h.router,
}
errCh := make(chan error, 1)
go func() {
if err := server.ListenAndServe(); err != nil {
if !errors.Is(err, http.ErrServerClosed) {
errCh <- err
}
}
}()
select {
case <-stop:
h.logger.Info("shutdown_signal_received")
case err := <-errCh:
h.logger.Error("server_listen_failed", "error", err)
return err
}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
if err := server.Shutdown(ctx); err != nil {
h.logger.Error("server_shutdown_failed", "error", err)
return err
}
if err := h.svc.Close(); err != nil {
h.logger.Error("service_close_failed", "error", err)
return err
}
h.logger.Info("server_stopped")
return nil
}

View File

@@ -5,6 +5,7 @@ import (
"errors"
"fs/metadata"
"fs/models"
"fs/service"
"net/http"
)
@@ -25,6 +26,26 @@ var (
Code: "NotImplemented",
Message: "A header you provided implies functionality that is not implemented.",
}
s3ErrInvalidPart = s3APIError{
Status: http.StatusBadRequest,
Code: "InvalidPart",
Message: "One or more of the specified parts could not be found.",
}
s3ErrInvalidPartOrder = s3APIError{
Status: http.StatusBadRequest,
Code: "InvalidPartOrder",
Message: "The list of parts was not in ascending order.",
}
s3ErrMalformedXML = s3APIError{
Status: http.StatusBadRequest,
Code: "MalformedXML",
Message: "The XML you provided was not well-formed or did not validate against our published schema.",
}
s3ErrEntityTooSmall = s3APIError{
Status: http.StatusBadRequest,
Code: "EntityTooSmall",
Message: "Your proposed upload is smaller than the minimum allowed object size.",
}
s3ErrInternal = s3APIError{
Status: http.StatusInternalServerError,
Code: "InternalError",
@@ -64,6 +85,26 @@ func mapToS3Error(err error) s3APIError {
Code: "NoSuchKey",
Message: "The specified key does not exist.",
}
case errors.Is(err, metadata.ErrMultipartNotFound):
return s3APIError{
Status: http.StatusNotFound,
Code: "NoSuchUpload",
Message: "The specified multipart upload does not exist.",
}
case errors.Is(err, metadata.ErrMultipartNotPending):
return s3APIError{
Status: http.StatusBadRequest,
Code: "InvalidRequest",
Message: "The multipart upload is not in a valid state for this operation.",
}
case errors.Is(err, service.ErrInvalidPart):
return s3ErrInvalidPart
case errors.Is(err, service.ErrInvalidPartOrder):
return s3ErrInvalidPartOrder
case errors.Is(err, service.ErrInvalidCompleteRequest):
return s3ErrMalformedXML
case errors.Is(err, service.ErrEntityTooSmall):
return s3ErrEntityTooSmall
default:
return s3ErrInternal
}

11
go.mod
View File

@@ -3,9 +3,12 @@ module fs
go 1.25.7
require (
github.com/go-chi/chi/v5 v5.2.5 // indirect
github.com/klauspost/cpuid/v2 v2.3.0 // indirect
github.com/klauspost/reedsolomon v1.13.2 // indirect
go.etcd.io/bbolt v1.4.3 // indirect
github.com/go-chi/chi/v5 v5.2.5
github.com/google/uuid v1.6.0
go.etcd.io/bbolt v1.4.3
)
require (
github.com/joho/godotenv v1.5.1 // indirect
golang.org/x/sys v0.41.0 // indirect
)

20
go.sum
View File

@@ -1,12 +1,20 @@
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/go-chi/chi/v5 v5.2.5 h1:Eg4myHZBjyvJmAFjFvWgrqDTXFyOzjj7YIm3L3mu6Ug=
github.com/go-chi/chi/v5 v5.2.5/go.mod h1:X7Gx4mteadT3eDOMTsXzmI4/rwUpOwBHLpAfupzFJP0=
github.com/klauspost/cpuid/v2 v2.3.0 h1:S4CRMLnYUhGeDFDqkGriYKdfoFlDnMtqTiI/sFzhA9Y=
github.com/klauspost/cpuid/v2 v2.3.0/go.mod h1:hqwkgyIinND0mEev00jJYCxPNVRVXFQeu1XKlok6oO0=
github.com/klauspost/reedsolomon v1.13.2 h1:9qtQy2tKEVpVB8Pfq87ZljHZb60/LbeTQ1OxV8EGzdE=
github.com/klauspost/reedsolomon v1.13.2/go.mod h1:ggJT9lc71Vu+cSOPBlxGvBN6TfAS77qB4fp8vJ05NSA=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0=
github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
go.etcd.io/bbolt v1.4.3 h1:dEadXpI6G79deX5prL3QRNP6JB8UxVkqo4UPnHaNXJo=
go.etcd.io/bbolt v1.4.3/go.mod h1:tKQlpPaYCVFctUIgFKFnAlvbmB3tpy1vkTnDWohtc0E=
golang.org/x/sys v0.29.0 h1:TPYlXGxvx1MGTn2GiZDhnjPA9wZzZeGKHHmKhHYvgaU=
golang.org/x/sys v0.29.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ=
golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.41.0 h1:Ivj+2Cp/ylzLiEU89QhWblYnOE9zerudt9Ftecq2C6k=
golang.org/x/sys v0.41.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

150
logging/logging.go Normal file
View File

@@ -0,0 +1,150 @@
package logging
import (
"log/slog"
"net/http"
"os"
"path/filepath"
"strconv"
"strings"
"time"
"github.com/go-chi/chi/v5/middleware"
)
// Config captures the logging behavior resolved from the environment or
// explicit values (see ConfigFromEnv / ConfigFromValues).
type Config struct {
	Level     slog.Level // minimum level emitted by the handler
	LevelName string     // canonical upper-case name of Level (e.g. "INFO")
	Format    string     // output encoding: "json" or "text"
	Audit     bool       // when true, each HTTP request is logged by the middleware
	AddSource bool       // attach source file:line to records (enabled with debug level)
	DebugMode bool       // true when Level is debug or lower
}
// ConfigFromEnv builds a logging Config from the LOG_LEVEL, LOG_FORMAT and
// AUDIT_LOG environment variables. Audit logging defaults to enabled when
// AUDIT_LOG is unset or unparsable.
func ConfigFromEnv() Config {
	level := strings.ToLower(strings.TrimSpace(os.Getenv("LOG_LEVEL")))
	format := strings.ToLower(strings.TrimSpace(os.Getenv("LOG_FORMAT")))
	audit := envBool("AUDIT_LOG", true)
	return ConfigFromValues(level, format, audit)
}
// ConfigFromValues normalizes the raw level/format strings into a Config.
// Unknown or empty levels fall back to info; any format other than "json"
// becomes "text". Debug level implicitly turns on source annotation and
// debug mode.
func ConfigFromValues(levelName, format string, audit bool) Config {
	name := strings.ToLower(strings.TrimSpace(levelName))
	if name == "" {
		name = "info"
	}
	level := parseLevel(name)

	switch strings.ToLower(strings.TrimSpace(format)) {
	case "json":
		format = "json"
	default:
		format = "text"
	}

	debug := level <= slog.LevelDebug
	return Config{
		Level:     level,
		LevelName: strings.ToUpper(level.String()),
		Format:    format,
		Audit:     audit,
		AddSource: debug,
		DebugMode: debug,
	}
}
// NewLogger constructs a slog.Logger according to cfg, installs it as the
// process-wide default, and returns it. When source annotation is on, the
// verbose source attribute is rewritten to a compact "src" attribute of the
// form file.go:line.
func NewLogger(cfg Config) *slog.Logger {
	opts := &slog.HandlerOptions{
		Level:     cfg.Level,
		AddSource: cfg.AddSource,
		ReplaceAttr: func(_ []string, attr slog.Attr) slog.Attr {
			if attr.Key != slog.SourceKey {
				return attr
			}
			src, ok := attr.Value.Any().(*slog.Source)
			if !ok || src == nil {
				return attr
			}
			attr.Key = "src"
			attr.Value = slog.StringValue(filepath.Base(src.File) + ":" + strconv.Itoa(src.Line))
			return attr
		},
	}

	var handler slog.Handler = slog.NewTextHandler(os.Stdout, opts)
	if cfg.Format == "json" {
		handler = slog.NewJSONHandler(os.Stdout, opts)
	}

	logger := slog.New(handler)
	slog.SetDefault(logger)
	return logger
}
// HTTPMiddleware returns a chi-compatible middleware that emits one
// structured log record per request after the handler chain finishes.
// Nothing is logged unless audit logging or debug mode is enabled; debug
// mode logs at debug level and attaches extra request-detail attributes.
func HTTPMiddleware(logger *slog.Logger, cfg Config) func(http.Handler) http.Handler {
	return func(next http.Handler) http.Handler {
		fn := func(w http.ResponseWriter, r *http.Request) {
			began := time.Now()
			// Wrap the writer so status code and byte count are observable.
			ww := middleware.NewWrapResponseWriter(w, r.ProtoMajor)
			next.ServeHTTP(ww, r)

			if !cfg.Audit && !cfg.DebugMode {
				return
			}
			took := time.Since(began)

			status := ww.Status()
			if status == 0 {
				// WriteHeader was never called explicitly; net/http sends 200.
				status = http.StatusOK
			}
			attrs := []any{
				"method", r.Method,
				"path", r.URL.Path,
				"status", status,
				"bytes", ww.BytesWritten(),
				"duration_ms", float64(took.Nanoseconds()) / 1_000_000.0,
				"remote_addr", r.RemoteAddr,
			}
			if !cfg.DebugMode {
				logger.Info("http_request", attrs...)
				return
			}
			attrs = append(attrs,
				"query", r.URL.RawQuery,
				"user_agent", r.UserAgent(),
				"content_length", r.ContentLength,
				"content_type", r.Header.Get("Content-Type"),
				"x_amz_sha256", r.Header.Get("x-amz-content-sha256"),
			)
			logger.Debug("http_request", attrs...)
		}
		return http.HandlerFunc(fn)
	}
}
func envBool(key string, defaultValue bool) bool {
raw := os.Getenv(key)
if raw == "" {
return defaultValue
}
value, err := strconv.ParseBool(raw)
if err != nil {
return defaultValue
}
return value
}
func parseLevel(levelName string) slog.Level {
switch levelName {
case "debug":
return slog.LevelDebug
case "warn", "warning":
return slog.LevelWarn
case "error":
return slog.LevelError
default:
return slog.LevelInfo
}
}

41
main.go
View File

@@ -1,24 +1,51 @@
package main
import (
"fmt"
"fs/api"
"fs/logging"
"fs/metadata"
"fs/service"
"fs/storage"
"fs/utils"
"os"
"path/filepath"
"strconv"
)
func main() {
config := utils.NewConfig()
logConfig := logging.ConfigFromValues(config.LogLevel, config.LogFormat, config.AuditLog)
logger := logging.NewLogger(logConfig)
logger.Info("boot",
"log_level", logConfig.LevelName,
"log_format", logConfig.Format,
"audit_log", logConfig.Audit,
"data_path", config.DataPath,
)
metadataHandler, err := metadata.NewMetadataHandler("metadata.db")
if err != nil {
fmt.Printf("Error initializing metadata handler: %v\n", err)
if err := os.MkdirAll(config.DataPath, 0o755); err != nil {
logger.Error("failed_to_prepare_data_path", "path", config.DataPath, "error", err)
return
}
objectService := service.NewObjectService(metadataHandler)
handler := api.NewHandler(objectService)
err = handler.Start("localhost:3000")
dbPath := filepath.Join(config.DataPath, "metadata.db")
metadataHandler, err := metadata.NewMetadataHandler(dbPath)
if err != nil {
logger.Error("failed_to_initialize_metadata_handler", "error", err)
return
}
blobHandler, err := storage.NewBlobStore(config.DataPath, config.ChunkSize)
if err != nil {
_ = metadataHandler.Close()
logger.Error("failed_to_initialize_blob_store", "error", err)
return
}
objectService := service.NewObjectService(metadataHandler, blobHandler)
handler := api.NewHandler(objectService, logger, logConfig)
addr := config.Address + ":" + strconv.Itoa(config.Port)
if err = handler.Start(addr); err != nil {
logger.Error("server_stopped_with_error", "error", err)
return
}
}

View File

@@ -6,9 +6,12 @@ import (
"fmt"
"fs/models"
"regexp"
"sort"
"strings"
"time"
"github.com/google/uuid"
"go.etcd.io/bbolt"
)
@@ -17,6 +20,8 @@ type MetadataHandler struct {
}
var systemIndex = []byte("__SYSTEM_BUCKETS__")
var multipartUploadIndex = []byte("__MULTIPART_UPLOADS__")
var multipartUploadPartsIndex = []byte("__MULTIPART_UPLOAD_PARTS__")
var validBucketName = regexp.MustCompile(`^[a-z0-9.-]{3,63}$`)
@@ -26,10 +31,12 @@ var (
ErrBucketNotFound = errors.New("bucket not found")
ErrBucketNotEmpty = errors.New("bucket not empty")
ErrObjectNotFound = errors.New("object not found")
ErrMultipartNotFound = errors.New("multipart upload not found")
ErrMultipartNotPending = errors.New("multipart upload is not pending")
)
func NewMetadataHandler(dbPath string) (*MetadataHandler, error) {
db, err := bbolt.Open(dbPath, 0600, nil)
db, err := bbolt.Open(dbPath, 0600, &bbolt.Options{Timeout: 2 * time.Second})
if err != nil {
return nil, err
}
@@ -43,10 +50,30 @@ func NewMetadataHandler(dbPath string) (*MetadataHandler, error) {
_ = db.Close()
return nil, err
}
err = h.db.Update(func(tx *bbolt.Tx) error {
_, err := tx.CreateBucketIfNotExists(multipartUploadIndex)
return err
})
if err != nil {
_ = db.Close()
return nil, err
}
err = h.db.Update(func(tx *bbolt.Tx) error {
_, err := tx.CreateBucketIfNotExists(multipartUploadPartsIndex)
return err
})
if err != nil {
_ = db.Close()
return nil, err
}
return h, nil
}
func (h *MetadataHandler) Close() error {
return h.db.Close()
}
func (h *MetadataHandler) CreateBucket(bucketName string) error {
if !validBucketName.MatchString(bucketName) {
return fmt.Errorf("%w: %s", ErrInvalidBucketName, bucketName)
@@ -265,3 +292,296 @@ func (h *MetadataHandler) DeleteManifest(bucket, key string) error {
return nil
}
// DeleteManifests removes the manifests for the given keys from bucket in a
// single read-write transaction and returns the keys it processed. Keys
// that do not exist are still reported back (matching the code path for
// existing keys); empty keys are skipped.
func (h *MetadataHandler) DeleteManifests(bucket string, keys []string) ([]string, error) {
	deleted := make([]string, 0, len(keys))
	update := func(tx *bbolt.Tx) error {
		objects := tx.Bucket([]byte(bucket))
		if objects == nil {
			return fmt.Errorf("%w: %s", ErrBucketNotFound, bucket)
		}
		for _, key := range keys {
			if key == "" {
				continue
			}
			k := []byte(key)
			if objects.Get(k) != nil {
				if err := objects.Delete(k); err != nil {
					return err
				}
			}
			deleted = append(deleted, key)
		}
		return nil
	}
	if err := h.db.Update(update); err != nil {
		return nil, err
	}
	return deleted, nil
}
// CreateMultipartUpload registers a new pending multipart upload for
// bucket/key and returns its record.
//
// Fix over the previous version: the bucket-existence check used to run in
// a separate read-only transaction before the write, leaving a window in
// which the bucket could be deleted between check and insert (TOCTOU).
// Both steps now happen inside one read-write transaction.
func (h *MetadataHandler) CreateMultipartUpload(bucket, key string) (*models.MultipartUpload, error) {
	upload := &models.MultipartUpload{
		Bucket:    bucket,
		Key:       key,
		UploadID:  uuid.New().String(),
		CreatedAt: time.Now().UTC().Format(time.RFC3339),
		State:     "pending",
	}
	err := h.db.Update(func(tx *bbolt.Tx) error {
		systemIndexBucket := tx.Bucket(systemIndex)
		if systemIndexBucket == nil {
			return errors.New("system index not found")
		}
		// The system index holds one entry per existing bucket.
		if systemIndexBucket.Get([]byte(bucket)) == nil {
			return ErrBucketNotFound
		}
		multipartUploadBucket := tx.Bucket(multipartUploadIndex)
		if multipartUploadBucket == nil {
			return errors.New("multipart upload index not found")
		}
		payload, err := json.Marshal(upload)
		if err != nil {
			return err
		}
		return multipartUploadBucket.Put([]byte(upload.UploadID), payload)
	})
	if err != nil {
		return nil, err
	}
	return upload, nil
}
// getMultipartUploadBucket returns the bolt bucket indexing multipart
// upload records, or an error if the index was never created.
func getMultipartUploadBucket(tx *bbolt.Tx) (*bbolt.Bucket, error) {
	if b := tx.Bucket(multipartUploadIndex); b != nil {
		return b, nil
	}
	return nil, errors.New("multipart upload index not found")
}
// getMultipartPartsBucket returns the bolt bucket holding per-part records
// of in-flight multipart uploads, or an error if the index is missing.
func getMultipartPartsBucket(tx *bbolt.Tx) (*bbolt.Bucket, error) {
	if b := tx.Bucket(multipartUploadPartsIndex); b != nil {
		return b, nil
	}
	return nil, errors.New("multipart upload parts index not found")
}
// getMultipartUploadFromBucket loads and decodes the upload record stored
// under uploadID, returning a wrapped ErrMultipartNotFound when absent.
func getMultipartUploadFromBucket(multipartUploadBucket *bbolt.Bucket, uploadID string) (*models.MultipartUpload, error) {
	raw := multipartUploadBucket.Get([]byte(uploadID))
	if raw == nil {
		return nil, fmt.Errorf("%w: %s", ErrMultipartNotFound, uploadID)
	}
	var upload models.MultipartUpload
	if err := json.Unmarshal(raw, &upload); err != nil {
		return nil, err
	}
	return &upload, nil
}
// getMultipartUploadFromTx resolves both the upload record and the bucket
// it lives in, so callers can read it and write it back within the same
// transaction.
func getMultipartUploadFromTx(tx *bbolt.Tx, uploadID string) (*models.MultipartUpload, *bbolt.Bucket, error) {
	uploadsBucket, err := getMultipartUploadBucket(tx)
	if err != nil {
		return nil, nil, err
	}
	upload, err := getMultipartUploadFromBucket(uploadsBucket, uploadID)
	if err != nil {
		return nil, nil, err
	}
	return upload, uploadsBucket, nil
}
// putMultipartUpload serializes upload as JSON and stores it under uploadID,
// overwriting any previous record.
func putMultipartUpload(multipartUploadBucket *bbolt.Bucket, uploadID string, upload *models.MultipartUpload) error {
	raw, err := json.Marshal(upload)
	if err != nil {
		return err
	}
	return multipartUploadBucket.Put([]byte(uploadID), raw)
}
// deleteMultipartPartsByUploadID removes every part record belonging to
// uploadID from the shared parts index. Part keys are stored as
// "<uploadID>:<zero-padded part number>", so a cursor prefix scan finds
// them all.
func deleteMultipartPartsByUploadID(tx *bbolt.Tx, uploadID string) error {
	multipartPartsBucket, err := getMultipartPartsBucket(tx)
	if err != nil {
		return err
	}
	prefix := uploadID + ":"
	cursor := multipartPartsBucket.Cursor()
	// Two-pass delete: collect the keys first, then delete. Calling
	// Bucket.Delete while iterating would invalidate the cursor, and the
	// key slices a bolt cursor returns are only valid during the
	// transaction's iteration — hence the explicit copies.
	keysToDelete := make([][]byte, 0)
	for k, _ := cursor.Seek([]byte(prefix)); k != nil && strings.HasPrefix(string(k), prefix); k, _ = cursor.Next() {
		keyCopy := make([]byte, len(k))
		copy(keyCopy, k)
		keysToDelete = append(keysToDelete, keyCopy)
	}
	for _, key := range keysToDelete {
		if err := multipartPartsBucket.Delete(key); err != nil {
			return err
		}
	}
	return nil
}
// GetMultipartUpload returns the stored record for uploadID, or a wrapped
// ErrMultipartNotFound when no such upload exists.
func (h *MetadataHandler) GetMultipartUpload(uploadID string) (*models.MultipartUpload, error) {
	var upload *models.MultipartUpload
	view := func(tx *bbolt.Tx) error {
		found, _, err := getMultipartUploadFromTx(tx, uploadID)
		if err != nil {
			return err
		}
		upload = found
		return nil
	}
	if err := h.db.View(view); err != nil {
		return nil, err
	}
	return upload, nil
}
// PutMultipartPart records metadata for one uploaded part of a pending
// multipart upload. The part key is zero-padded to five digits so bolt's
// lexicographic key order matches numeric part order; re-uploading a part
// number overwrites the previous record. Returns ErrMultipartNotFound
// (wrapped) for unknown uploads and ErrMultipartNotPending once the upload
// is no longer pending.
func (h *MetadataHandler) PutMultipartPart(uploadID string, part models.UploadedPart) error {
	if part.PartNumber < 1 || part.PartNumber > 10000 {
		return fmt.Errorf("invalid part number: %d", part.PartNumber)
	}
	return h.db.Update(func(tx *bbolt.Tx) error {
		upload, _, err := getMultipartUploadFromTx(tx, uploadID)
		if err != nil {
			return err
		}
		if upload.State != "pending" {
			return fmt.Errorf("%w: %s", ErrMultipartNotPending, uploadID)
		}
		partsBucket, err := getMultipartPartsBucket(tx)
		if err != nil {
			return err
		}
		raw, err := json.Marshal(part)
		if err != nil {
			return err
		}
		partKey := fmt.Sprintf("%s:%05d", uploadID, part.PartNumber)
		return partsBucket.Put([]byte(partKey), raw)
	})
}
// ListMultipartParts returns all recorded parts for an upload, sorted by
// ascending part number. A missing upload surfaces the lookup error from the
// transaction rather than an empty slice.
func (h *MetadataHandler) ListMultipartParts(uploadID string) ([]models.UploadedPart, error) {
	collected := make([]models.UploadedPart, 0)
	viewErr := h.db.View(func(tx *bbolt.Tx) error {
		if _, _, err := getMultipartUploadFromTx(tx, uploadID); err != nil {
			return err
		}
		partsBucket, err := getMultipartPartsBucket(tx)
		if err != nil {
			return err
		}
		prefix := uploadID + ":"
		c := partsBucket.Cursor()
		for k, v := c.Seek([]byte(prefix)); k != nil && strings.HasPrefix(string(k), prefix); k, v = c.Next() {
			var part models.UploadedPart
			if err := json.Unmarshal(v, &part); err != nil {
				return err
			}
			collected = append(collected, part)
		}
		return nil
	})
	if viewErr != nil {
		return nil, viewErr
	}
	// Keys are zero-padded so iteration order is already numeric; the sort is
	// a cheap belt-and-braces guarantee for callers.
	sort.Slice(collected, func(i, j int) bool {
		return collected[i].PartNumber < collected[j].PartNumber
	})
	return collected, nil
}
// CompleteMultipartUpload promotes a pending multipart upload into a regular
// object inside a single read-write transaction: the final manifest is
// written into the bucket's metadata, the upload record flips to
// "completed", and the per-part records are deleted — all atomically.
//
// The caller supplies the assembled manifest (chunks, size, ETag); Bucket and
// Key are overwritten here from the upload record so the manifest can never
// land under a different key than the one the upload was created for.
// Returns ErrMultipartNotPending if the upload was already completed or
// aborted, and ErrBucketNotFound if the target bucket no longer exists.
// NOTE(review): overwriting an existing object leaves its old chunks
// unreferenced on disk — there is no blob GC yet.
func (h *MetadataHandler) CompleteMultipartUpload(uploadID string, final *models.ObjectManifest) error {
	if final == nil {
		return errors.New("final object manifest is required")
	}
	err := h.db.Update(func(tx *bbolt.Tx) error {
		upload, multipartUploadBucket, err := getMultipartUploadFromTx(tx, uploadID)
		if err != nil {
			return err
		}
		if upload.State != "pending" {
			return fmt.Errorf("%w: %s", ErrMultipartNotPending, uploadID)
		}
		metadataBucket := tx.Bucket([]byte(upload.Bucket))
		if metadataBucket == nil {
			return fmt.Errorf("%w: %s", ErrBucketNotFound, upload.Bucket)
		}
		// Force the manifest to the upload's recorded destination.
		final.Bucket = upload.Bucket
		final.Key = upload.Key
		finalPayload, err := json.Marshal(final)
		if err != nil {
			return err
		}
		if err := metadataBucket.Put([]byte(upload.Key), finalPayload); err != nil {
			return err
		}
		// Mark the upload completed, then drop its part records; the whole
		// sequence commits (or rolls back) with the manifest write above.
		upload.State = "completed"
		if err := putMultipartUpload(multipartUploadBucket, uploadID, upload); err != nil {
			return err
		}
		if err := deleteMultipartPartsByUploadID(tx, uploadID); err != nil {
			return err
		}
		return nil
	})
	if err != nil {
		return err
	}
	return nil
}
// AbortMultipartUpload marks an upload as aborted and drops its recorded
// parts in one transaction. A completed upload cannot be aborted; aborting an
// already-aborted upload is harmlessly idempotent. Part blob chunks on disk
// are not reclaimed (no GC yet).
func (h *MetadataHandler) AbortMultipartUpload(uploadID string) error {
	return h.db.Update(func(tx *bbolt.Tx) error {
		upload, uploadsBucket, err := getMultipartUploadFromTx(tx, uploadID)
		if err != nil {
			return err
		}
		if upload.State == "completed" {
			return fmt.Errorf("%w: %s", ErrMultipartNotPending, uploadID)
		}
		upload.State = "aborted"
		if err := putMultipartUpload(uploadsBucket, uploadID, upload); err != nil {
			return err
		}
		return deleteMultipartPartsByUploadID(tx, uploadID)
	})
}

View File

@@ -58,3 +58,81 @@ type Contents struct {
type CommonPrefixes struct {
Prefix string `xml:"Prefix"`
}
// MultipartUpload is the persisted record of one multipart upload session.
// State is a small string state machine: "pending" -> "completed"/"aborted".
type MultipartUpload struct {
	UploadID  string `json:"upload_id" xml:"UploadId"`
	Bucket    string `json:"bucket" xml:"Bucket"`
	Key       string `json:"key" xml:"Key"`
	CreatedAt string `json:"created_at" xml:"CreatedAt"`
	State     string `json:"state" xml:"State"`
}

// InitiateMultipartUploadResult is the XML response body for
// POST /{bucket}/{key}?uploads.
type InitiateMultipartUploadResult struct {
	XMLName xml.Name `xml:"InitiateMultipartUploadResult"`
	Xmlns   string   `xml:"xmlns,attr"`
	Bucket  string   `xml:"Bucket"`
	Key     string   `xml:"Key"`
	UploadID string  `xml:"UploadId"`
}

// UploadedPart is the stored metadata for a single ingested part. Chunks and
// CreatedAt are internal bookkeeping (JSON only, never serialized to XML).
type UploadedPart struct {
	PartNumber int      `json:"part_number" xml:"PartNumber"`
	ETag       string   `json:"etag" xml:"ETag"`
	Size       int64    `json:"size" xml:"Size"`
	Chunks     []string `json:"chunks"`
	CreatedAt  int64    `json:"created_at"`
}

// CompletedPart is one <Part> entry in the client's CompleteMultipartUpload
// request body.
type CompletedPart struct {
	PartNumber int    `xml:"PartNumber"`
	ETag       string `xml:"ETag"`
}

// CompleteMultipartUploadRequest is the XML request body for
// POST /{bucket}/{key}?uploadId=...
type CompleteMultipartUploadRequest struct {
	XMLName xml.Name        `xml:"CompleteMultipartUpload"`
	Parts   []CompletedPart `xml:"Part"`
}

// CompleteMultipartUploadResult is the XML response body after a successful
// multipart completion.
type CompleteMultipartUploadResult struct {
	XMLName  xml.Name `xml:"CompleteMultipartUploadResult"`
	Xmlns    string   `xml:"xmlns,attr"`
	Bucket   string   `xml:"Bucket"`
	Key      string   `xml:"Key"`
	ETag     string   `xml:"ETag"`
	Location string   `xml:"Location,omitempty"`
}

// ListPartsResult is the XML response body for GET /{bucket}/{key}?uploadId=...
type ListPartsResult struct {
	XMLName  xml.Name   `xml:"ListPartsResult"`
	Xmlns    string     `xml:"xmlns,attr"`
	Bucket   string     `xml:"Bucket"`
	Key      string     `xml:"Key"`
	UploadID string     `xml:"UploadId"`
	Parts    []PartItem `xml:"Part"`
}

// PartItem is one <Part> entry inside ListPartsResult.
type PartItem struct {
	PartNumber   int    `xml:"PartNumber"`
	LastModified string `xml:"LastModified"`
	ETag         string `xml:"ETag"`
	Size         int64  `xml:"Size"`
}

// DeleteObjectsRequest is the S3-style XML body for POST /{bucket}?delete.
type DeleteObjectsRequest struct {
	XMLName xml.Name               `xml:"Delete"`
	Objects []DeleteObjectIdentity `xml:"Object"`
	Quiet   bool                   `xml:"Quiet"`
}

// DeleteObjectIdentity names one key to delete in a multi-object delete.
type DeleteObjectIdentity struct {
	Key string `xml:"Key"`
}

// DeleteObjectsResult is the XML response body for a multi-object delete.
type DeleteObjectsResult struct {
	XMLName xml.Name       `xml:"DeleteResult"`
	Xmlns   string         `xml:"xmlns,attr"`
	Deleted []DeletedEntry `xml:"Deleted,omitempty"`
}

// DeletedEntry is one successfully deleted key in DeleteObjectsResult.
type DeletedEntry struct {
	Key string `xml:"Key"`
}

View File

@@ -1,25 +1,38 @@
package service
import (
"crypto/md5"
"encoding/hex"
"errors"
"fmt"
"fs/metadata"
"fs/models"
"fs/storage"
"io"
"log/slog"
"strings"
"time"
)
type ObjectService struct {
metadataHandler *metadata.MetadataHandler
metadata *metadata.MetadataHandler
blob *storage.BlobStore
}
func NewObjectService(metadataHandler *metadata.MetadataHandler) *ObjectService {
return &ObjectService{metadataHandler: metadataHandler}
var (
ErrInvalidPart = errors.New("invalid multipart part")
ErrInvalidPartOrder = errors.New("invalid multipart part order")
ErrInvalidCompleteRequest = errors.New("invalid complete multipart request")
ErrEntityTooSmall = errors.New("multipart entity too small")
)
func NewObjectService(metadataHandler *metadata.MetadataHandler, blobHandler *storage.BlobStore) *ObjectService {
return &ObjectService{metadata: metadataHandler, blob: blobHandler}
}
func (s *ObjectService) PutObject(bucket, key, contentType string, input io.Reader) (*models.ObjectManifest, error) {
chunks, size, etag, err := storage.IngestStream(input)
chunks, size, etag, err := s.blob.IngestStream(input)
if err != nil {
return nil, err
}
@@ -34,8 +47,14 @@ func (s *ObjectService) PutObject(bucket, key, contentType string, input io.Read
Chunks: chunks,
CreatedAt: timestamp,
}
fmt.Println(manifest)
if err = s.metadataHandler.PutManifest(manifest); err != nil {
slog.Debug("object_written_manifest",
"bucket", manifest.Bucket,
"key", manifest.Key,
"size", manifest.Size,
"chunk_count", len(manifest.Chunks),
"etag", manifest.ETag,
)
if err = s.metadata.PutManifest(manifest); err != nil {
return nil, err
}
@@ -43,7 +62,7 @@ func (s *ObjectService) PutObject(bucket, key, contentType string, input io.Read
}
func (s *ObjectService) GetObject(bucket, key string) (io.ReadCloser, *models.ObjectManifest, error) {
manifest, err := s.metadataHandler.GetManifest(bucket, key)
manifest, err := s.metadata.GetManifest(bucket, key)
if err != nil {
return nil, nil, err
}
@@ -57,7 +76,7 @@ func (s *ObjectService) GetObject(bucket, key string) (io.ReadCloser, *models.Ob
}
}(pw)
err := storage.AssembleStream(manifest.Chunks, pw)
err := s.blob.AssembleStream(manifest.Chunks, pw)
if err != nil {
return
}
@@ -66,7 +85,7 @@ func (s *ObjectService) GetObject(bucket, key string) (io.ReadCloser, *models.Ob
}
func (s *ObjectService) HeadObject(bucket, key string) (models.ObjectManifest, error) {
manifest, err := s.metadataHandler.GetManifest(bucket, key)
manifest, err := s.metadata.GetManifest(bucket, key)
if err != nil {
return models.ObjectManifest{}, err
}
@@ -74,26 +93,176 @@ func (s *ObjectService) HeadObject(bucket, key string) (models.ObjectManifest, e
}
func (s *ObjectService) DeleteObject(bucket, key string) error {
return s.metadataHandler.DeleteManifest(bucket, key)
return s.metadata.DeleteManifest(bucket, key)
}
func (s *ObjectService) ListObjects(bucket, prefix string) ([]*models.ObjectManifest, error) {
return s.metadataHandler.ListObjects(bucket, prefix)
return s.metadata.ListObjects(bucket, prefix)
}
func (s *ObjectService) CreateBucket(bucket string) error {
return s.metadataHandler.CreateBucket(bucket)
return s.metadata.CreateBucket(bucket)
}
func (s *ObjectService) HeadBucket(bucket string) error {
_, err := s.metadataHandler.GetBucketManifest(bucket)
_, err := s.metadata.GetBucketManifest(bucket)
return err
}
func (s *ObjectService) DeleteBucket(bucket string) error {
return s.metadataHandler.DeleteBucket(bucket)
return s.metadata.DeleteBucket(bucket)
}
func (s *ObjectService) ListBuckets() ([]string, error) {
return s.metadataHandler.ListBuckets()
return s.metadata.ListBuckets()
}
// DeleteObjects removes multiple keys from a bucket in one call. The returned
// slice is whatever the metadata layer reports — presumably the successfully
// deleted keys; verify against DeleteManifests.
func (s *ObjectService) DeleteObjects(bucket string, keys []string) ([]string, error) {
	return s.metadata.DeleteManifests(bucket, keys)
}
// CreateMultipartUpload starts a new multipart upload session for bucket/key,
// delegating record creation (and upload-ID generation) to the metadata layer.
func (s *ObjectService) CreateMultipartUpload(bucket, key string) (*models.MultipartUpload, error) {
	return s.metadata.CreateMultipartUpload(bucket, key)
}
// UploadPart ingests one part body for an in-progress multipart upload and
// records its metadata (ETag, size, chunk IDs) under the upload.
//
// The part number must be in the S3-compatible range 1..10000 and the upload
// ID must belong to the given bucket/key. The upload's state is checked
// before the body is ingested so that parts sent to an aborted or completed
// upload do not write orphan blob chunks; PutMultipartPart re-checks the
// state inside its transaction, so that check remains authoritative.
// Returns the part's ETag on success.
func (s *ObjectService) UploadPart(bucket, key, uploadId string, partNumber int, input io.Reader) (string, error) {
	if partNumber < 1 || partNumber > 10000 {
		return "", ErrInvalidPart
	}
	upload, err := s.metadata.GetMultipartUpload(uploadId)
	if err != nil {
		return "", err
	}
	if upload.Bucket != bucket || upload.Key != key {
		return "", metadata.ErrMultipartNotFound
	}
	// Fail fast before touching blob storage: without this, a part uploaded
	// to a non-pending upload ingests chunks that nothing ever references.
	if upload.State != "pending" {
		return "", fmt.Errorf("%w: %s", metadata.ErrMultipartNotPending, uploadId)
	}
	chunkIds, totalSize, etag, err := s.blob.IngestStream(input)
	if err != nil {
		return "", err
	}
	uploadedPart := models.UploadedPart{
		PartNumber: partNumber,
		ETag:       etag,
		Size:       totalSize,
		Chunks:     chunkIds,
		CreatedAt:  time.Now().Unix(),
	}
	if err := s.metadata.PutMultipartPart(uploadId, uploadedPart); err != nil {
		return "", err
	}
	return etag, nil
}
// ListMultipartParts returns the recorded parts for an upload after verifying
// that the upload ID belongs to the given bucket and key.
func (s *ObjectService) ListMultipartParts(bucket, key, uploadID string) ([]models.UploadedPart, error) {
	upload, lookupErr := s.metadata.GetMultipartUpload(uploadID)
	switch {
	case lookupErr != nil:
		return nil, lookupErr
	case upload.Bucket != bucket, upload.Key != key:
		return nil, metadata.ErrMultipartNotFound
	}
	return s.metadata.ListMultipartParts(uploadID)
}
// CompleteMultipartUpload validates the client-supplied part list against the
// stored parts, stitches their chunks into a single object manifest, and
// promotes the upload to a regular object via the metadata layer.
//
// Validation mirrors S3 semantics: parts must be listed in strictly ascending
// part-number order, each listed part must exist with a matching ETag, and
// every part except the last must be at least 5 MiB.
func (s *ObjectService) CompleteMultipartUpload(bucket, key, uploadID string, completed []models.CompletedPart) (*models.ObjectManifest, error) {
	if len(completed) == 0 {
		return nil, ErrInvalidCompleteRequest
	}
	upload, err := s.metadata.GetMultipartUpload(uploadID)
	if err != nil {
		return nil, err
	}
	if upload.Bucket != bucket || upload.Key != key {
		return nil, metadata.ErrMultipartNotFound
	}
	stored, err := s.metadata.ListMultipartParts(uploadID)
	if err != nil {
		return nil, err
	}
	byNumber := make(map[int]models.UploadedPart, len(stored))
	for _, p := range stored {
		byNumber[p.PartNumber] = p
	}
	const minPartSize = 5 * 1024 * 1024
	var (
		prevNumber int
		ordered    = make([]models.UploadedPart, 0, len(completed))
		chunkIDs   = make([]string, 0)
		totalSize  int64
	)
	for idx, requested := range completed {
		// Strictly ascending order also rules out duplicate part numbers.
		if requested.PartNumber <= prevNumber {
			return nil, ErrInvalidPartOrder
		}
		prevNumber = requested.PartNumber
		found, exists := byNumber[requested.PartNumber]
		if !exists {
			return nil, ErrInvalidPart
		}
		if normalizeETag(requested.ETag) != normalizeETag(found.ETag) {
			return nil, ErrInvalidPart
		}
		isLast := idx == len(completed)-1
		if !isLast && found.Size < minPartSize {
			return nil, ErrEntityTooSmall
		}
		ordered = append(ordered, found)
		chunkIDs = append(chunkIDs, found.Chunks...)
		totalSize += found.Size
	}
	manifest := &models.ObjectManifest{
		Bucket:      bucket,
		Key:         key,
		Size:        totalSize,
		ContentType: "application/octet-stream",
		ETag:        buildMultipartETag(ordered),
		Chunks:      chunkIDs,
		CreatedAt:   time.Now().Unix(),
	}
	if err := s.metadata.CompleteMultipartUpload(uploadID, manifest); err != nil {
		return nil, err
	}
	return manifest, nil
}
// AbortMultipartUpload aborts an upload after verifying that the upload ID
// belongs to the given bucket and key.
func (s *ObjectService) AbortMultipartUpload(bucket, key, uploadID string) error {
	upload, lookupErr := s.metadata.GetMultipartUpload(uploadID)
	if lookupErr != nil {
		return lookupErr
	}
	ownsKey := upload.Bucket == bucket && upload.Key == key
	if !ownsKey {
		return metadata.ErrMultipartNotFound
	}
	return s.metadata.AbortMultipartUpload(uploadID)
}
// normalizeETag strips any surrounding double quotes so client-supplied and
// stored ETags compare equal regardless of quoting.
func normalizeETag(etag string) string {
	const quote = `"`
	return strings.Trim(etag, quote)
}
// buildMultipartETag derives an S3-style multipart ETag: the hex MD5 of the
// concatenated binary part digests, suffixed with "-<part count>". A part
// ETag that is not valid hex falls back to hashing its raw unquoted bytes.
func buildMultipartETag(parts []models.UploadedPart) string {
	digest := md5.New()
	for _, part := range parts {
		clean := normalizeETag(part.ETag)
		if raw, decodeErr := hex.DecodeString(clean); decodeErr == nil {
			_, _ = digest.Write(raw)
		} else {
			_, _ = digest.Write([]byte(clean))
		}
	}
	return fmt.Sprintf("%x-%d", digest.Sum(nil), len(parts))
}
// Close releases the underlying metadata store. The blob store performs
// per-call file operations and holds no persistent handles to close.
func (s *ObjectService) Close() error {
	return s.metadata.Close()
}

View File

@@ -5,18 +5,41 @@ import (
"crypto/sha256"
"encoding/hex"
"errors"
"fmt"
"io"
"os"
"path/filepath"
"strings"
)
const chunkSize = 64 * 1024
const blobRoot = "blobs/"
const blobRoot = "blobs"
const maxChunkSize = 64 * 1024 * 1024
func IngestStream(stream io.Reader) ([]string, int64, string, error) {
// BlobStore reads and writes content-addressed chunks beneath
// <dataRoot>/blobs. chunkSize is the maximum number of bytes per chunk when
// ingesting a stream.
type BlobStore struct {
	dataRoot  string
	chunkSize int
}
// NewBlobStore validates the configuration and prepares the on-disk blob
// directory (<root>/blobs) before returning a ready-to-use store.
// chunkSize must be in (0, maxChunkSize].
func NewBlobStore(root string, chunkSize int) (*BlobStore, error) {
	trimmed := strings.TrimSpace(root)
	if trimmed == "" {
		return nil, errors.New("blob root is required")
	}
	if chunkSize <= 0 || chunkSize > maxChunkSize {
		return nil, fmt.Errorf("chunk size must be between 1 and %d bytes", maxChunkSize)
	}
	dataRoot := filepath.Clean(trimmed)
	if err := os.MkdirAll(filepath.Join(dataRoot, blobRoot), 0o755); err != nil {
		return nil, err
	}
	return &BlobStore{dataRoot: dataRoot, chunkSize: chunkSize}, nil
}
func (bs *BlobStore) IngestStream(stream io.Reader) ([]string, int64, string, error) {
fullFileHasher := md5.New()
buffer := make([]byte, chunkSize)
buffer := make([]byte, bs.chunkSize)
var totalSize int64
var chunkIDs []string
@@ -35,7 +58,7 @@ func IngestStream(stream io.Reader) ([]string, int64, string, error) {
chunkHash := sha256.Sum256(chunkData)
chunkID := hex.EncodeToString(chunkHash[:])
err := saveBlob(chunkID, chunkData)
err := bs.saveBlob(chunkID, chunkData)
if err != nil {
return nil, 0, "", err
}
@@ -54,8 +77,11 @@ func IngestStream(stream io.Reader) ([]string, int64, string, error) {
return chunkIDs, totalSize, etag, nil
}
func saveBlob(chunkID string, data []byte) error {
dir := filepath.Join(blobRoot, chunkID[:2], chunkID[2:4])
func (bs *BlobStore) saveBlob(chunkID string, data []byte) error {
if !isValidChunkID(chunkID) {
return fmt.Errorf("invalid chunk id: %q", chunkID)
}
dir := filepath.Join(bs.dataRoot, blobRoot, chunkID[:2], chunkID[2:4])
if err := os.MkdirAll(dir, 0755); err != nil {
return err
}
@@ -69,9 +95,9 @@ func saveBlob(chunkID string, data []byte) error {
return nil
}
func AssembleStream(chunkIDs []string, w *io.PipeWriter) error {
func (bs *BlobStore) AssembleStream(chunkIDs []string, w *io.PipeWriter) error {
for _, chunkID := range chunkIDs {
chunkData, err := GetBlob(chunkID)
chunkData, err := bs.GetBlob(chunkID)
if err != nil {
return err
}
@@ -82,7 +108,21 @@ func AssembleStream(chunkIDs []string, w *io.PipeWriter) error {
return nil
}
func GetBlob(chunkID string) ([]byte, error) {
return os.ReadFile(filepath.Join(blobRoot, chunkID[:2], chunkID[2:4], chunkID))
// GetBlob reads one content-addressed chunk from disk. The chunk ID is
// validated first so a malformed ID can never be used to traverse outside
// the blob directory (IDs are fixed-length lowercase hex; see isValidChunkID).
func (bs *BlobStore) GetBlob(chunkID string) ([]byte, error) {
	if !isValidChunkID(chunkID) {
		return nil, fmt.Errorf("invalid chunk id: %q", chunkID)
	}
	// Chunks are fanned out as <root>/blobs/<id[0:2]>/<id[2:4]>/<id>.
	return os.ReadFile(filepath.Join(bs.dataRoot, blobRoot, chunkID[:2], chunkID[2:4], chunkID))
}
// isValidChunkID reports whether the ID is exactly a lowercase hex-encoded
// SHA-256 digest: 64 characters drawn from [0-9a-f].
func isValidChunkID(chunkID string) bool {
	if len(chunkID) != sha256.Size*2 {
		return false
	}
	for i := 0; i < len(chunkID); i++ {
		c := chunkID[i]
		isDigit := c >= '0' && c <= '9'
		isHexLetter := c >= 'a' && c <= 'f'
		if !isDigit && !isHexLetter {
			return false
		}
	}
	return true
}

89
utils/config.go Normal file
View File

@@ -0,0 +1,89 @@
package utils
import (
"os"
"path/filepath"
"strconv"
"strings"
"github.com/joho/godotenv"
)
// Config holds all runtime settings, populated from environment variables
// (optionally loaded from a .env file) in NewConfig.
type Config struct {
	DataPath  string // absolute root directory for metadata and blob storage
	Address   string // listen address, default "0.0.0.0"
	Port      int    // listen port, 1..65535, default 3000
	ChunkSize int    // blob chunk size in bytes, default 8192000
	LogLevel  string // lowercased, default "info"
	LogFormat string // "json" or "text", default "text"
	AuditLog  bool   // enable request audit logging, default true
}
// NewConfig builds the configuration from environment variables. A .env file
// is loaded first if present (errors deliberately ignored: the file is
// optional). Invalid or out-of-range values silently fall back to defaults.
// LOG_TYPE is accepted as a legacy alias for LOG_FORMAT.
func NewConfig() *Config {
	_ = godotenv.Load()
	config := &Config{
		DataPath:  sanitizeDataPath(os.Getenv("DATA_PATH")),
		Address:   firstNonEmpty(strings.TrimSpace(os.Getenv("ADDRESS")), "0.0.0.0"),
		Port:      envIntRange("PORT", 3000, 1, 65535),
		ChunkSize: envIntRange("CHUNK_SIZE", 8192000, 1, 64*1024*1024),
		LogLevel:  strings.ToLower(firstNonEmpty(strings.TrimSpace(os.Getenv("LOG_LEVEL")), "info")),
		LogFormat: strings.ToLower(firstNonEmpty(strings.TrimSpace(os.Getenv("LOG_FORMAT")), strings.TrimSpace(os.Getenv("LOG_TYPE")), "text")),
		AuditLog:  envBool("AUDIT_LOG", true),
	}
	// Anything other than the two supported formats degrades to "text".
	if config.LogFormat != "json" && config.LogFormat != "text" {
		config.LogFormat = "text"
	}
	return config
}
// envIntRange reads an integer environment variable, falling back to
// defaultValue when the variable is unset, unparsable, or outside
// [minValue, maxValue].
func envIntRange(key string, defaultValue, minValue, maxValue int) int {
	raw := strings.TrimSpace(os.Getenv(key))
	if raw == "" {
		return defaultValue
	}
	parsed, parseErr := strconv.Atoi(raw)
	switch {
	case parseErr != nil:
		return defaultValue
	case parsed < minValue, parsed > maxValue:
		return defaultValue
	}
	return parsed
}
// envBool reads a boolean environment variable (strconv.ParseBool syntax:
// 1/t/true/0/f/false, any case), falling back to defaultValue when the
// variable is unset or unparsable.
func envBool(key string, defaultValue bool) bool {
	raw := strings.TrimSpace(os.Getenv(key))
	if raw == "" {
		return defaultValue
	}
	if parsed, parseErr := strconv.ParseBool(raw); parseErr == nil {
		return parsed
	}
	return defaultValue
}
// firstNonEmpty returns the first argument that is not the empty string, or
// "" when every argument is empty (or none are given).
func firstNonEmpty(values ...string) string {
	for i := range values {
		if values[i] != "" {
			return values[i]
		}
	}
	return ""
}
// sanitizeDataPath normalizes the configured data directory: trims
// whitespace, defaults to the current directory when empty, cleans the path,
// and resolves it to an absolute path when possible (falling back to the
// cleaned relative path if Abs fails).
func sanitizeDataPath(raw string) string {
	trimmed := strings.TrimSpace(raw)
	if trimmed == "" {
		trimmed = "."
	}
	cleanedPath := filepath.Clean(trimmed)
	abs, absErr := filepath.Abs(cleanedPath)
	if absErr != nil {
		return cleanedPath
	}
	return abs
}