24 Commits

Author SHA1 Message Date
edfb5f5b2a Refine object operations and multi-object delete sections
Updated the formatting and structure of the object operations and multi-object delete sections in the README.
2026-02-24 10:23:53 +01:00
c997fe8471 Improve formatting of features in README
Updated feature list formatting in README.md for better readability.
2026-02-24 10:23:24 +01:00
fca553028c Merge pull request #3 from ferdzo/LICENSE
License
2026-02-24 10:20:05 +01:00
3630aad584 Merge branch 'develop' into LICENSE 2026-02-24 10:19:41 +01:00
93296ff74e Merge pull request #2 from ferdzo/feature/garbage-collection
Garbage collection and few other things
2026-02-24 10:18:11 +01:00
1b7393a545 Fix license section heading in README.md 2026-02-24 10:17:05 +01:00
Andrej Mickov
a3fad34272 LICENSE.md 2026-02-24 10:14:00 +01:00
06c90be50f Fixed copilot suggestions. 2026-02-24 00:28:33 +01:00
5e87247087 Introduced garbage collection, safe data write to storage and improved S3 compatibility. 2026-02-24 00:05:53 +01:00
a4990dae01 Merge pull request #1 from ferdzo/feature/multipart-upload
Multipart Upload support
2026-02-23 22:37:33 +01:00
d9a1bd9001 Applied Copilot review suggestions 2026-02-23 22:35:42 +01:00
a8204de914 Fixed logging, added config and .env example 2026-02-23 21:52:45 +01:00
d7bdb3177b Added logging 2026-02-23 00:42:38 +01:00
c989037160 Finialized multipart upload and graceful shutdown. Added Dockerfile. 2026-02-22 23:00:33 +01:00
5d41ec9e0a Implemented bulk delete from bucket, AWS SigV4 framing problems solved. 2026-02-22 15:32:04 +01:00
111ce5b669 Working MultipartUpload that needs minor tweaks. 2026-02-22 14:46:04 +01:00
5438a7f4b4 Updated gitignore 2026-02-22 13:43:23 +01:00
9b5035dfa0 Initial Multipart Upload 2026-02-22 13:42:23 +01:00
65a7a7eef8 S3 Error handling 2026-02-22 13:02:22 +01:00
eb798be550 Updated error handling to be S3 XML compatible. Implemented DeleteObject. 2026-02-22 13:01:59 +01:00
b19c24d9b7 Object prefix list filtering 2026-02-21 21:35:15 +01:00
6fe5a8a629 Added Bucket routes and bucket logic 2026-02-21 21:27:34 +01:00
151c11a636 HEAD Object route 2026-02-21 12:13:33 +01:00
f151f8055a Working simple PUT/GET API 2026-02-21 11:55:14 +01:00
18 changed files with 2720 additions and 90 deletions

4
.dockerignore Normal file
View File

@@ -0,0 +1,4 @@
*.md
.gocache/
blobs/
data/

8
.env.example Normal file
View File

@@ -0,0 +1,8 @@
LOG_LEVEL=debug
LOG_FORMAT=text
DATA_PATH=data/
PORT=2600
AUDIT_LOG=true
ADDRESS=0.0.0.0
GC_INTERVAL=10
GC_ENABLED=true

5
.gitignore vendored
View File

@@ -1,5 +1,8 @@
.env
*.db
.vscode/
blobs/
*.db
data/
.idea/
.gocache/
.gomodcache/

16
Dockerfile Normal file
View File

@@ -0,0 +1,16 @@
# Build stage: compile a fully static binary (CGO disabled) so it can
# run inside an empty "scratch" image with no libc.
FROM golang:1.25-alpine AS build
WORKDIR /app
# Copy the module files first so the dependency download layer is
# cached independently of source changes.
COPY go.mod go.sum ./
RUN go mod download
COPY . .
RUN CGO_ENABLED=0 GOOS=linux go build -o /app/fs .
# Runtime stage: nothing but the compiled binary.
FROM scratch AS runner
COPY --from=build /app/fs /app/fs
WORKDIR /app
CMD ["/app/fs"]

8
LICENSE.md Normal file
View File

@@ -0,0 +1,8 @@
Copyright 2025 ferdzo
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the “Software”), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

View File

@@ -1,3 +1,41 @@
# fs
An experimental Object Storage written in Go that should be compatible with S3
An experimental Object Storage written in Go that should be partially compatible with S3
## Features
Bucket operations:
- `PUT /{bucket}`
- `HEAD /{bucket}`
- `DELETE /{bucket}`
- `GET /` (list buckets)

Object operations:
- `PUT /{bucket}/{key}`
- `GET /{bucket}/{key}`
- `HEAD /{bucket}/{key}`
- `DELETE /{bucket}/{key}`
- `GET /{bucket}?list-type=2&prefix=...` (ListObjectsV2-style)
Multipart upload:
- `POST /{bucket}/{key}?uploads` (initiate)
- `PUT /{bucket}/{key}?uploadId=...&partNumber=N` (upload part)
- `GET /{bucket}/{key}?uploadId=...` (list parts)
- `POST /{bucket}/{key}?uploadId=...` (complete)
- `DELETE /{bucket}/{key}?uploadId=...` (abort)
Multi-object delete:
- `POST /{bucket}?delete` with S3-style XML body
AWS SigV4 streaming payload decoding for uploads (`aws-chunked` request bodies)
## Limitations
- No authentication/authorization yet.
- Not full S3 API coverage.
- No versioning or lifecycle policies.
- Error and edge-case behavior is still being refined for client compatibility.
## License
MIT License

View File

@@ -1 +1,856 @@
package api
import (
"bufio"
"context"
"encoding/base64"
"encoding/xml"
"errors"
"fmt"
"fs/logging"
"fs/metadata"
"fs/models"
"fs/service"
"io"
"log/slog"
"net/http"
"net/url"
"sort"
"strconv"
"strings"
"time"
"github.com/go-chi/chi/v5"
"github.com/go-chi/chi/v5/middleware"
)
// Handler owns the HTTP routing layer of the object store: it maps
// S3-style routes onto the underlying ObjectService and manages the
// server lifecycle (see Start).
type Handler struct {
	router    *chi.Mux               // chi router; routes are installed by setupRoutes
	svc       *service.ObjectService // storage backend performing the actual bucket/object work
	logger    *slog.Logger           // structured logger; never nil after NewHandler
	logConfig logging.Config         // logging options forwarded to the HTTP middleware
}
// NewHandler wires a Handler around the given object service.
// A nil logger falls back to slog.Default(). Routes are not
// registered here; Start installs them before serving.
func NewHandler(svc *service.ObjectService, logger *slog.Logger, logConfig logging.Config) *Handler {
	if logger == nil {
		logger = slog.Default()
	}
	router := chi.NewRouter()
	router.Use(middleware.Recoverer)
	return &Handler{
		router:    router,
		svc:       svc,
		logger:    logger,
		logConfig: logConfig,
	}
}
// setupRoutes installs the HTTP middleware and the S3-style routing
// table. Bucket routes are registered both with and without a
// trailing slash because S3 clients use both forms; object routes use
// a chi wildcard ("*") so object keys may contain slashes.
func (h *Handler) setupRoutes() {
	h.router.Use(logging.HTTPMiddleware(h.logger, h.logConfig))
	// Service-level route: list all buckets.
	h.router.Get("/", h.handleGetBuckets)
	// Bucket-level routes.
	h.router.Get("/{bucket}/", h.handleGetBucket)
	h.router.Get("/{bucket}", h.handleGetBucket)
	h.router.Put("/{bucket}", h.handlePutBucket)
	h.router.Put("/{bucket}/", h.handlePutBucket)
	h.router.Post("/{bucket}", h.handlePostBucket)
	h.router.Post("/{bucket}/", h.handlePostBucket)
	h.router.Delete("/{bucket}", h.handleDeleteBucket)
	h.router.Delete("/{bucket}/", h.handleDeleteBucket)
	h.router.Head("/{bucket}", h.handleHeadBucket)
	h.router.Head("/{bucket}/", h.handleHeadBucket)
	// Object-level routes: the wildcard captures the full key.
	h.router.Get("/{bucket}/*", h.handleGetObject)
	h.router.Put("/{bucket}/*", h.handlePutObject)
	h.router.Post("/{bucket}/*", h.handlePostObject)
	h.router.Head("/{bucket}/*", h.handleHeadObject)
	h.router.Delete("/{bucket}/*", h.handleDeleteObject)
}
// handleWelcome writes a plain-text greeting. The write error is
// deliberately discarded: once the response has begun there is no
// useful recovery.
func (h *Handler) handleWelcome(w http.ResponseWriter) {
	w.WriteHeader(http.StatusOK)
	_, _ = w.Write([]byte("Welcome to the Object Storage API!"))
}
// handleGetObject serves GET /{bucket}/{key}. When an uploadId query
// parameter is present it lists multipart parts instead of streaming
// the object. A single "Range: bytes=..." header is honored;
// multi-range requests are rejected with InvalidRange.
func (h *Handler) handleGetObject(w http.ResponseWriter, r *http.Request) {
	bucket := chi.URLParam(r, "bucket")
	key := chi.URLParam(r, "*")
	if key == "" {
		writeS3Error(w, r, s3ErrInvalidObjectKey, r.URL.Path)
		return
	}
	// GET /{bucket}/{key}?uploadId=... is ListParts, not a download.
	if uploadID := r.URL.Query().Get("uploadId"); uploadID != "" {
		h.handleListMultipartParts(w, r, bucket, key, uploadID)
		return
	}
	stream, manifest, err := h.svc.GetObject(bucket, key)
	if err != nil {
		writeMappedS3Error(w, r, err)
		return
	}
	defer stream.Close()
	rangeHeader := strings.TrimSpace(r.Header.Get("Range"))
	if rangeHeader != "" {
		start, end, err := parseSingleByteRange(rangeHeader, manifest.Size)
		if err != nil {
			// RFC 7233: an unsatisfiable-range response should report
			// the current representation length.
			w.Header().Set("Content-Range", fmt.Sprintf("bytes */%d", manifest.Size))
			writeS3Error(w, r, s3ErrInvalidRange, r.URL.Path)
			return
		}
		// The backing stream is not seekable; skip forward to the
		// requested start offset.
		if start > 0 {
			if _, err := io.CopyN(io.Discard, stream, start); err != nil {
				writeMappedS3Error(w, r, err)
				return
			}
		}
		length := end - start + 1
		w.Header().Set("Content-Type", manifest.ContentType)
		w.Header().Set("Content-Length", strconv.FormatInt(length, 10))
		w.Header().Set("Content-Range", fmt.Sprintf("bytes %d-%d/%d", start, end, manifest.Size))
		w.Header().Set("ETag", `"`+manifest.ETag+`"`)
		w.Header().Set("Last-Modified", time.Unix(manifest.CreatedAt, 0).UTC().Format(http.TimeFormat))
		w.Header().Set("Accept-Ranges", "bytes")
		w.WriteHeader(http.StatusPartialContent)
		// Headers are already committed; a copy failure here can only
		// mean a broken client connection, so it is deliberately ignored.
		_, _ = io.CopyN(w, stream, length)
		return
	}
	w.Header().Set("Content-Type", manifest.ContentType)
	w.Header().Set("Content-Length", strconv.FormatInt(manifest.Size, 10))
	w.Header().Set("ETag", `"`+manifest.ETag+`"`)
	w.Header().Set("Last-Modified", time.Unix(manifest.CreatedAt, 0).UTC().Format(http.TimeFormat))
	w.Header().Set("Accept-Ranges", "bytes")
	w.WriteHeader(http.StatusOK)
	// As above: the response is streaming, copy errors are not
	// reportable to the client, so the error is explicitly dropped
	// (previously it was assigned to err and silently never checked).
	_, _ = io.Copy(w, stream)
}
// handlePostObject serves POST /{bucket}/{key}: "?uploads" initiates
// a multipart upload and "?uploadId=..." completes one. Any other
// POST on an object key is not implemented.
func (h *Handler) handlePostObject(w http.ResponseWriter, r *http.Request) {
	bucket := chi.URLParam(r, "bucket")
	key := chi.URLParam(r, "*")
	if key == "" {
		writeS3Error(w, r, s3ErrInvalidObjectKey, r.URL.Path)
		return
	}
	defer r.Body.Close()

	// Both multipart endpoints answer with the same XML framing.
	respondXML := func(v any) {
		payload, err := xml.MarshalIndent(v, "", " ")
		if err != nil {
			writeMappedS3Error(w, r, err)
			return
		}
		w.Header().Set("Content-Type", "application/xml; charset=utf-8")
		w.WriteHeader(http.StatusOK)
		_, _ = w.Write([]byte(xml.Header))
		_, _ = w.Write(payload)
	}

	query := r.URL.Query()
	if _, initiate := query["uploads"]; initiate {
		// CreateMultipartUpload.
		upload, err := h.svc.CreateMultipartUpload(bucket, key)
		if err != nil {
			writeMappedS3Error(w, r, err)
			return
		}
		respondXML(models.InitiateMultipartUploadResult{
			Xmlns:    "http://s3.amazonaws.com/doc/2006-03-01/",
			Bucket:   upload.Bucket,
			Key:      upload.Key,
			UploadID: upload.UploadID,
		})
		return
	}
	if uploadID := query.Get("uploadId"); uploadID != "" {
		// CompleteMultipartUpload: the body lists the parts to stitch.
		var req models.CompleteMultipartUploadRequest
		if err := xml.NewDecoder(r.Body).Decode(&req); err != nil {
			writeS3Error(w, r, s3ErrMalformedXML, r.URL.Path)
			return
		}
		manifest, err := h.svc.CompleteMultipartUpload(bucket, key, uploadID, req.Parts)
		if err != nil {
			writeMappedS3Error(w, r, err)
			return
		}
		respondXML(models.CompleteMultipartUploadResult{
			Xmlns:    "http://s3.amazonaws.com/doc/2006-03-01/",
			Bucket:   bucket,
			Key:      key,
			ETag:     `"` + manifest.ETag + `"`,
			Location: r.URL.Path,
		})
		return
	}
	writeS3Error(w, r, s3ErrNotImplemented, r.URL.Path)
}
// handlePutObject serves PUT /{bucket}/{key}. With uploadId and
// partNumber query parameters it stores a multipart part; otherwise
// it stores a whole object. SigV4 streaming (aws-chunked) bodies are
// transparently decoded in both cases.
func (h *Handler) handlePutObject(w http.ResponseWriter, r *http.Request) {
	bucket := chi.URLParam(r, "bucket")
	key := chi.URLParam(r, "*")
	if key == "" {
		writeS3Error(w, r, s3ErrInvalidObjectKey, r.URL.Path)
		return
	}
	defer r.Body.Close()

	// Wrap the body in an aws-chunked decoder when the client signed a
	// streaming payload; otherwise consume it verbatim.
	body := io.Reader(r.Body)
	if shouldDecodeAWSChunkedPayload(r) {
		decoded := newAWSChunkedDecodingReader(r.Body)
		defer decoded.Close()
		body = decoded
	}

	query := r.URL.Query()
	uploadID := query.Get("uploadId")
	partNumberRaw := query.Get("partNumber")

	if uploadID == "" && partNumberRaw == "" {
		// Plain PutObject.
		contentType := r.Header.Get("Content-Type")
		if contentType == "" {
			contentType = "application/octet-stream"
		}
		manifest, err := h.svc.PutObject(bucket, key, contentType, body)
		if err != nil {
			writeMappedS3Error(w, r, err)
			return
		}
		w.Header().Set("ETag", `"`+manifest.ETag+`"`)
		w.Header().Set("Content-Length", "0")
		w.WriteHeader(http.StatusOK)
		return
	}

	// UploadPart: both query parameters are mandatory and the part
	// number is bounded by S3's 1..10000 limit.
	if uploadID == "" || partNumberRaw == "" {
		writeS3Error(w, r, s3ErrInvalidPart, r.URL.Path)
		return
	}
	partNumber, err := strconv.Atoi(partNumberRaw)
	if err != nil || partNumber < 1 || partNumber > 10000 {
		writeS3Error(w, r, s3ErrInvalidPart, r.URL.Path)
		return
	}
	etag, err := h.svc.UploadPart(bucket, key, uploadID, partNumber, body)
	if err != nil {
		writeMappedS3Error(w, r, err)
		return
	}
	w.Header().Set("ETag", `"`+etag+`"`)
	w.Header().Set("Content-Length", "0")
	w.WriteHeader(http.StatusOK)
}
// handleListMultipartParts answers GET /{bucket}/{key}?uploadId=...
// with the ListParts XML document for an in-progress upload.
func (h *Handler) handleListMultipartParts(w http.ResponseWriter, r *http.Request, bucket, key, uploadID string) {
	parts, err := h.svc.ListMultipartParts(bucket, key, uploadID)
	if err != nil {
		writeMappedS3Error(w, r, err)
		return
	}
	items := make([]models.PartItem, 0, len(parts))
	for _, p := range parts {
		items = append(items, models.PartItem{
			PartNumber:   p.PartNumber,
			LastModified: time.Unix(p.CreatedAt, 0).UTC().Format("2006-01-02T15:04:05.000Z"),
			ETag:         `"` + p.ETag + `"`,
			Size:         p.Size,
		})
	}
	result := models.ListPartsResult{
		Xmlns:    "http://s3.amazonaws.com/doc/2006-03-01/",
		Bucket:   bucket,
		Key:      key,
		UploadID: uploadID,
		Parts:    items,
	}
	payload, err := xml.MarshalIndent(result, "", " ")
	if err != nil {
		writeMappedS3Error(w, r, err)
		return
	}
	w.Header().Set("Content-Type", "application/xml; charset=utf-8")
	w.WriteHeader(http.StatusOK)
	_, _ = w.Write([]byte(xml.Header))
	_, _ = w.Write(payload)
}
func shouldDecodeAWSChunkedPayload(r *http.Request) bool {
contentEncoding := strings.ToLower(r.Header.Get("Content-Encoding"))
if strings.Contains(contentEncoding, "aws-chunked") {
return true
}
signingMode := strings.ToLower(r.Header.Get("x-amz-content-sha256"))
return strings.HasPrefix(signingMode, "streaming-aws4-hmac-sha256-payload")
}
// newAWSChunkedDecodingReader returns a ReadCloser yielding the
// decoded payload of an aws-chunked body. Decoding runs in a
// background goroutine; a decode failure surfaces as the reader's
// error, and closing the reader unblocks the goroutine.
func newAWSChunkedDecodingReader(src io.Reader) io.ReadCloser {
	pr, pw := io.Pipe()
	go func() {
		err := decodeAWSChunkedPayload(src, pw)
		// CloseWithError(nil) behaves like Close: readers see EOF.
		_ = pw.CloseWithError(err)
	}()
	return pr
}
func decodeAWSChunkedPayload(src io.Reader, dst io.Writer) error {
reader := bufio.NewReader(src)
for {
headerLine, err := reader.ReadString('\n')
if err != nil {
return err
}
headerLine = strings.TrimRight(headerLine, "\r\n")
chunkSizeToken := headerLine
if idx := strings.IndexByte(chunkSizeToken, ';'); idx >= 0 {
chunkSizeToken = chunkSizeToken[:idx]
}
chunkSizeToken = strings.TrimSpace(chunkSizeToken)
chunkSize, err := strconv.ParseInt(chunkSizeToken, 16, 64)
if err != nil {
return fmt.Errorf("invalid aws-chunked header %q: %w", headerLine, err)
}
if chunkSize < 0 {
return fmt.Errorf("invalid aws-chunked size: %d", chunkSize)
}
if chunkSize > 0 {
if _, err := io.CopyN(dst, reader, chunkSize); err != nil {
return err
}
}
crlf := make([]byte, 2)
if _, err := io.ReadFull(reader, crlf); err != nil {
return err
}
if crlf[0] != '\r' || crlf[1] != '\n' {
return errors.New("invalid aws-chunked payload terminator")
}
if chunkSize == 0 {
for {
line, err := reader.ReadString('\n')
if err != nil {
return err
}
if line == "\r\n" || line == "\n" {
return nil
}
}
}
}
}
// handleHeadObject serves HEAD /{bucket}/{key}, returning object
// metadata with the same header set a GET for the object would carry.
func (h *Handler) handleHeadObject(w http.ResponseWriter, r *http.Request) {
	bucket := chi.URLParam(r, "bucket")
	key := chi.URLParam(r, "*")
	if key == "" {
		writeS3Error(w, r, s3ErrInvalidObjectKey, r.URL.Path)
		return
	}
	manifest, err := h.svc.HeadObject(bucket, key)
	if err != nil {
		writeMappedS3Error(w, r, err)
		return
	}
	// Mirror handleGetObject's headers (Content-Type and Accept-Ranges
	// were previously missing here, giving HEAD/GET inconsistent
	// metadata).
	w.Header().Set("Content-Type", manifest.ContentType)
	w.Header().Set("ETag", `"`+manifest.ETag+`"`)
	w.Header().Set("Content-Length", strconv.FormatInt(manifest.Size, 10))
	w.Header().Set("Last-Modified", time.Unix(manifest.CreatedAt, 0).UTC().Format(http.TimeFormat))
	w.Header().Set("Accept-Ranges", "bytes")
	w.WriteHeader(http.StatusOK)
}
// handlePutBucket serves PUT /{bucket}, creating the bucket and
// answering 201 on success.
func (h *Handler) handlePutBucket(w http.ResponseWriter, r *http.Request) {
	name := chi.URLParam(r, "bucket")
	err := h.svc.CreateBucket(name)
	if err != nil {
		writeMappedS3Error(w, r, err)
		return
	}
	w.WriteHeader(http.StatusCreated)
}
// handleDeleteBucket serves DELETE /{bucket}, answering 204 once the
// bucket has been removed.
func (h *Handler) handleDeleteBucket(w http.ResponseWriter, r *http.Request) {
	name := chi.URLParam(r, "bucket")
	err := h.svc.DeleteBucket(name)
	if err != nil {
		writeMappedS3Error(w, r, err)
		return
	}
	w.WriteHeader(http.StatusNoContent)
}
// handlePostBucket serves POST /{bucket}. Only the multi-object
// delete form (POST /{bucket}?delete with an S3 XML body) is
// supported; anything else answers NotImplemented.
func (h *Handler) handlePostBucket(w http.ResponseWriter, r *http.Request) {
	bucket := chi.URLParam(r, "bucket")
	if !r.URL.Query().Has("delete") {
		writeS3Error(w, r, s3ErrNotImplemented, r.URL.Path)
		return
	}
	defer r.Body.Close()

	body := io.Reader(r.Body)
	if shouldDecodeAWSChunkedPayload(r) {
		decoded := newAWSChunkedDecodingReader(r.Body)
		defer decoded.Close()
		body = decoded
	}

	var req models.DeleteObjectsRequest
	if err := xml.NewDecoder(body).Decode(&req); err != nil {
		writeS3Error(w, r, s3ErrMalformedXML, r.URL.Path)
		return
	}

	result := models.DeleteObjectsResult{
		Xmlns: "http://s3.amazonaws.com/doc/2006-03-01/",
	}
	keys := make([]string, 0, len(req.Objects))
	for _, obj := range req.Objects {
		// An empty key is reported as a per-entry error instead of
		// failing the whole batch.
		if obj.Key == "" {
			result.Errors = append(result.Errors, models.DeleteError{
				Key:     obj.Key,
				Code:    s3ErrInvalidObjectKey.Code,
				Message: s3ErrInvalidObjectKey.Message,
			})
			continue
		}
		keys = append(keys, obj.Key)
	}

	deleted, err := h.svc.DeleteObjects(bucket, keys)
	if err != nil {
		writeMappedS3Error(w, r, err)
		return
	}
	// Quiet mode suppresses the per-key success entries.
	if !req.Quiet {
		result.Deleted = make([]models.DeletedEntry, 0, len(deleted))
		for _, k := range deleted {
			result.Deleted = append(result.Deleted, models.DeletedEntry{Key: k})
		}
	}

	payload, err := xml.MarshalIndent(result, "", " ")
	if err != nil {
		writeMappedS3Error(w, r, err)
		return
	}
	w.Header().Set("Content-Type", "application/xml; charset=utf-8")
	w.WriteHeader(http.StatusOK)
	_, _ = w.Write([]byte(xml.Header))
	_, _ = w.Write(payload)
}
// handleDeleteObject serves DELETE /{bucket}/{key}. With an uploadId
// query parameter it aborts a multipart upload instead. Deleting a
// missing key still answers 204, matching S3's idempotent delete.
func (h *Handler) handleDeleteObject(w http.ResponseWriter, r *http.Request) {
	bucket := chi.URLParam(r, "bucket")
	key := chi.URLParam(r, "*")
	if key == "" {
		writeS3Error(w, r, s3ErrInvalidObjectKey, r.URL.Path)
		return
	}
	if uploadID := r.URL.Query().Get("uploadId"); uploadID != "" {
		if err := h.svc.AbortMultipartUpload(bucket, key, uploadID); err != nil {
			writeMappedS3Error(w, r, err)
			return
		}
		w.WriteHeader(http.StatusNoContent)
		return
	}
	switch err := h.svc.DeleteObject(bucket, key); {
	case err == nil, errors.Is(err, metadata.ErrObjectNotFound):
		// Not-found deletes succeed silently.
		w.WriteHeader(http.StatusNoContent)
	default:
		writeMappedS3Error(w, r, err)
	}
}
// handleHeadBucket serves HEAD /{bucket}: 200 when the bucket exists,
// otherwise the mapped S3 error status (with no body, per HEAD).
func (h *Handler) handleHeadBucket(w http.ResponseWriter, r *http.Request) {
	name := chi.URLParam(r, "bucket")
	err := h.svc.HeadBucket(name)
	if err != nil {
		writeMappedS3Error(w, r, err)
		return
	}
	w.WriteHeader(http.StatusOK)
}
// handleGetBuckets serves GET /, returning the ListAllMyBuckets XML
// document. Buckets whose manifest cannot be read are logged and
// omitted rather than failing the whole listing.
func (h *Handler) handleGetBuckets(w http.ResponseWriter, r *http.Request) {
	names, err := h.svc.ListBuckets()
	if err != nil {
		writeMappedS3Error(w, r, err)
		return
	}
	items := make([]models.BucketItem, 0, len(names))
	for _, name := range names {
		manifest, err := h.svc.GetBucketManifest(name)
		if err != nil {
			h.logger.Warn("bucket_manifest_read_failed", "bucket", name, "error", err)
			continue
		}
		items = append(items, models.BucketItem{
			Name:         name,
			CreationDate: manifest.CreatedAt.UTC().Format("2006-01-02T15:04:05.000Z"),
		})
	}
	response := models.ListAllMyBucketsResult{
		Xmlns: "http://s3.amazonaws.com/doc/2006-03-01/",
		Owner: models.BucketsOwner{
			ID:          "local",
			DisplayName: "local",
		},
		Buckets: models.BucketsElement{Items: items},
	}
	payload, err := xml.MarshalIndent(response, "", " ")
	if err != nil {
		writeMappedS3Error(w, r, err)
		return
	}
	w.Header().Set("Content-Type", "application/xml; charset=utf-8")
	w.WriteHeader(http.StatusOK)
	_, _ = w.Write([]byte(xml.Header))
	_, _ = w.Write(payload)
}
// handleGetBucket dispatches GET /{bucket}: ListObjectsV2 when
// list-type=2, a static GetBucketLocation response when ?location is
// present, NotImplemented otherwise.
func (h *Handler) handleGetBucket(w http.ResponseWriter, r *http.Request) {
	bucket := chi.URLParam(r, "bucket")
	query := r.URL.Query()
	switch {
	case query.Get("list-type") == "2":
		h.handleListObjectsV2(w, r, bucket)
	case query.Has("location"):
		// Single-node deployment: always report us-east-1.
		const body = `<?xml version="1.0" encoding="UTF-8"?>
<LocationConstraint xmlns="http://s3.amazonaws.com/doc/2006-03-01/">us-east-1</LocationConstraint>`
		w.Header().Set("Content-Type", "application/xml; charset=utf-8")
		w.Header().Set("Content-Length", strconv.Itoa(len(body)))
		w.WriteHeader(http.StatusOK)
		_, _ = w.Write([]byte(body))
	default:
		writeS3Error(w, r, s3ErrNotImplemented, r.URL.Path)
	}
}
// handleListObjectsV2 implements the ListObjectsV2 API
// (GET /{bucket}?list-type=2). Supported parameters: prefix,
// delimiter, start-after, max-keys, continuation-token and
// encoding-type=url. Pagination materializes the full entry list and
// slices it; continuation tokens are base64-encoded entry markers
// produced by buildListV2Entries.
func (h *Handler) handleListObjectsV2(w http.ResponseWriter, r *http.Request, bucket string) {
	prefix := r.URL.Query().Get("prefix")
	delimiter := r.URL.Query().Get("delimiter")
	startAfter := r.URL.Query().Get("start-after")
	encodingType := strings.ToLower(strings.TrimSpace(r.URL.Query().Get("encoding-type")))
	// S3 defines only "url" as a valid encoding-type value.
	if encodingType != "" && encodingType != "url" {
		writeS3Error(w, r, s3ErrInvalidArgument, r.URL.Path)
		return
	}
	// Default page size per the S3 API; values above 1000 are clamped.
	maxKeys := 1000
	if rawMaxKeys := strings.TrimSpace(r.URL.Query().Get("max-keys")); rawMaxKeys != "" {
		parsed, err := strconv.Atoi(rawMaxKeys)
		if err != nil || parsed < 0 {
			writeS3Error(w, r, s3ErrInvalidArgument, r.URL.Path)
			return
		}
		if parsed > 1000 {
			parsed = 1000
		}
		maxKeys = parsed
	}
	// Continuation tokens are opaque to clients: base64 of the marker
	// of the last entry returned on the previous page.
	continuationToken := strings.TrimSpace(r.URL.Query().Get("continuation-token"))
	continuationMarker := ""
	if continuationToken != "" {
		decoded, err := base64.StdEncoding.DecodeString(continuationToken)
		if err != nil || len(decoded) == 0 {
			writeS3Error(w, r, s3ErrInvalidArgument, r.URL.Path)
			return
		}
		continuationMarker = string(decoded)
	}
	objects, err := h.svc.ListObjects(bucket, prefix)
	if err != nil {
		writeMappedS3Error(w, r, err)
		return
	}
	entries := buildListV2Entries(objects, prefix, delimiter)
	// Resume after the marked entry, or (on the first page) after the
	// start-after key.
	startIdx := 0
	if continuationMarker != "" {
		found := false
		for i, entry := range entries {
			if entry.Marker == continuationMarker {
				startIdx = i + 1
				found = true
				break
			}
		}
		// A token that no longer matches any entry is rejected rather
		// than silently restarting the listing from the beginning.
		if !found {
			writeS3Error(w, r, s3ErrInvalidArgument, r.URL.Path)
			return
		}
	} else if startAfter != "" {
		for startIdx < len(entries) && entries[startIdx].SortKey <= startAfter {
			startIdx++
		}
	}
	result := models.ListBucketResultV2{
		Xmlns:             "http://s3.amazonaws.com/doc/2006-03-01/",
		Name:              bucket,
		Prefix:            s3EncodeIfNeeded(prefix, encodingType),
		Delimiter:         s3EncodeIfNeeded(delimiter, encodingType),
		MaxKeys:           maxKeys,
		ContinuationToken: continuationToken,
		StartAfter:        s3EncodeIfNeeded(startAfter, encodingType),
		EncodingType:      encodingType,
	}
	// Emit up to maxKeys entries; objects become Contents, collapsed
	// delimiter groups become CommonPrefixes.
	endIdx := startIdx
	for endIdx < len(entries) && result.KeyCount < maxKeys {
		entry := entries[endIdx]
		if entry.Object != nil {
			result.Contents = append(result.Contents, models.Contents{
				Key:          s3EncodeIfNeeded(entry.Object.Key, encodingType),
				LastModified: time.Unix(entry.Object.CreatedAt, 0).UTC().Format("2006-01-02T15:04:05.000Z"),
				ETag:         `"` + entry.Object.ETag + `"`,
				Size:         entry.Object.Size,
				StorageClass: "STANDARD",
			})
		} else {
			result.CommonPrefixes = append(result.CommonPrefixes, models.CommonPrefixes{
				Prefix: s3EncodeIfNeeded(entry.CommonPrefix, encodingType),
			})
		}
		result.KeyCount++
		endIdx++
	}
	result.IsTruncated = endIdx < len(entries)
	if result.IsTruncated && result.KeyCount > 0 {
		// The token points at the last entry of this page.
		result.NextContinuationToken = base64.StdEncoding.EncodeToString([]byte(entries[endIdx-1].Marker))
	}
	xmlResponse, err := xml.MarshalIndent(result, "", " ")
	if err != nil {
		writeMappedS3Error(w, r, err)
		return
	}
	w.Header().Set("Content-Type", "application/xml; charset=utf-8")
	w.Header().Set("Content-Length", strconv.Itoa(len(xml.Header)+len(xmlResponse)))
	w.WriteHeader(http.StatusOK)
	_, _ = w.Write([]byte(xml.Header))
	_, _ = w.Write(xmlResponse)
}
// listV2Entry is one row in the flattened ListObjectsV2 listing:
// either a concrete object (Object != nil) or a delimiter-collapsed
// common prefix.
type listV2Entry struct {
	Marker       string                 // stable continuation-token payload: "K:"+key or "C:"+prefix
	SortKey      string                 // value compared against start-after during pagination
	Object       *models.ObjectManifest // nil for common-prefix entries
	CommonPrefix string                 // set only when Object is nil
}
// buildListV2Entries flattens a bucket listing into the ordered entry
// stream used by ListObjectsV2 pagination. Objects are sorted by key;
// when a delimiter is given, keys sharing a prefix segment collapse
// into a single common-prefix entry. The "K:"/"C:" marker prefixes
// keep continuation tokens unambiguous between the two entry kinds.
func buildListV2Entries(objects []*models.ObjectManifest, prefix, delimiter string) []listV2Entry {
	ordered := append(make([]*models.ObjectManifest, 0, len(objects)), objects...)
	sort.Slice(ordered, func(a, b int) bool {
		return ordered[a].Key < ordered[b].Key
	})

	entries := make([]listV2Entry, 0, len(ordered))
	emitted := make(map[string]struct{})
	for _, obj := range ordered {
		if obj == nil {
			continue
		}
		if delimiter != "" {
			rest := strings.TrimPrefix(obj.Key, prefix)
			if cut := strings.Index(rest, delimiter); cut >= 0 {
				common := prefix + rest[:cut+len(delimiter)]
				// Each common prefix appears at most once.
				if _, dup := emitted[common]; !dup {
					emitted[common] = struct{}{}
					entries = append(entries, listV2Entry{
						Marker:       "C:" + common,
						SortKey:      common,
						CommonPrefix: common,
					})
				}
				continue
			}
		}
		entries = append(entries, listV2Entry{
			Marker:  "K:" + obj.Key,
			SortKey: obj.Key,
			Object:  obj,
		})
	}
	return entries
}
// s3EncodeIfNeeded applies S3's EncodingType=url encoding: query
// escaping with spaces rendered as %20 rather than '+'. Values pass
// through untouched unless encodingType is "url".
func s3EncodeIfNeeded(value, encodingType string) string {
	if encodingType != "url" || value == "" {
		return value
	}
	return strings.ReplaceAll(url.QueryEscape(value), "+", "%20")
}
// parseSingleByteRange resolves a single "bytes=start-end" Range
// header against an object of the given size, returning inclusive
// [start, end] byte offsets. Suffix ranges ("bytes=-N") and open
// ranges ("bytes=N-") are supported; multi-range specs are rejected.
func parseSingleByteRange(rangeHeader string, size int64) (int64, int64, error) {
	invalid := errors.New("invalid range")
	rest, ok := strings.CutPrefix(rangeHeader, "bytes=")
	if !ok || size <= 0 {
		return 0, 0, invalid
	}
	spec := strings.TrimSpace(rest)
	// Only a single range is supported.
	if spec == "" || strings.Contains(spec, ",") {
		return 0, 0, invalid
	}
	startTok, endTok, ok := strings.Cut(spec, "-")
	if !ok {
		return 0, 0, invalid
	}
	if startTok == "" {
		// Suffix form: "bytes=-N" means the final N bytes.
		n, err := strconv.ParseInt(endTok, 10, 64)
		if err != nil || n <= 0 {
			return 0, 0, invalid
		}
		if n > size {
			n = size
		}
		return size - n, size - 1, nil
	}
	start, err := strconv.ParseInt(startTok, 10, 64)
	if err != nil || start < 0 || start >= size {
		return 0, 0, invalid
	}
	end := size - 1
	if endTok != "" {
		end, err = strconv.ParseInt(endTok, 10, 64)
		if err != nil || end < start {
			return 0, 0, invalid
		}
		// Ends past the object are clamped, per RFC 7233.
		if end > size-1 {
			end = size - 1
		}
	}
	return start, end, nil
}
// Start registers the routes, serves HTTP on address, and blocks
// until ctx is cancelled or the listener fails. On cancellation it
// performs a graceful shutdown with a 10-second budget and closes the
// underlying object service. Returns nil on a clean shutdown.
func (h *Handler) Start(ctx context.Context, address string) error {
	if ctx == nil {
		ctx = context.Background()
	}
	h.logger.Info("server_starting",
		"address", address,
		"log_format", h.logConfig.Format,
		"log_level", h.logConfig.LevelName,
		"audit_log", h.logConfig.Audit,
	)
	h.setupRoutes()
	server := http.Server{
		Addr:    address,
		Handler: h.router,
	}
	// ListenAndServe blocks, so it runs in its own goroutine. Only
	// unexpected failures are forwarded: ErrServerClosed is the normal
	// outcome of the Shutdown call below.
	errCh := make(chan error, 1)
	go func() {
		if err := server.ListenAndServe(); err != nil {
			if !errors.Is(err, http.ErrServerClosed) {
				errCh <- err
			}
		}
	}()
	select {
	case <-ctx.Done():
		h.logger.Info("shutdown_context_done", "reason", ctx.Err())
	case err := <-errCh:
		// The listener died on its own: close the service and report
		// the listen error (a service-close failure is only logged).
		h.logger.Error("server_listen_failed", "error", err)
		if closeErr := h.svc.Close(); closeErr != nil {
			h.logger.Error("service_close_failed", "error", closeErr)
		}
		return err
	}
	// Graceful shutdown needs a fresh timeout context: the original
	// ctx is already cancelled at this point.
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	if err := server.Shutdown(ctx); err != nil {
		h.logger.Error("server_shutdown_failed", "error", err)
		return err
	}
	if err := h.svc.Close(); err != nil {
		h.logger.Error("service_close_failed", "error", err)
		return err
	}
	h.logger.Info("server_stopped")
	return nil
}

148
api/s3_errors.go Normal file
View File

@@ -0,0 +1,148 @@
package api
import (
"encoding/xml"
"errors"
"fs/metadata"
"fs/models"
"fs/service"
"net/http"
)
// s3APIError describes an S3-style error response: the HTTP status to
// send plus the machine-readable Code and human-readable Message that
// go into the XML error body.
type s3APIError struct {
	Status  int    // HTTP status code to write
	Code    string // S3 error code, e.g. "NoSuchKey"
	Message string // human-readable description for the XML body
}
// Reusable S3 error responses for conditions detected directly in the
// HTTP layer. Storage- and service-level errors are translated
// separately by mapToS3Error.
var (
	// The wildcard key segment of the route was empty.
	s3ErrInvalidObjectKey = s3APIError{
		Status:  http.StatusBadRequest,
		Code:    "InvalidArgument",
		Message: "Object key is required.",
	}
	// Requested operation or query form is not supported by this server.
	s3ErrNotImplemented = s3APIError{
		Status:  http.StatusNotImplemented,
		Code:    "NotImplemented",
		Message: "A header you provided implies functionality that is not implemented.",
	}
	// Missing/invalid uploadId-partNumber pair, or an unknown part.
	s3ErrInvalidPart = s3APIError{
		Status:  http.StatusBadRequest,
		Code:    "InvalidPart",
		Message: "One or more of the specified parts could not be found.",
	}
	// CompleteMultipartUpload received parts out of ascending order.
	s3ErrInvalidPartOrder = s3APIError{
		Status:  http.StatusBadRequest,
		Code:    "InvalidPartOrder",
		Message: "The list of parts was not in ascending order.",
	}
	// Request body failed XML decoding.
	s3ErrMalformedXML = s3APIError{
		Status:  http.StatusBadRequest,
		Code:    "MalformedXML",
		Message: "The XML you provided was not well-formed or did not validate against our published schema.",
	}
	// Generic bad query parameter (max-keys, continuation-token, ...).
	s3ErrInvalidArgument = s3APIError{
		Status:  http.StatusBadRequest,
		Code:    "InvalidArgument",
		Message: "Invalid argument.",
	}
	// Range header could not be satisfied against the object size.
	s3ErrInvalidRange = s3APIError{
		Status:  http.StatusRequestedRangeNotSatisfiable,
		Code:    "InvalidRange",
		Message: "The requested range is not satisfiable.",
	}
	// A multipart part was below the minimum allowed size.
	s3ErrEntityTooSmall = s3APIError{
		Status:  http.StatusBadRequest,
		Code:    "EntityTooSmall",
		Message: "Your proposed upload is smaller than the minimum allowed object size.",
	}
	// Fallback for errors with no specific S3 mapping.
	s3ErrInternal = s3APIError{
		Status:  http.StatusInternalServerError,
		Code:    "InternalError",
		Message: "We encountered an internal error. Please try again.",
	}
)
// mapToS3Error translates metadata/service sentinel errors into the
// S3 error envelope shown to clients. Checks run in declaration
// order; anything unrecognized maps to a generic InternalError.
func mapToS3Error(err error) s3APIError {
	mappings := []struct {
		target error
		apiErr s3APIError
	}{
		{metadata.ErrInvalidBucketName, s3APIError{
			Status:  http.StatusBadRequest,
			Code:    "InvalidBucketName",
			Message: "The specified bucket is not valid.",
		}},
		{metadata.ErrBucketAlreadyExists, s3APIError{
			Status:  http.StatusConflict,
			Code:    "BucketAlreadyOwnedByYou",
			Message: "Your previous request to create the named bucket succeeded and you already own it.",
		}},
		{metadata.ErrBucketNotFound, s3APIError{
			Status:  http.StatusNotFound,
			Code:    "NoSuchBucket",
			Message: "The specified bucket does not exist.",
		}},
		{metadata.ErrBucketNotEmpty, s3APIError{
			Status:  http.StatusConflict,
			Code:    "BucketNotEmpty",
			Message: "The bucket you tried to delete is not empty.",
		}},
		{metadata.ErrObjectNotFound, s3APIError{
			Status:  http.StatusNotFound,
			Code:    "NoSuchKey",
			Message: "The specified key does not exist.",
		}},
		{metadata.ErrMultipartNotFound, s3APIError{
			Status:  http.StatusNotFound,
			Code:    "NoSuchUpload",
			Message: "The specified multipart upload does not exist.",
		}},
		{metadata.ErrMultipartNotPending, s3APIError{
			Status:  http.StatusBadRequest,
			Code:    "InvalidRequest",
			Message: "The multipart upload is not in a valid state for this operation.",
		}},
		{service.ErrInvalidPart, s3ErrInvalidPart},
		{service.ErrInvalidPartOrder, s3ErrInvalidPartOrder},
		{service.ErrInvalidCompleteRequest, s3ErrMalformedXML},
		{service.ErrEntityTooSmall, s3ErrEntityTooSmall},
	}
	for _, m := range mappings {
		if errors.Is(err, m.target) {
			return m.apiErr
		}
	}
	return s3ErrInternal
}
// writeS3Error emits an S3-style XML error document with the given
// status. HEAD responses carry only headers and status, never a body.
func writeS3Error(w http.ResponseWriter, r *http.Request, apiErr s3APIError, resource string) {
	w.Header().Set("Content-Type", "application/xml; charset=utf-8")
	w.WriteHeader(apiErr.Status)
	if r != nil && r.Method == http.MethodHead {
		return
	}
	body := models.S3ErrorResponse{
		Code:     apiErr.Code,
		Message:  apiErr.Message,
		Resource: resource,
	}
	out, err := xml.MarshalIndent(body, "", " ")
	if err != nil {
		// The status line is already committed; nothing useful to add.
		return
	}
	_, _ = w.Write([]byte(xml.Header))
	_, _ = w.Write(out)
}
func writeMappedS3Error(w http.ResponseWriter, r *http.Request, err error) {
writeS3Error(w, r, mapToS3Error(err), r.URL.Path)
}

10
go.mod
View File

@@ -3,8 +3,12 @@ module fs
go 1.25.7
require (
github.com/klauspost/cpuid/v2 v2.3.0 // indirect
github.com/klauspost/reedsolomon v1.13.2 // indirect
go.etcd.io/bbolt v1.4.3 // indirect
github.com/go-chi/chi/v5 v5.2.5
github.com/google/uuid v1.6.0
go.etcd.io/bbolt v1.4.3
)
require (
github.com/joho/godotenv v1.5.1 // indirect
golang.org/x/sys v0.41.0 // indirect
)

22
go.sum
View File

@@ -1,10 +1,20 @@
github.com/klauspost/cpuid/v2 v2.3.0 h1:S4CRMLnYUhGeDFDqkGriYKdfoFlDnMtqTiI/sFzhA9Y=
github.com/klauspost/cpuid/v2 v2.3.0/go.mod h1:hqwkgyIinND0mEev00jJYCxPNVRVXFQeu1XKlok6oO0=
github.com/klauspost/reedsolomon v1.13.2 h1:9qtQy2tKEVpVB8Pfq87ZljHZb60/LbeTQ1OxV8EGzdE=
github.com/klauspost/reedsolomon v1.13.2/go.mod h1:ggJT9lc71Vu+cSOPBlxGvBN6TfAS77qB4fp8vJ05NSA=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/go-chi/chi/v5 v5.2.5 h1:Eg4myHZBjyvJmAFjFvWgrqDTXFyOzjj7YIm3L3mu6Ug=
github.com/go-chi/chi/v5 v5.2.5/go.mod h1:X7Gx4mteadT3eDOMTsXzmI4/rwUpOwBHLpAfupzFJP0=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0=
github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
go.etcd.io/bbolt v1.4.3 h1:dEadXpI6G79deX5prL3QRNP6JB8UxVkqo4UPnHaNXJo=
go.etcd.io/bbolt v1.4.3/go.mod h1:tKQlpPaYCVFctUIgFKFnAlvbmB3tpy1vkTnDWohtc0E=
golang.org/x/sys v0.29.0 h1:TPYlXGxvx1MGTn2GiZDhnjPA9wZzZeGKHHmKhHYvgaU=
golang.org/x/sys v0.29.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ=
golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.41.0 h1:Ivj+2Cp/ylzLiEU89QhWblYnOE9zerudt9Ftecq2C6k=
golang.org/x/sys v0.41.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

150
logging/logging.go Normal file
View File

@@ -0,0 +1,150 @@
package logging
import (
"log/slog"
"net/http"
"os"
"path/filepath"
"strconv"
"strings"
"time"
"github.com/go-chi/chi/v5/middleware"
)
// Config captures the logging configuration resolved from environment
// variables or explicit values.
type Config struct {
	Level     slog.Level // minimum level the handler emits
	LevelName string     // canonical upper-case name of Level (e.g. "INFO")
	Format    string     // output format: "json" or "text"
	Audit     bool       // emit one structured log line per HTTP request
	AddSource bool       // attach source file:line to records (enabled with debug)
	DebugMode bool       // true when Level is debug or lower
}
// ConfigFromEnv builds a Config from the LOG_LEVEL, LOG_FORMAT and
// AUDIT_LOG environment variables. Normalization (trim/lowercase) is
// delegated to ConfigFromValues, which performs it unconditionally.
func ConfigFromEnv() Config {
	return ConfigFromValues(
		os.Getenv("LOG_LEVEL"),
		os.Getenv("LOG_FORMAT"),
		envBool("AUDIT_LOG", true),
	)
}
// ConfigFromValues normalizes the given level and format names and returns
// a fully-populated Config. Unknown level names fall back to info; any
// format other than "json" (including empty) falls back to "text".
// Debug level additionally turns on AddSource and DebugMode.
func ConfigFromValues(levelName, format string, audit bool) Config {
	name := strings.ToLower(strings.TrimSpace(levelName))
	if name == "" {
		name = "info"
	}
	level := parseLevel(name)

	outputFormat := strings.ToLower(strings.TrimSpace(format))
	if outputFormat != "json" {
		// Empty or unrecognized values collapse to the text handler.
		outputFormat = "text"
	}

	debug := level <= slog.LevelDebug
	return Config{
		Level:     level,
		LevelName: strings.ToUpper(level.String()),
		Format:    outputFormat,
		Audit:     audit,
		AddSource: debug,
		DebugMode: debug,
	}
}
// NewLogger constructs a slog.Logger per cfg (JSON or text handler on
// stdout), rewrites source attributes to a compact "src=file.go:line"
// form, installs the logger as the process-wide default, and returns it.
func NewLogger(cfg Config) *slog.Logger {
	compactSource := func(_ []string, attr slog.Attr) slog.Attr {
		if attr.Key != slog.SourceKey {
			return attr
		}
		if src, ok := attr.Value.Any().(*slog.Source); ok && src != nil {
			attr.Key = "src"
			attr.Value = slog.StringValue(filepath.Base(src.File) + ":" + strconv.Itoa(src.Line))
		}
		return attr
	}

	opts := &slog.HandlerOptions{
		Level:       cfg.Level,
		AddSource:   cfg.AddSource,
		ReplaceAttr: compactSource,
	}

	var handler slog.Handler
	switch cfg.Format {
	case "json":
		handler = slog.NewJSONHandler(os.Stdout, opts)
	default:
		handler = slog.NewTextHandler(os.Stdout, opts)
	}

	logger := slog.New(handler)
	slog.SetDefault(logger) // side effect: becomes the global default logger
	return logger
}
// HTTPMiddleware returns a chi-compatible middleware that logs one
// structured line per request: Info when audit logging is on, Debug (with
// extra request detail) in debug mode. When neither is enabled the
// middleware is a pure pass-through; the enabled check is hoisted out of
// the handler so the response-writer wrapper is not allocated per request
// just to be thrown away (the original wrapped unconditionally).
func HTTPMiddleware(logger *slog.Logger, cfg Config) func(http.Handler) http.Handler {
	logEnabled := cfg.Audit || cfg.DebugMode
	return func(next http.Handler) http.Handler {
		return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
			if !logEnabled {
				next.ServeHTTP(w, r)
				return
			}
			start := time.Now()
			ww := middleware.NewWrapResponseWriter(w, r.ProtoMajor)
			next.ServeHTTP(ww, r)
			elapsed := time.Since(start)
			status := ww.Status()
			if status == 0 {
				// Handler never called WriteHeader; net/http defaults to 200.
				status = http.StatusOK
			}
			attrs := []any{
				"method", r.Method,
				"path", r.URL.Path,
				"status", status,
				"bytes", ww.BytesWritten(),
				"duration_ms", float64(elapsed.Nanoseconds()) / 1_000_000.0,
				"remote_addr", r.RemoteAddr,
			}
			if cfg.DebugMode {
				attrs = append(attrs,
					"query", r.URL.RawQuery,
					"user_agent", r.UserAgent(),
					"content_length", r.ContentLength,
					"content_type", r.Header.Get("Content-Type"),
					"x_amz_sha256", r.Header.Get("x-amz-content-sha256"),
				)
				logger.Debug("http_request", attrs...)
				return
			}
			logger.Info("http_request", attrs...)
		})
	}
}
// envBool reads key from the environment and parses it as a bool.
// An unset/empty variable or an unparsable value yields defaultValue.
func envBool(key string, defaultValue bool) bool {
	if raw, ok := os.LookupEnv(key); ok && raw != "" {
		if parsed, err := strconv.ParseBool(raw); err == nil {
			return parsed
		}
	}
	return defaultValue
}
// parseLevel maps a lowercase level name to a slog.Level.
// Both "warn" and "warning" map to LevelWarn; anything unrecognized
// (including "info") yields LevelInfo.
func parseLevel(levelName string) slog.Level {
	known := map[string]slog.Level{
		"debug":   slog.LevelDebug,
		"warn":    slog.LevelWarn,
		"warning": slog.LevelWarn,
		"error":   slog.LevelError,
	}
	if lvl, ok := known[levelName]; ok {
		return lvl
	}
	return slog.LevelInfo
}

76
main.go
View File

@@ -1,54 +1,62 @@
package main
import (
"fmt"
"context"
"fs/api"
"fs/logging"
"fs/metadata"
"fs/service"
"io"
"fs/storage"
"fs/utils"
"os"
"os/signal"
"path/filepath"
"strconv"
"syscall"
)
func main() {
fmt.Println("Hello, World!")
imageStream, err := os.Open("fer.jpg")
if err != nil {
fmt.Printf("Error opening image stream: %v\n", err)
return
}
defer imageStream.Close()
config := utils.NewConfig()
logConfig := logging.ConfigFromValues(config.LogLevel, config.LogFormat, config.AuditLog)
logger := logging.NewLogger(logConfig)
logger.Info("boot",
"log_level", logConfig.LevelName,
"log_format", logConfig.Format,
"audit_log", logConfig.Audit,
"data_path", config.DataPath,
)
metadataHandler, err := metadata.NewMetadataHandler("metadata.db")
if err != nil {
fmt.Printf("Error initializing metadata handler: %v\n", err)
if err := os.MkdirAll(config.DataPath, 0o755); err != nil {
logger.Error("failed_to_prepare_data_path", "path", config.DataPath, "error", err)
return
}
objectService := service.NewObjectService(metadataHandler)
dbPath := filepath.Join(config.DataPath, "metadata.db")
metadataHandler, err := metadata.NewMetadataHandler(dbPath)
if err != nil {
logger.Error("failed_to_initialize_metadata_handler", "error", err)
return
}
blobHandler, err := storage.NewBlobStore(config.DataPath, config.ChunkSize)
if err != nil {
_ = metadataHandler.Close()
logger.Error("failed_to_initialize_blob_store", "error", err)
return
}
manifest, err := objectService.PutObject("test-bucket-ferdzo/fer.jpg", "image/jpeg", imageStream)
if err != nil {
fmt.Printf("Error ingesting stream: %v\n", err)
return
}
fmt.Printf("Manifest: %+v\n", manifest)
objectService := service.NewObjectService(metadataHandler, blobHandler)
handler := api.NewHandler(objectService, logger, logConfig)
addr := config.Address + ":" + strconv.Itoa(config.Port)
objectData, manifest2, err := objectService.GetObject("test-bucket-ferdzo", "fer.jpg")
if err != nil {
fmt.Printf("Error retrieving object: %v\n", err)
return
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
defer stop()
if config.GcEnabled {
go objectService.RunGC(ctx, config.GcInterval)
}
fmt.Printf("Retrieved manifest: %+v\n", manifest2)
recoveredFile, err := os.Create("recovered_" + manifest2.Key)
if err != nil {
fmt.Printf("Error creating file: %v\n", err)
return
}
defer recoveredFile.Close()
bytesWritten, err := io.Copy(recoveredFile, objectData)
if err != nil {
fmt.Printf("Error streaming to recovered file: %v\n", err)
if err = handler.Start(ctx, addr); err != nil {
logger.Error("server_stopped_with_error", "error", err)
return
}
fmt.Printf("Successfully streamed %d bytes to disk!\n", bytesWritten)
}

View File

@@ -2,37 +2,221 @@ package metadata
import (
"encoding/json"
"errors"
"fmt"
"fs/models"
"regexp"
"sort"
"strings"
"time"
"github.com/google/uuid"
"go.etcd.io/bbolt"
)
// ManifestBucketName is the bbolt bucket that held all object manifests in
// the original flat layout. It appears superseded by per-S3-bucket bbolt
// buckets — NOTE(review): confirm whether it is still read anywhere.
const ManifestBucketName = "object_manifests"

// MetadataHandler wraps the bbolt database holding bucket, object and
// multipart-upload metadata.
type MetadataHandler struct {
	db *bbolt.DB
}

// Reserved internal bbolt bucket names. The double-underscore framing keeps
// them from colliding with user bucket names, which validBucketName
// restricts to lowercase alphanumerics, dots and dashes.
var systemIndex = []byte("__SYSTEM_BUCKETS__")
var multipartUploadIndex = []byte("__MULTIPART_UPLOADS__")
var multipartUploadPartsIndex = []byte("__MULTIPART_UPLOAD_PARTS__")

// validBucketName loosely follows S3 bucket naming: 3-63 characters of
// lowercase letters, digits, dots and dashes.
var validBucketName = regexp.MustCompile(`^[a-z0-9.-]{3,63}$`)

// Sentinel errors surfaced to the API layer, which maps them to S3 error
// codes (see mapToS3Error).
var (
	ErrInvalidBucketName   = errors.New("invalid bucket name")
	ErrBucketAlreadyExists = errors.New("bucket already exists")
	ErrBucketNotFound      = errors.New("bucket not found")
	ErrBucketNotEmpty      = errors.New("bucket not empty")
	ErrObjectNotFound      = errors.New("object not found")
	ErrMultipartNotFound   = errors.New("multipart upload not found")
	ErrMultipartNotPending = errors.New("multipart upload is not pending")
)
func NewMetadataHandler(dbPath string) (*MetadataHandler, error) {
db, err := bbolt.Open(dbPath, 0600, nil)
db, err := bbolt.Open(dbPath, 0600, &bbolt.Options{Timeout: 2 * time.Second})
if err != nil {
return nil, err
}
return &MetadataHandler{db: db}, nil
h := &MetadataHandler{db: db}
err = h.db.Update(func(tx *bbolt.Tx) error {
_, err := tx.CreateBucketIfNotExists(systemIndex)
return err
})
if err != nil {
_ = db.Close()
return nil, err
}
err = h.db.Update(func(tx *bbolt.Tx) error {
_, err := tx.CreateBucketIfNotExists(multipartUploadIndex)
return err
})
if err != nil {
_ = db.Close()
return nil, err
}
err = h.db.Update(func(tx *bbolt.Tx) error {
_, err := tx.CreateBucketIfNotExists(multipartUploadPartsIndex)
return err
})
if err != nil {
_ = db.Close()
return nil, err
}
return h, nil
}
func (h *MetadataHandler) PutManifest(manifest *models.ObjectManifest) error {
// Close releases the underlying bbolt database handle.
func (h *MetadataHandler) Close() error {
	return h.db.Close()
}
func (h *MetadataHandler) CreateBucket(bucketName string) error {
if !validBucketName.MatchString(bucketName) {
return fmt.Errorf("%w: %s", ErrInvalidBucketName, bucketName)
}
err := h.db.Update(func(tx *bbolt.Tx) error {
metadataBucket, err := tx.CreateBucketIfNotExists([]byte(ManifestBucketName))
indexBucket, err := tx.CreateBucketIfNotExists([]byte(systemIndex))
if err != nil {
return err
}
key := fmt.Sprintf("%s/%s", manifest.Bucket, manifest.Key)
if indexBucket.Get([]byte(bucketName)) != nil {
return fmt.Errorf("%w: %s", ErrBucketAlreadyExists, bucketName)
}
_, err = tx.CreateBucketIfNotExists([]byte(bucketName))
if err != nil {
return err
}
manifest := models.BucketManifest{
Name: bucketName,
CreatedAt: time.Now(),
}
data, _ := json.Marshal(manifest)
return indexBucket.Put([]byte(bucketName), data)
})
if err != nil {
return err
}
return nil
}
// DeleteBucket removes an empty bucket: both its per-bucket bbolt bucket and
// its entry in the system index, in a single write transaction. It returns
// ErrBucketNotEmpty when the bucket still holds objects or when any pending
// multipart upload targets it (mirroring S3's BucketNotEmpty behavior —
// NOTE(review): confirm intended semantics for pending uploads).
func (h *MetadataHandler) DeleteBucket(bucketName string) error {
	if !validBucketName.MatchString(bucketName) {
		return fmt.Errorf("%w: %s", ErrInvalidBucketName, bucketName)
	}
	err := h.db.Update(func(tx *bbolt.Tx) error {
		// CreateBucketIfNotExists doubles as a "get": the system index is
		// created at startup, so this normally just fetches it.
		indexBucket, err := tx.CreateBucketIfNotExists([]byte(systemIndex))
		if err != nil {
			return err
		}
		if indexBucket.Get([]byte(bucketName)) == nil {
			return fmt.Errorf("%w: %s", ErrBucketNotFound, bucketName)
		}
		metadataBucket := tx.Bucket([]byte(bucketName))
		if metadataBucket == nil {
			return fmt.Errorf("%w: %s", ErrBucketNotFound, bucketName)
		}
		// Any key at all means the bucket still contains objects.
		if k, _ := metadataBucket.Cursor().First(); k != nil {
			return fmt.Errorf("%w: %s", ErrBucketNotEmpty, bucketName)
		}
		multipartUploadsBucket, err := getMultipartUploadBucket(tx)
		if err != nil {
			return err
		}
		// Full scan of the upload index: a pending upload into this bucket
		// also blocks deletion.
		cursor := multipartUploadsBucket.Cursor()
		for _, payload := cursor.First(); payload != nil; _, payload = cursor.Next() {
			upload := models.MultipartUpload{}
			if err := json.Unmarshal(payload, &upload); err != nil {
				return err
			}
			if upload.Bucket == bucketName && upload.State == "pending" {
				return fmt.Errorf("%w: %s", ErrBucketNotEmpty, bucketName)
			}
		}
		// Tolerate a missing metadata bucket (already gone); any other
		// failure aborts before the index entry is touched.
		if err := tx.DeleteBucket([]byte(bucketName)); err != nil && !errors.Is(err, bbolt.ErrBucketNotFound) {
			return fmt.Errorf("error deleting metadata bucket %s: %w", bucketName, err)
		}
		if err := indexBucket.Delete([]byte(bucketName)); err != nil {
			return fmt.Errorf("error deleting bucket %s from system index: %w", bucketName, err)
		}
		return nil
	})
	if err != nil {
		return err
	}
	return nil
}
// ListBuckets returns the names of all buckets recorded in the system
// index, in bbolt key order. An empty store yields an empty (non-nil) slice.
func (h *MetadataHandler) ListBuckets() ([]string, error) {
	names := []string{}
	err := h.db.View(func(tx *bbolt.Tx) error {
		index := tx.Bucket([]byte(systemIndex))
		if index == nil {
			return errors.New("system index not found")
		}
		return index.ForEach(func(name, _ []byte) error {
			names = append(names, string(name))
			return nil
		})
	})
	if err != nil {
		return nil, err
	}
	return names, nil
}
// GetBucketManifest loads the BucketManifest stored in the system index for
// bucketName. Returns a wrapped ErrBucketNotFound when no entry exists.
func (h *MetadataHandler) GetBucketManifest(bucketName string) (*models.BucketManifest, error) {
	var manifest *models.BucketManifest
	err := h.db.View(func(tx *bbolt.Tx) error {
		systemIndexBucket := tx.Bucket([]byte(systemIndex))
		if systemIndexBucket == nil {
			return errors.New("system index not found")
		}
		data := systemIndexBucket.Get([]byte(bucketName))
		if data == nil {
			return fmt.Errorf("%w: %s", ErrBucketNotFound, bucketName)
		}
		// Unmarshal through the pointer-to-pointer so the manifest is
		// allocated by encoding/json inside the transaction.
		err := json.Unmarshal(data, &manifest)
		if err != nil {
			return err
		}
		return nil
	})
	if err != nil {
		return nil, err
	}
	return manifest, nil
}
func (h *MetadataHandler) PutManifest(manifest *models.ObjectManifest) error {
bucket := manifest.Bucket
key := manifest.Key
if _, err := h.GetBucketManifest(bucket); err != nil {
return err
}
err := h.db.Update(func(tx *bbolt.Tx) error {
data, err := json.Marshal(manifest)
if err != nil {
return err
}
metadataBucket := tx.Bucket([]byte(bucket))
if metadataBucket == nil {
return fmt.Errorf("%w: %s", ErrBucketNotFound, bucket)
}
return metadataBucket.Put([]byte(key), data)
})
if err != nil {
@@ -44,16 +228,15 @@ func (h *MetadataHandler) PutManifest(manifest *models.ObjectManifest) error {
func (h *MetadataHandler) GetManifest(bucket, key string) (*models.ObjectManifest, error) {
var manifest *models.ObjectManifest
h.db.View(func(tx *bbolt.Tx) error {
metadataBucket := tx.Bucket([]byte(ManifestBucketName))
err := h.db.View(func(tx *bbolt.Tx) error {
metadataBucket := tx.Bucket([]byte(bucket))
if metadataBucket == nil {
return fmt.Errorf("bucket %s not found", ManifestBucketName)
return fmt.Errorf("%w: %s", ErrBucketNotFound, bucket)
}
key := fmt.Sprintf("%s/%s", bucket, key)
data := metadataBucket.Get([]byte(key))
if data == nil {
return fmt.Errorf("manifest not found for bucket %s and key %s", bucket, key)
return fmt.Errorf("%w: %s/%s", ErrObjectNotFound, bucket, key)
}
err := json.Unmarshal(data, &manifest)
if err != nil {
@@ -61,6 +244,455 @@ func (h *MetadataHandler) GetManifest(bucket, key string) (*models.ObjectManifes
}
return nil
})
if err != nil {
return nil, err
}
return manifest, nil
}
// ListObjects returns the manifests of all objects in bucket whose keys
// start with prefix (an empty prefix matches everything). Returns a wrapped
// ErrBucketNotFound when the bucket is absent from the system index or has
// no backing bbolt bucket. Fixes the non-idiomatic `_bucket` identifier and
// collapses the redundant inner error re-plumbing of the original.
func (h *MetadataHandler) ListObjects(bucket, prefix string) ([]*models.ObjectManifest, error) {
	var objects []*models.ObjectManifest
	err := h.db.View(func(tx *bbolt.Tx) error {
		systemIndexBucket := tx.Bucket([]byte(systemIndex))
		if systemIndexBucket == nil {
			return errors.New("system index not found")
		}
		if systemIndexBucket.Get([]byte(bucket)) == nil {
			return fmt.Errorf("%w: %s", ErrBucketNotFound, bucket)
		}
		objectsBucket := tx.Bucket([]byte(bucket))
		if objectsBucket == nil {
			// Index entry without a backing bucket: treat as missing.
			return fmt.Errorf("%w: %s", ErrBucketNotFound, bucket)
		}
		return objectsBucket.ForEach(func(k, v []byte) error {
			if prefix != "" && !strings.HasPrefix(string(k), prefix) {
				return nil
			}
			object := models.ObjectManifest{}
			if err := json.Unmarshal(v, &object); err != nil {
				return err
			}
			objects = append(objects, &object)
			return nil
		})
	})
	if err != nil {
		return nil, err
	}
	return objects, nil
}
// DeleteManifest removes the manifest for bucket/key, returning a wrapped
// ErrBucketNotFound or ErrObjectNotFound when either is absent. The original
// performed the existence check via GetManifest in a separate read
// transaction before the write transaction, leaving a check-then-act window
// in which a concurrent delete could slip through; here the check and the
// delete run atomically inside one Update.
func (h *MetadataHandler) DeleteManifest(bucket, key string) error {
	return h.db.Update(func(tx *bbolt.Tx) error {
		metadataBucket := tx.Bucket([]byte(bucket))
		if metadataBucket == nil {
			return fmt.Errorf("%w: %s", ErrBucketNotFound, bucket)
		}
		if metadataBucket.Get([]byte(key)) == nil {
			return fmt.Errorf("%w: %s/%s", ErrObjectNotFound, bucket, key)
		}
		return metadataBucket.Delete([]byte(key))
	})
}
// DeleteManifests deletes the given keys from bucket in one write
// transaction and returns the keys reported as deleted. Keys that do not
// exist are still reported as deleted — this matches S3's idempotent
// DeleteObjects behavior (NOTE(review): confirm intentional). Empty keys
// are skipped silently.
func (h *MetadataHandler) DeleteManifests(bucket string, keys []string) ([]string, error) {
	deleted := make([]string, 0, len(keys))
	err := h.db.Update(func(tx *bbolt.Tx) error {
		metadataBucket := tx.Bucket([]byte(bucket))
		if metadataBucket == nil {
			return fmt.Errorf("%w: %s", ErrBucketNotFound, bucket)
		}
		for _, key := range keys {
			if key == "" {
				continue
			}
			// Only delete when present; either way the key counts as deleted.
			if metadataBucket.Get([]byte(key)) != nil {
				if err := metadataBucket.Delete([]byte(key)); err != nil {
					return err
				}
			}
			deleted = append(deleted, key)
		}
		return nil
	})
	if err != nil {
		return nil, err
	}
	return deleted, nil
}
// CreateMultipartUpload registers a new pending multipart upload for
// bucket/key and returns its record. The original verified bucket existence
// in a View and then inserted in a separate Update, allowing a concurrent
// DeleteBucket to win the race between the two; here both steps run inside
// one write transaction. Returns the bare ErrBucketNotFound sentinel (not
// wrapped), matching the original's behavior.
func (h *MetadataHandler) CreateMultipartUpload(bucket, key string) (*models.MultipartUpload, error) {
	upload := &models.MultipartUpload{
		Bucket:    bucket,
		Key:       key,
		UploadID:  uuid.New().String(),
		CreatedAt: time.Now().UTC().Format(time.RFC3339),
		State:     "pending",
	}
	err := h.db.Update(func(tx *bbolt.Tx) error {
		systemIndexBucket := tx.Bucket([]byte(systemIndex))
		if systemIndexBucket == nil {
			return errors.New("system index not found")
		}
		if systemIndexBucket.Get([]byte(bucket)) == nil {
			return ErrBucketNotFound
		}
		multipartUploadBucket := tx.Bucket([]byte(multipartUploadIndex))
		if multipartUploadBucket == nil {
			return errors.New("multipart upload index not found")
		}
		payload, err := json.Marshal(upload)
		if err != nil {
			return err
		}
		return multipartUploadBucket.Put([]byte(upload.UploadID), payload)
	})
	if err != nil {
		return nil, err
	}
	return upload, nil
}
// getMultipartUploadBucket returns the bbolt bucket that indexes multipart
// uploads by upload ID. It is created in NewMetadataHandler, so a nil
// result indicates a corrupted or uninitialized store.
func getMultipartUploadBucket(tx *bbolt.Tx) (*bbolt.Bucket, error) {
	multipartUploadBucket := tx.Bucket(multipartUploadIndex)
	if multipartUploadBucket == nil {
		return nil, errors.New("multipart upload index not found")
	}
	return multipartUploadBucket, nil
}

// getMultipartPartsBucket returns the bbolt bucket storing uploaded parts
// under "<uploadID>:<zero-padded part#>" keys. Also created in
// NewMetadataHandler; nil indicates a corrupted or uninitialized store.
func getMultipartPartsBucket(tx *bbolt.Tx) (*bbolt.Bucket, error) {
	multipartPartsBucket := tx.Bucket(multipartUploadPartsIndex)
	if multipartPartsBucket == nil {
		return nil, errors.New("multipart upload parts index not found")
	}
	return multipartPartsBucket, nil
}
// getMultipartUploadFromBucket decodes the upload record stored under
// uploadID, or returns a wrapped ErrMultipartNotFound when absent.
func getMultipartUploadFromBucket(multipartUploadBucket *bbolt.Bucket, uploadID string) (*models.MultipartUpload, error) {
	raw := multipartUploadBucket.Get([]byte(uploadID))
	if raw == nil {
		return nil, fmt.Errorf("%w: %s", ErrMultipartNotFound, uploadID)
	}
	var upload models.MultipartUpload
	if err := json.Unmarshal(raw, &upload); err != nil {
		return nil, err
	}
	return &upload, nil
}
// getMultipartUploadFromTx resolves both the upload record for uploadID and
// the bucket it lives in, so callers can re-save the record in the same
// transaction.
func getMultipartUploadFromTx(tx *bbolt.Tx, uploadID string) (*models.MultipartUpload, *bbolt.Bucket, error) {
	uploadsBucket, err := getMultipartUploadBucket(tx)
	if err != nil {
		return nil, nil, err
	}
	upload, err := getMultipartUploadFromBucket(uploadsBucket, uploadID)
	if err != nil {
		return nil, nil, err
	}
	return upload, uploadsBucket, nil
}
// putMultipartUpload JSON-encodes upload and stores it under uploadID,
// overwriting any previous record.
func putMultipartUpload(multipartUploadBucket *bbolt.Bucket, uploadID string, upload *models.MultipartUpload) error {
	payload, err := json.Marshal(upload)
	if err != nil {
		return err
	}
	return multipartUploadBucket.Put([]byte(uploadID), payload)
}
// deleteMultipartPartsByUploadID removes every stored part belonging to
// uploadID. Part keys have the form "<uploadID>:<zero-padded part#>", so a
// cursor seek to the "<uploadID>:" prefix visits them contiguously.
func deleteMultipartPartsByUploadID(tx *bbolt.Tx, uploadID string) error {
	multipartPartsBucket, err := getMultipartPartsBucket(tx)
	if err != nil {
		return err
	}
	prefix := uploadID + ":"
	cursor := multipartPartsBucket.Cursor()
	// Collect keys first, delete afterwards: bbolt forbids mutating a
	// bucket while iterating it with a cursor (other than Cursor.Delete).
	keysToDelete := make([][]byte, 0)
	for k, _ := cursor.Seek([]byte(prefix)); k != nil && strings.HasPrefix(string(k), prefix); k, _ = cursor.Next() {
		// Copy: the slice returned by the cursor is only valid during iteration.
		keyCopy := make([]byte, len(k))
		copy(keyCopy, k)
		keysToDelete = append(keysToDelete, keyCopy)
	}
	for _, key := range keysToDelete {
		if err := multipartPartsBucket.Delete(key); err != nil {
			return err
		}
	}
	return nil
}
// GetMultipartUpload loads the upload record for uploadID, or a wrapped
// ErrMultipartNotFound when it does not exist.
func (h *MetadataHandler) GetMultipartUpload(uploadID string) (*models.MultipartUpload, error) {
	var upload *models.MultipartUpload
	err := h.db.View(func(tx *bbolt.Tx) error {
		found, _, lookupErr := getMultipartUploadFromTx(tx, uploadID)
		if lookupErr != nil {
			return lookupErr
		}
		upload = found
		return nil
	})
	if err != nil {
		return nil, err
	}
	return upload, nil
}
// PutMultipartPart records (or overwrites) one uploaded part for uploadID.
// Part numbers follow the S3 range 1..10000. The upload must exist and be
// pending, otherwise ErrMultipartNotFound / ErrMultipartNotPending (wrapped)
// is returned.
func (h *MetadataHandler) PutMultipartPart(uploadID string, part models.UploadedPart) error {
	if part.PartNumber < 1 || part.PartNumber > 10000 {
		return fmt.Errorf("invalid part number: %d", part.PartNumber)
	}
	return h.db.Update(func(tx *bbolt.Tx) error {
		upload, _, err := getMultipartUploadFromTx(tx, uploadID)
		if err != nil {
			return err
		}
		if upload.State != "pending" {
			return fmt.Errorf("%w: %s", ErrMultipartNotPending, uploadID)
		}
		partsBucket, err := getMultipartPartsBucket(tx)
		if err != nil {
			return err
		}
		payload, err := json.Marshal(part)
		if err != nil {
			return err
		}
		// Zero-padding keeps part keys sorted numerically in bbolt's
		// lexicographic key order.
		partKey := fmt.Sprintf("%s:%05d", uploadID, part.PartNumber)
		return partsBucket.Put([]byte(partKey), payload)
	})
}
// ListMultipartParts returns all recorded parts for uploadID in ascending
// part-number order. Fails with a wrapped ErrMultipartNotFound when the
// upload does not exist; an upload with no parts yields an empty slice.
func (h *MetadataHandler) ListMultipartParts(uploadID string) ([]models.UploadedPart, error) {
	parts := make([]models.UploadedPart, 0)
	err := h.db.View(func(tx *bbolt.Tx) error {
		// Existence check first so a missing upload is distinguishable from
		// an upload with zero parts.
		if _, _, err := getMultipartUploadFromTx(tx, uploadID); err != nil {
			return err
		}
		multipartPartsBucket, err := getMultipartPartsBucket(tx)
		if err != nil {
			return err
		}
		prefix := uploadID + ":"
		cursor := multipartPartsBucket.Cursor()
		for k, v := cursor.Seek([]byte(prefix)); k != nil && strings.HasPrefix(string(k), prefix); k, v = cursor.Next() {
			part := models.UploadedPart{}
			if err := json.Unmarshal(v, &part); err != nil {
				return err
			}
			parts = append(parts, part)
		}
		return nil
	})
	if err != nil {
		return nil, err
	}
	// Keys embed the part number zero-padded to five digits, so cursor order
	// already matches part order; this sort is defensive.
	sort.Slice(parts, func(i, j int) bool {
		return parts[i].PartNumber < parts[j].PartNumber
	})
	return parts, nil
}
// CompleteMultipartUpload finalizes a pending upload in one write
// transaction: it writes the assembled object's manifest under the upload's
// bucket/key, marks the upload record "completed", and deletes its stored
// parts. final's Bucket and Key fields are overwritten from the upload
// record. Fails with ErrMultipartNotFound / ErrMultipartNotPending /
// ErrBucketNotFound (wrapped) as appropriate.
func (h *MetadataHandler) CompleteMultipartUpload(uploadID string, final *models.ObjectManifest) error {
	if final == nil {
		return errors.New("final object manifest is required")
	}
	err := h.db.Update(func(tx *bbolt.Tx) error {
		upload, multipartUploadBucket, err := getMultipartUploadFromTx(tx, uploadID)
		if err != nil {
			return err
		}
		if upload.State != "pending" {
			return fmt.Errorf("%w: %s", ErrMultipartNotPending, uploadID)
		}
		metadataBucket := tx.Bucket([]byte(upload.Bucket))
		if metadataBucket == nil {
			return fmt.Errorf("%w: %s", ErrBucketNotFound, upload.Bucket)
		}
		// The upload record is authoritative for the destination.
		final.Bucket = upload.Bucket
		final.Key = upload.Key
		finalPayload, err := json.Marshal(final)
		if err != nil {
			return err
		}
		if err := metadataBucket.Put([]byte(upload.Key), finalPayload); err != nil {
			return err
		}
		upload.State = "completed"
		if err := putMultipartUpload(multipartUploadBucket, uploadID, upload); err != nil {
			return err
		}
		// Parts are no longer needed once the final manifest references
		// the chunks directly.
		if err := deleteMultipartPartsByUploadID(tx, uploadID); err != nil {
			return err
		}
		return nil
	})
	if err != nil {
		return err
	}
	return nil
}
// AbortMultipartUpload marks the upload "aborted" and deletes its stored
// parts, all in one write transaction. Only a completed upload cannot be
// aborted (ErrMultipartNotPending); aborting an already-aborted upload is
// allowed and idempotent.
func (h *MetadataHandler) AbortMultipartUpload(uploadID string) error {
	err := h.db.Update(func(tx *bbolt.Tx) error {
		upload, multipartUploadBucket, err := getMultipartUploadFromTx(tx, uploadID)
		if err != nil {
			return err
		}
		if upload.State == "completed" {
			return fmt.Errorf("%w: %s", ErrMultipartNotPending, uploadID)
		}
		upload.State = "aborted"
		if err := putMultipartUpload(multipartUploadBucket, uploadID, upload); err != nil {
			return err
		}
		// Dropping the parts makes their chunks unreferenced, so garbage
		// collection can reclaim them.
		if err := deleteMultipartPartsByUploadID(tx, uploadID); err != nil {
			return err
		}
		return nil
	})
	if err != nil {
		return err
	}
	return nil
}
// GetReferencedChunkSet computes the set of chunk IDs still reachable from
// metadata, for use as the garbage collector's "live" set. A chunk is live
// when it is referenced by any object manifest in any bucket, or by a part
// belonging to a multipart upload that is still pending (completed/aborted
// uploads have had their parts deleted, so their chunks are reachable only
// via final manifests, if at all).
func (h *MetadataHandler) GetReferencedChunkSet() (map[string]struct{}, error) {
	chunkSet := make(map[string]struct{})
	pendingUploadSet := make(map[string]struct{})
	err := h.db.View(func(tx *bbolt.Tx) error {
		systemIndexBucket := tx.Bucket([]byte(systemIndex))
		if systemIndexBucket == nil {
			return errors.New("system index not found")
		}
		// Pass 1: chunks referenced by every object manifest of every bucket.
		c := systemIndexBucket.Cursor()
		for k, _ := c.First(); k != nil; k, _ = c.Next() {
			metadataBucket := tx.Bucket(k)
			if metadataBucket == nil {
				// Index entry without a backing bucket: nothing to scan.
				continue
			}
			err := metadataBucket.ForEach(func(k, v []byte) error {
				object := models.ObjectManifest{}
				err := json.Unmarshal(v, &object)
				if err != nil {
					return err
				}
				for _, chunkID := range object.Chunks {
					chunkSet[chunkID] = struct{}{}
				}
				return nil
			})
			if err != nil {
				return err
			}
		}
		// Pass 2: record which upload IDs are still pending.
		uploadsBucket := tx.Bucket(multipartUploadIndex)
		if uploadsBucket == nil {
			return errors.New("multipart upload index not found")
		}
		if err := uploadsBucket.ForEach(func(k, v []byte) error {
			upload := models.MultipartUpload{}
			if err := json.Unmarshal(v, &upload); err != nil {
				return err
			}
			if upload.State == "pending" {
				pendingUploadSet[string(k)] = struct{}{}
			}
			return nil
		}); err != nil {
			return err
		}
		// Pass 3: chunks referenced by parts of pending uploads only.
		partsBucket := tx.Bucket(multipartUploadPartsIndex)
		if partsBucket == nil {
			return errors.New("multipart upload parts index not found")
		}
		if err := partsBucket.ForEach(func(k, v []byte) error {
			// Part keys are "<uploadID>:<part#>"; take the ID portion.
			uploadID, _, ok := strings.Cut(string(k), ":")
			if !ok {
				return nil
			}
			if _, pending := pendingUploadSet[uploadID]; !pending {
				return nil
			}
			part := models.UploadedPart{}
			if err := json.Unmarshal(v, &part); err != nil {
				return err
			}
			for _, chunkID := range part.Chunks {
				chunkSet[chunkID] = struct{}{}
			}
			return nil
		}); err != nil {
			return err
		}
		return nil
	})
	if err != nil {
		return nil, err
	}
	return chunkSet, nil
}
// GetReferencedChunks returns the live chunk IDs as a slice (unspecified
// order), delegating the set computation to GetReferencedChunkSet.
func (h *MetadataHandler) GetReferencedChunks() ([]string, error) {
	set, err := h.GetReferencedChunkSet()
	if err != nil {
		return nil, err
	}
	ids := make([]string, 0, len(set))
	for id := range set {
		ids = append(ids, id)
	}
	return ids, nil
}

View File

@@ -1,5 +1,10 @@
package models
import (
"encoding/xml"
"time"
)
type ObjectManifest struct {
Bucket string `json:"bucket"`
Key string `json:"key"`
@@ -9,3 +14,172 @@ type ObjectManifest struct {
Chunks []string `json:"chunks"`
CreatedAt int64 `json:"created_at"`
}
// BucketManifest is the JSON record stored in the system index for each
// bucket. Only Name and CreatedAt are populated by CreateBucket; the
// remaining fields appear reserved for future S3 features — NOTE(review):
// confirm whether anything writes them yet.
type BucketManifest struct {
	Name              string    `json:"name"`
	CreatedAt         time.Time `json:"created_at"`
	OwnerID           string    `json:"owner_id"`
	OwnerDisplayName  string    `json:"owner_display_name"`
	Region            string    `json:"region"`
	VersioningStatus  string    `json:"versioning_status"`
	PublicAccessBlock bool      `json:"public_access_block"`
}
// ListAllMyBucketsResult is the XML body of an S3 ListBuckets response.
type ListAllMyBucketsResult struct {
	XMLName xml.Name       `xml:"ListAllMyBucketsResult"`
	Xmlns   string         `xml:"xmlns,attr"`
	Owner   BucketsOwner   `xml:"Owner"`
	Buckets BucketsElement `xml:"Buckets"`
}

// BucketsOwner identifies the account owning the listed buckets.
type BucketsOwner struct {
	ID          string `xml:"ID"`
	DisplayName string `xml:"DisplayName,omitempty"`
}

// BucketsElement wraps the repeated <Bucket> entries.
type BucketsElement struct {
	Items []BucketItem `xml:"Bucket"`
}

// BucketItem is one <Bucket> entry: name plus creation timestamp.
type BucketItem struct {
	Name         string `xml:"Name"`
	CreationDate string `xml:"CreationDate"`
}
// S3ErrorResponse is the S3-compatible XML <Error> body returned by
// writeS3Error for failed requests.
type S3ErrorResponse struct {
	XMLName   xml.Name `xml:"Error"`
	Code      string   `xml:"Code"`
	Message   string   `xml:"Message"`
	Resource  string   `xml:"Resource,omitempty"`
	RequestID string   `xml:"RequestId,omitempty"`
	HostID    string   `xml:"HostId,omitempty"`
}
// ListBucketResult is the XML body of a ListObjects (v1) response.
type ListBucketResult struct {
	XMLName        xml.Name         `xml:"ListBucketResult"`
	Xmlns          string           `xml:"xmlns,attr"`
	Name           string           `xml:"Name"`
	Prefix         string           `xml:"Prefix"`
	KeyCount       int              `xml:"KeyCount"`
	MaxKeys        int              `xml:"MaxKeys"`
	IsTruncated    bool             `xml:"IsTruncated"`
	Contents       []Contents       `xml:"Contents"`
	CommonPrefixes []CommonPrefixes `xml:"CommonPrefixes,omitempty"`
}

// ListBucketResultV2 is the XML body of a ListObjectsV2 response, adding
// continuation-token pagination fields over the v1 shape.
type ListBucketResultV2 struct {
	XMLName               xml.Name         `xml:"ListBucketResult"`
	Xmlns                 string           `xml:"xmlns,attr"`
	Name                  string           `xml:"Name"`
	Prefix                string           `xml:"Prefix"`
	Delimiter             string           `xml:"Delimiter,omitempty"`
	MaxKeys               int              `xml:"MaxKeys"`
	KeyCount              int              `xml:"KeyCount"`
	IsTruncated           bool             `xml:"IsTruncated"`
	ContinuationToken     string           `xml:"ContinuationToken,omitempty"`
	NextContinuationToken string           `xml:"NextContinuationToken,omitempty"`
	StartAfter            string           `xml:"StartAfter,omitempty"`
	EncodingType          string           `xml:"EncodingType,omitempty"`
	Contents              []Contents       `xml:"Contents,omitempty"`
	CommonPrefixes        []CommonPrefixes `xml:"CommonPrefixes,omitempty"`
}

// Contents describes one object entry in a list response.
type Contents struct {
	Key          string `xml:"Key"`
	LastModified string `xml:"LastModified"`
	ETag         string `xml:"ETag"`
	Size         int64  `xml:"Size"`
	StorageClass string `xml:"StorageClass"`
}

// CommonPrefixes is one rolled-up key prefix when a delimiter is used.
type CommonPrefixes struct {
	Prefix string `xml:"Prefix"`
}
// MultipartUpload is the record kept per multipart upload. JSON tags are
// used for bbolt persistence, XML tags for S3 responses. State is one of
// "pending", "completed" or "aborted" (see the metadata handler).
type MultipartUpload struct {
	UploadID  string `json:"upload_id" xml:"UploadId"`
	Bucket    string `json:"bucket" xml:"Bucket"`
	Key       string `json:"key" xml:"Key"`
	CreatedAt string `json:"created_at" xml:"CreatedAt"`
	State     string `json:"state" xml:"State"`
}

// InitiateMultipartUploadResult is the XML body returned when a multipart
// upload is created.
type InitiateMultipartUploadResult struct {
	XMLName  xml.Name `xml:"InitiateMultipartUploadResult"`
	Xmlns    string   `xml:"xmlns,attr"`
	Bucket   string   `xml:"Bucket"`
	Key      string   `xml:"Key"`
	UploadID string   `xml:"UploadId"`
}

// UploadedPart is the stored record of one uploaded part; Chunks and
// CreatedAt are persistence-only (JSON tags, no XML).
type UploadedPart struct {
	PartNumber int      `json:"part_number" xml:"PartNumber"`
	ETag       string   `json:"etag" xml:"ETag"`
	Size       int64    `json:"size" xml:"Size"`
	Chunks     []string `json:"chunks"`
	CreatedAt  int64    `json:"created_at"`
}
// CompletedPart is one <Part> entry the client sends when completing a
// multipart upload.
type CompletedPart struct {
	PartNumber int    `xml:"PartNumber"`
	ETag       string `xml:"ETag"`
}

// CompleteMultipartUploadRequest is the XML request body of
// CompleteMultipartUpload.
type CompleteMultipartUploadRequest struct {
	XMLName xml.Name        `xml:"CompleteMultipartUpload"`
	Parts   []CompletedPart `xml:"Part"`
}

// CompleteMultipartUploadResult is the XML response body of
// CompleteMultipartUpload.
type CompleteMultipartUploadResult struct {
	XMLName  xml.Name `xml:"CompleteMultipartUploadResult"`
	Xmlns    string   `xml:"xmlns,attr"`
	Bucket   string   `xml:"Bucket"`
	Key      string   `xml:"Key"`
	ETag     string   `xml:"ETag"`
	Location string   `xml:"Location,omitempty"`
}

// ListPartsResult is the XML response body of ListParts.
type ListPartsResult struct {
	XMLName  xml.Name   `xml:"ListPartsResult"`
	Xmlns    string     `xml:"xmlns,attr"`
	Bucket   string     `xml:"Bucket"`
	Key      string     `xml:"Key"`
	UploadID string     `xml:"UploadId"`
	Parts    []PartItem `xml:"Part"`
}

// PartItem is one <Part> entry in a ListParts response.
type PartItem struct {
	PartNumber   int    `xml:"PartNumber"`
	LastModified string `xml:"LastModified"`
	ETag         string `xml:"ETag"`
	Size         int64  `xml:"Size"`
}
// DeleteObjectsRequest is the XML request body of multi-object delete
// (<Delete>). Quiet suppresses per-key <Deleted> entries in the result.
type DeleteObjectsRequest struct {
	XMLName xml.Name               `xml:"Delete"`
	Objects []DeleteObjectIdentity `xml:"Object"`
	Quiet   bool                   `xml:"Quiet"`
}

// DeleteObjectIdentity names one key to delete.
type DeleteObjectIdentity struct {
	Key string `xml:"Key"`
}

// DeleteObjectsResult is the XML response body of multi-object delete.
type DeleteObjectsResult struct {
	XMLName xml.Name       `xml:"DeleteResult"`
	Xmlns   string         `xml:"xmlns,attr"`
	Deleted []DeletedEntry `xml:"Deleted,omitempty"`
	Errors  []DeleteError  `xml:"Error,omitempty"`
}

// DeletedEntry reports one successfully deleted key.
type DeletedEntry struct {
	Key string `xml:"Key"`
}

// DeleteError reports one key that failed to delete, with its S3 error code.
type DeleteError struct {
	Key     string `xml:"Key"`
	Code    string `xml:"Code"`
	Message string `xml:"Message"`
}

View File

@@ -1,29 +1,43 @@
package service
import (
"context"
"crypto/md5"
"encoding/hex"
"errors"
"fmt"
"fs/metadata"
"fs/models"
"fs/storage"
"io"
"log/slog"
"strings"
"sync"
"time"
)
type ObjectService struct {
metadataHandler *metadata.MetadataHandler
metadata *metadata.MetadataHandler
blob *storage.BlobStore
gcMu sync.RWMutex
}
func NewObjectService(metadataHandler *metadata.MetadataHandler) *ObjectService {
return &ObjectService{metadataHandler: metadataHandler}
var (
ErrInvalidPart = errors.New("invalid multipart part")
ErrInvalidPartOrder = errors.New("invalid multipart part order")
ErrInvalidCompleteRequest = errors.New("invalid complete multipart request")
ErrEntityTooSmall = errors.New("multipart entity too small")
)
func NewObjectService(metadataHandler *metadata.MetadataHandler, blobHandler *storage.BlobStore) *ObjectService {
return &ObjectService{metadata: metadataHandler, blob: blobHandler}
}
func (s *ObjectService) PutObject(uri string, contentType string, input io.Reader) (*models.ObjectManifest, error) {
func (s *ObjectService) PutObject(bucket, key, contentType string, input io.Reader) (*models.ObjectManifest, error) {
s.gcMu.RLock()
defer s.gcMu.RUnlock()
bucket := strings.Split(uri, "/")[0]
key := strings.Join(strings.Split(uri, "/")[1:], "/")
chunks, size, etag, err := storage.IngestStream(input)
chunks, size, etag, err := s.blob.IngestStream(input)
if err != nil {
return nil, err
}
@@ -38,8 +52,14 @@ func (s *ObjectService) PutObject(uri string, contentType string, input io.Reade
Chunks: chunks,
CreatedAt: timestamp,
}
fmt.Println(manifest)
if err = s.metadataHandler.PutManifest(manifest); err != nil {
slog.Debug("object_written_manifest",
"bucket", manifest.Bucket,
"key", manifest.Key,
"size", manifest.Size,
"chunk_count", len(manifest.Chunks),
"etag", manifest.ETag,
)
if err = s.metadata.PutManifest(manifest); err != nil {
return nil, err
}
@@ -47,19 +67,298 @@ func (s *ObjectService) PutObject(uri string, contentType string, input io.Reade
}
func (s *ObjectService) GetObject(bucket, key string) (io.ReadCloser, *models.ObjectManifest, error) {
manifest, err := s.metadataHandler.GetManifest(bucket, key)
s.gcMu.RLock()
manifest, err := s.metadata.GetManifest(bucket, key)
if err != nil {
s.gcMu.RUnlock()
return nil, nil, err
}
pr, pw := io.Pipe()
go func() {
defer pw.Close()
err := storage.AssembleStream(manifest.Chunks, pw)
if err != nil {
defer s.gcMu.RUnlock()
if err := s.blob.AssembleStream(manifest.Chunks, pw); err != nil {
_ = pw.CloseWithError(err)
return
}
_ = pw.Close()
}()
return pr, manifest, nil
}
// HeadObject returns a copy of the object's manifest without touching any
// blob data. The GC read lock keeps the lookup from racing a sweep.
func (s *ObjectService) HeadObject(bucket, key string) (models.ObjectManifest, error) {
	s.gcMu.RLock()
	defer s.gcMu.RUnlock()
	m, lookupErr := s.metadata.GetManifest(bucket, key)
	if lookupErr != nil {
		return models.ObjectManifest{}, lookupErr
	}
	return *m, nil
}
// DeleteObject removes the object's manifest; chunk data is only reclaimed
// later by GarbageCollect. The GC read lock blocks concurrent sweeps.
func (s *ObjectService) DeleteObject(bucket, key string) error {
	s.gcMu.RLock()
	defer s.gcMu.RUnlock()
	removeErr := s.metadata.DeleteManifest(bucket, key)
	return removeErr
}
// ListObjects returns the manifests in bucket, delegating prefix filtering
// to the metadata layer.
func (s *ObjectService) ListObjects(bucket, prefix string) ([]*models.ObjectManifest, error) {
	s.gcMu.RLock()
	defer s.gcMu.RUnlock()
	manifests, listErr := s.metadata.ListObjects(bucket, prefix)
	return manifests, listErr
}
// CreateBucket records a new bucket in the metadata store.
func (s *ObjectService) CreateBucket(bucket string) error {
	s.gcMu.RLock()
	defer s.gcMu.RUnlock()
	createErr := s.metadata.CreateBucket(bucket)
	return createErr
}
// HeadBucket reports whether the bucket exists by fetching its manifest;
// the manifest itself is discarded.
func (s *ObjectService) HeadBucket(bucket string) error {
	s.gcMu.RLock()
	defer s.gcMu.RUnlock()
	if _, lookupErr := s.metadata.GetBucketManifest(bucket); lookupErr != nil {
		return lookupErr
	}
	return nil
}
// GetBucketManifest fetches the bucket's manifest from the metadata store.
func (s *ObjectService) GetBucketManifest(bucket string) (*models.BucketManifest, error) {
	s.gcMu.RLock()
	defer s.gcMu.RUnlock()
	manifest, getErr := s.metadata.GetBucketManifest(bucket)
	return manifest, getErr
}
// DeleteBucket removes the bucket record from the metadata store.
func (s *ObjectService) DeleteBucket(bucket string) error {
	s.gcMu.RLock()
	defer s.gcMu.RUnlock()
	dropErr := s.metadata.DeleteBucket(bucket)
	return dropErr
}
// ListBuckets returns the names of all known buckets.
func (s *ObjectService) ListBuckets() ([]string, error) {
	s.gcMu.RLock()
	defer s.gcMu.RUnlock()
	names, listErr := s.metadata.ListBuckets()
	return names, listErr
}
// DeleteObjects removes the manifests for the given keys in a single
// metadata call and passes through whatever key list DeleteManifests
// reports for the result.
func (s *ObjectService) DeleteObjects(bucket string, keys []string) ([]string, error) {
	s.gcMu.RLock()
	defer s.gcMu.RUnlock()
	result, bulkErr := s.metadata.DeleteManifests(bucket, keys)
	return result, bulkErr
}
// CreateMultipartUpload registers a new multipart upload session for
// bucket/key and returns its descriptor.
func (s *ObjectService) CreateMultipartUpload(bucket, key string) (*models.MultipartUpload, error) {
	s.gcMu.RLock()
	defer s.gcMu.RUnlock()
	upload, createErr := s.metadata.CreateMultipartUpload(bucket, key)
	return upload, createErr
}
// UploadPart stores the payload for one part of an in-progress multipart
// upload and returns that part's ETag (the MD5 of the ingested bytes).
//
// S3 restricts part numbers to [1, 10000]; anything outside that range is
// rejected with ErrInvalidPart before any data is read. The GC read lock
// is held for the whole ingest so freshly written chunks cannot be swept
// before the part is recorded in metadata.
//
// Fix: drop the non-idiomatic pre-declared `var uploadedPart` that was
// assigned immediately afterwards; build the struct at its use site.
func (s *ObjectService) UploadPart(bucket, key, uploadId string, partNumber int, input io.Reader) (string, error) {
	s.gcMu.RLock()
	defer s.gcMu.RUnlock()
	if partNumber < 1 || partNumber > 10000 {
		return "", ErrInvalidPart
	}
	upload, err := s.metadata.GetMultipartUpload(uploadId)
	if err != nil {
		return "", err
	}
	// Reject an uploadId that belongs to a different bucket/key.
	if upload.Bucket != bucket || upload.Key != key {
		return "", metadata.ErrMultipartNotFound
	}
	chunkIds, totalSize, etag, err := s.blob.IngestStream(input)
	if err != nil {
		return "", err
	}
	part := models.UploadedPart{
		PartNumber: partNumber,
		ETag:       etag,
		Size:       totalSize,
		Chunks:     chunkIds,
		CreatedAt:  time.Now().Unix(),
	}
	if err := s.metadata.PutMultipartPart(uploadId, part); err != nil {
		return "", err
	}
	return etag, nil
}
// ListMultipartParts returns the parts recorded so far for an upload,
// after verifying the upload id belongs to the given bucket/key.
func (s *ObjectService) ListMultipartParts(bucket, key, uploadID string) ([]models.UploadedPart, error) {
	s.gcMu.RLock()
	defer s.gcMu.RUnlock()
	up, lookupErr := s.metadata.GetMultipartUpload(uploadID)
	if lookupErr != nil {
		return nil, lookupErr
	}
	if up.Bucket != bucket || up.Key != key {
		return nil, metadata.ErrMultipartNotFound
	}
	return s.metadata.ListMultipartParts(uploadID)
}
// CompleteMultipartUpload validates the client-supplied part list against
// the stored parts and, on success, stitches them into one object
// manifest. Validation mirrors S3 semantics:
//   - the part list must be non-empty (ErrInvalidCompleteRequest),
//   - part numbers must be strictly ascending (ErrInvalidPartOrder),
//   - every listed part must exist and its ETag must match (ErrInvalidPart),
//   - every part except the last must be at least 5 MiB (ErrEntityTooSmall).
//
// The final ETag follows the S3 multipart convention implemented by
// buildMultipartETag: MD5 over the concatenated per-part digests with a
// "-<part count>" suffix.
//
// Fix: name the previously magic 5*1024*1024 minimum-part-size constant.
func (s *ObjectService) CompleteMultipartUpload(bucket, key, uploadID string, completed []models.CompletedPart) (*models.ObjectManifest, error) {
	// Minimum size S3 enforces for all parts except the final one.
	const minPartSize = 5 * 1024 * 1024
	s.gcMu.RLock()
	defer s.gcMu.RUnlock()
	if len(completed) == 0 {
		return nil, ErrInvalidCompleteRequest
	}
	upload, err := s.metadata.GetMultipartUpload(uploadID)
	if err != nil {
		return nil, err
	}
	if upload.Bucket != bucket || upload.Key != key {
		return nil, metadata.ErrMultipartNotFound
	}
	storedParts, err := s.metadata.ListMultipartParts(uploadID)
	if err != nil {
		return nil, err
	}
	partsByNumber := make(map[int]models.UploadedPart, len(storedParts))
	for _, part := range storedParts {
		partsByNumber[part.PartNumber] = part
	}
	lastPartNumber := 0
	orderedParts := make([]models.UploadedPart, 0, len(completed))
	chunks := make([]string, 0)
	var totalSize int64
	for i, part := range completed {
		if part.PartNumber <= lastPartNumber {
			return nil, ErrInvalidPartOrder
		}
		lastPartNumber = part.PartNumber
		storedPart, ok := partsByNumber[part.PartNumber]
		if !ok {
			return nil, ErrInvalidPart
		}
		if normalizeETag(part.ETag) != normalizeETag(storedPart.ETag) {
			return nil, ErrInvalidPart
		}
		// Only the final listed part may be smaller than the minimum.
		if i < len(completed)-1 && storedPart.Size < minPartSize {
			return nil, ErrEntityTooSmall
		}
		orderedParts = append(orderedParts, storedPart)
		chunks = append(chunks, storedPart.Chunks...)
		totalSize += storedPart.Size
	}
	finalETag := buildMultipartETag(orderedParts)
	manifest := &models.ObjectManifest{
		Bucket:      bucket,
		Key:         key,
		Size:        totalSize,
		ContentType: "application/octet-stream",
		ETag:        finalETag,
		Chunks:      chunks,
		CreatedAt:   time.Now().Unix(),
	}
	if err := s.metadata.CompleteMultipartUpload(uploadID, manifest); err != nil {
		return nil, err
	}
	return manifest, nil
}
// AbortMultipartUpload cancels an in-progress upload after verifying the
// upload id belongs to the given bucket/key.
func (s *ObjectService) AbortMultipartUpload(bucket, key, uploadID string) error {
	s.gcMu.RLock()
	defer s.gcMu.RUnlock()
	up, lookupErr := s.metadata.GetMultipartUpload(uploadID)
	if lookupErr != nil {
		return lookupErr
	}
	if up.Bucket != bucket || up.Key != key {
		return metadata.ErrMultipartNotFound
	}
	return s.metadata.AbortMultipartUpload(uploadID)
}
// normalizeETag strips any leading/trailing double quotes so that quoted
// and unquoted ETag forms compare equal.
func normalizeETag(etag string) string {
	const quote = `"`
	return strings.Trim(etag, quote)
}
// buildMultipartETag computes the S3-style composite ETag: an MD5 over the
// concatenated per-part MD5 digests, suffixed with "-<part count>". Part
// ETags that are not valid hex are hashed as raw text instead.
func buildMultipartETag(parts []models.UploadedPart) string {
	digest := md5.New()
	for _, p := range parts {
		clean := normalizeETag(p.ETag)
		if raw, decodeErr := hex.DecodeString(clean); decodeErr == nil {
			_, _ = digest.Write(raw)
		} else {
			_, _ = digest.Write([]byte(clean))
		}
	}
	return fmt.Sprintf("%x-%d", digest.Sum(nil), len(parts))
}
// Close shuts down the metadata backend. The blob store is not closed here.
func (s *ObjectService) Close() error {
	return s.metadata.Close()
}
// GarbageCollect deletes every blob chunk that no object manifest still
// references. It holds the exclusive GC lock, so all object operations
// are paused for the duration of the sweep. Individual delete failures
// are logged and skipped; a later sweep will retry them.
func (s *ObjectService) GarbageCollect() error {
	s.gcMu.Lock()
	defer s.gcMu.Unlock()

	live, err := s.metadata.GetReferencedChunkSet()
	if err != nil {
		return err
	}

	var scanned, removed, failed int
	walkErr := s.blob.ForEachChunk(func(chunkID string) error {
		scanned++
		if _, stillReferenced := live[chunkID]; stillReferenced {
			return nil
		}
		if delErr := s.blob.DeleteBlob(chunkID); delErr != nil {
			failed++
			slog.Warn("garbage_collect_delete_failed", "chunk_id", chunkID, "error", delErr)
			return nil
		}
		removed++
		return nil
	})
	if walkErr != nil {
		return walkErr
	}

	slog.Info("garbage_collect_completed",
		"referenced_chunks", len(live),
		"total_chunks", scanned,
		"deleted_chunks", removed,
		"delete_errors", failed,
	)
	return nil
}
// RunGC triggers GarbageCollect once per interval until ctx is cancelled.
func (s *ObjectService) RunGC(ctx context.Context, interval time.Duration) {
	t := time.NewTicker(interval)
	defer t.Stop()
	for {
		select {
		case <-t.C:
			// Sweep failures are already logged inside GarbageCollect.
			_ = s.GarbageCollect()
		case <-ctx.Done():
			return
		}
	}
}

View File

@@ -4,24 +4,48 @@ import (
"crypto/md5"
"crypto/sha256"
"encoding/hex"
"errors"
"fmt"
"io"
"os"
"path/filepath"
"strings"
)
const chunkSize = 64 * 1024
const blobRoot = "blobs/"
const blobRoot = "blobs"
const maxChunkSize = 64 * 1024 * 1024
func IngestStream(stream io.Reader) ([]string, int64, string, error) {
// BlobStore persists content-addressed chunks beneath dataRoot/blobs,
// splitting ingested streams into chunkSize-byte pieces.
type BlobStore struct {
	dataRoot  string // cleaned root directory holding the "blobs" tree
	chunkSize int    // bytes per chunk, validated by NewBlobStore
}
// NewBlobStore initializes a chunk store rooted at root/blobs, creating
// the directory tree if needed. chunkSize must be in (0, maxChunkSize].
func NewBlobStore(root string, chunkSize int) (*BlobStore, error) {
	trimmed := strings.TrimSpace(root)
	if trimmed == "" {
		return nil, errors.New("blob root is required")
	}
	if chunkSize <= 0 || chunkSize > maxChunkSize {
		return nil, fmt.Errorf("chunk size must be between 1 and %d bytes", maxChunkSize)
	}
	base := filepath.Clean(trimmed)
	if mkErr := os.MkdirAll(filepath.Join(base, blobRoot), 0o755); mkErr != nil {
		return nil, mkErr
	}
	return &BlobStore{dataRoot: base, chunkSize: chunkSize}, nil
}
func (bs *BlobStore) IngestStream(stream io.Reader) ([]string, int64, string, error) {
fullFileHasher := md5.New()
buffer := make([]byte, chunkSize)
buffer := make([]byte, bs.chunkSize)
var totalSize int64
var chunkIDs []string
for {
bytesRead, err := io.ReadFull(stream, buffer)
if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
if err != nil && err != io.EOF && !errors.Is(err, io.ErrUnexpectedEOF) {
return nil, 0, "", err
}
@@ -34,13 +58,13 @@ func IngestStream(stream io.Reader) ([]string, int64, string, error) {
chunkHash := sha256.Sum256(chunkData)
chunkID := hex.EncodeToString(chunkHash[:])
err := saveBlob(chunkID, chunkData)
err := bs.saveBlob(chunkID, chunkData)
if err != nil {
return nil, 0, "", err
}
chunkIDs = append(chunkIDs, chunkID)
}
if err == io.EOF || err == io.ErrUnexpectedEOF {
if err == io.EOF || errors.Is(err, io.ErrUnexpectedEOF) {
break
}
if err != nil {
@@ -48,29 +72,68 @@ func IngestStream(stream io.Reader) ([]string, int64, string, error) {
}
}
etag := hex.EncodeToString(fullFileHasher.Sum(nil))
return chunkIDs, totalSize, etag, nil
}
func saveBlob(chunkID string, data []byte) error {
dir := filepath.Join(blobRoot, chunkID[:2], chunkID[2:4])
func (bs *BlobStore) saveBlob(chunkID string, data []byte) error {
if !isValidChunkID(chunkID) {
return fmt.Errorf("invalid chunk id: %q", chunkID)
}
dir := filepath.Join(bs.dataRoot, blobRoot, chunkID[:2], chunkID[2:4])
if err := os.MkdirAll(dir, 0755); err != nil {
return err
}
fullPath := filepath.Join(dir, chunkID)
if _, err := os.Stat(fullPath); os.IsNotExist(err) {
if err := os.WriteFile(fullPath, data, 0644); err != nil {
return err
if _, err := os.Stat(fullPath); err == nil {
return nil
} else if !os.IsNotExist(err) {
return err
}
tmpFile, err := os.CreateTemp(dir, chunkID+".tmp-*")
if err != nil {
return err
}
tmpPath := tmpFile.Name()
cleanup := true
defer func() {
if cleanup {
_ = os.Remove(tmpPath)
}
}()
if _, err := tmpFile.Write(data); err != nil {
_ = tmpFile.Close()
return err
}
if err := tmpFile.Sync(); err != nil {
_ = tmpFile.Close()
return err
}
if err := tmpFile.Close(); err != nil {
return err
}
if err := os.Rename(tmpPath, fullPath); err != nil {
if _, statErr := os.Stat(fullPath); statErr == nil {
return nil
}
return err
}
cleanup = false
if err := syncDir(dir); err != nil {
return err
}
return nil
}
func AssembleStream(chunkIDs []string, w *io.PipeWriter) error {
func (bs *BlobStore) AssembleStream(chunkIDs []string, w *io.PipeWriter) error {
for _, chunkID := range chunkIDs {
chunkData, err := GetBlob(chunkID)
chunkData, err := bs.GetBlob(chunkID)
if err != nil {
return err
}
@@ -81,7 +144,69 @@ func AssembleStream(chunkIDs []string, w *io.PipeWriter) error {
return nil
}
func GetBlob(chunkID string) ([]byte, error) {
return os.ReadFile(filepath.Join(blobRoot, chunkID[:2], chunkID[2:4], chunkID))
// GetBlob reads one chunk from its content-addressed path.
func (bs *BlobStore) GetBlob(chunkID string) ([]byte, error) {
	if !isValidChunkID(chunkID) {
		return nil, fmt.Errorf("invalid chunk id: %q", chunkID)
	}
	blobPath := filepath.Join(bs.dataRoot, blobRoot, chunkID[:2], chunkID[2:4], chunkID)
	return os.ReadFile(blobPath)
}
// DeleteBlob removes one chunk from disk. A chunk that is already gone is
// treated as success so garbage collection stays idempotent.
//
// Fix: the `err != nil &&` guard was redundant (os.IsNotExist(nil) is
// false), and errors.Is(err, os.ErrNotExist) is the modern form that also
// matches wrapped errors.
func (bs *BlobStore) DeleteBlob(chunkID string) error {
	if !isValidChunkID(chunkID) {
		return fmt.Errorf("invalid chunk id: %q", chunkID)
	}
	err := os.Remove(filepath.Join(bs.dataRoot, blobRoot, chunkID[:2], chunkID[2:4], chunkID))
	if errors.Is(err, os.ErrNotExist) {
		return nil
	}
	return err
}
// ListChunks collects every chunk id on disk via ForEachChunk.
func (bs *BlobStore) ListChunks() ([]string, error) {
	var ids []string
	walkErr := bs.ForEachChunk(func(id string) error {
		ids = append(ids, id)
		return nil
	})
	return ids, walkErr
}
// ForEachChunk walks the blob tree and invokes fn once per file whose name
// is a valid chunk id; directories and foreign files are skipped. Any
// error from fn or from the walk aborts the traversal.
func (bs *BlobStore) ForEachChunk(fn func(chunkID string) error) error {
	if fn == nil {
		return errors.New("chunk callback is required")
	}
	root := filepath.Join(bs.dataRoot, blobRoot)
	return filepath.Walk(root, func(_ string, info os.FileInfo, walkErr error) error {
		if walkErr != nil {
			return walkErr
		}
		if info.IsDir() {
			return nil
		}
		if name := info.Name(); isValidChunkID(name) {
			return fn(name)
		}
		return nil
	})
}
func isValidChunkID(chunkID string) bool {
if len(chunkID) != sha256.Size*2 {
return false
}
for _, ch := range chunkID {
if (ch < '0' || ch > '9') && (ch < 'a' || ch > 'f') {
return false
}
}
return true
}
func syncDir(dirPath string) error {
dir, err := os.Open(dirPath)
if err != nil {
return err
}
defer dir.Close()
return dir.Sync()
}

94
utils/config.go Normal file
View File

@@ -0,0 +1,94 @@
package utils
import (
"os"
"path/filepath"
"strconv"
"strings"
"time"
"github.com/joho/godotenv"
)
// Config carries the runtime settings read from the environment
// (optionally seeded from a .env file) with range-checked defaults;
// see NewConfig for the exact sources and bounds.
type Config struct {
	DataPath   string        // absolute, cleaned root directory for stored data
	Address    string        // listen address (default "0.0.0.0")
	Port       int           // listen port in [1, 65535] (default 3000)
	ChunkSize  int           // blob chunk size in bytes (default 8192000)
	LogLevel   string        // lowercased log level string (default "info")
	LogFormat  string        // "json" or "text"; anything else becomes "text"
	AuditLog   bool          // audit logging toggle (default true)
	GcInterval time.Duration // delay between GC sweeps (default 10 minutes)
	GcEnabled  bool          // background GC toggle (default true)
}
// NewConfig builds a Config from environment variables, first attempting
// to load a .env file (a missing file is silently ignored) and falling
// back to validated defaults for unset or out-of-range values.
func NewConfig() *Config {
	// Best-effort: absence of a .env file is not an error.
	_ = godotenv.Load()
	config := &Config{
		DataPath: sanitizeDataPath(os.Getenv("DATA_PATH")),
		Address:  firstNonEmpty(strings.TrimSpace(os.Getenv("ADDRESS")), "0.0.0.0"),
		Port:     envIntRange("PORT", 3000, 1, 65535),
		ChunkSize: envIntRange("CHUNK_SIZE", 8192000, 1, 64*1024*1024),
		LogLevel: strings.ToLower(firstNonEmpty(strings.TrimSpace(os.Getenv("LOG_LEVEL")), "info")),
		// LOG_TYPE is accepted as a legacy alias for LOG_FORMAT.
		LogFormat: strings.ToLower(firstNonEmpty(strings.TrimSpace(os.Getenv("LOG_FORMAT")), strings.TrimSpace(os.Getenv("LOG_TYPE")), "text")),
		AuditLog: envBool("AUDIT_LOG", true),
		// NOTE(review): the accepted range [-1, 60] allows 0 and -1 minutes;
		// time.NewTicker panics on non-positive intervals (see RunGC) —
		// confirm callers guard this before starting the GC loop.
		GcInterval: time.Duration(envIntRange("GC_INTERVAL", 10, -1, 60)) * time.Minute,
		GcEnabled:  envBool("GC_ENABLED", true),
	}
	// Normalize any unrecognized format to the text handler.
	if config.LogFormat != "json" && config.LogFormat != "text" {
		config.LogFormat = "text"
	}
	return config
}
// envIntRange reads an integer environment variable, returning
// defaultValue when the variable is unset, unparsable, or outside
// [minValue, maxValue].
func envIntRange(key string, defaultValue, minValue, maxValue int) int {
	raw := strings.TrimSpace(os.Getenv(key))
	if raw == "" {
		return defaultValue
	}
	parsed, parseErr := strconv.Atoi(raw)
	switch {
	case parseErr != nil:
		return defaultValue
	case parsed < minValue, parsed > maxValue:
		return defaultValue
	default:
		return parsed
	}
}
// envBool reads a boolean environment variable via strconv.ParseBool,
// returning defaultValue when the variable is unset or unparsable.
func envBool(key string, defaultValue bool) bool {
	raw := strings.TrimSpace(os.Getenv(key))
	if raw == "" {
		return defaultValue
	}
	if parsed, parseErr := strconv.ParseBool(raw); parseErr == nil {
		return parsed
	}
	return defaultValue
}
// firstNonEmpty returns the first non-empty string among values, or ""
// when every value is empty (or none are given).
func firstNonEmpty(values ...string) string {
	result := ""
	for _, candidate := range values {
		if candidate != "" {
			result = candidate
			break
		}
	}
	return result
}
func sanitizeDataPath(raw string) string {
cleaned := strings.TrimSpace(raw)
if cleaned == "" {
cleaned = "."
}
cleaned = filepath.Clean(cleaned)
if abs, err := filepath.Abs(cleaned); err == nil {
return abs
}
return cleaned
}

54
utils/utils.go Normal file
View File

@@ -0,0 +1,54 @@
package utils
import (
"encoding/xml"
"fs/models"
"sort"
"strings"
"time"
)
// ConstructXMLResponseForObjectList renders an S3 ListBucketResult XML
// document for the given manifests. Top-level common prefixes are derived
// from keys containing a "/" delimiter and emitted in sorted order.
func ConstructXMLResponseForObjectList(bucket string, objects []*models.ObjectManifest) (string, error) {
	listing := models.ListBucketResult{
		Xmlns:       "http://s3.amazonaws.com/doc/2006-03-01/",
		Name:        bucket,
		Prefix:      "",
		KeyCount:    len(objects),
		MaxKeys:     1000,
		IsTruncated: false,
	}
	seenPrefixes := make(map[string]struct{})
	for _, obj := range objects {
		entry := models.Contents{
			Key:          obj.Key,
			LastModified: time.Unix(obj.CreatedAt, 0).UTC().Format("2006-01-02T15:04:05.000Z"),
			ETag:         "\"" + obj.ETag + "\"",
			Size:         obj.Size,
			StorageClass: "STANDARD",
		}
		listing.Contents = append(listing.Contents, entry)
		// Everything before the first "/" forms a common prefix.
		if before, _, found := strings.Cut(obj.Key, "/"); found {
			seenPrefixes[before+"/"] = struct{}{}
		}
	}
	sortedPrefixes := make([]string, 0, len(seenPrefixes))
	for p := range seenPrefixes {
		sortedPrefixes = append(sortedPrefixes, p)
	}
	sort.Strings(sortedPrefixes)
	for _, p := range sortedPrefixes {
		listing.CommonPrefixes = append(listing.CommonPrefixes, models.CommonPrefixes{Prefix: p})
	}
	body, marshalErr := xml.MarshalIndent(listing, "", " ")
	if marshalErr != nil {
		return "", marshalErr
	}
	return xml.Header + string(body), nil
}