Merge branch 'develop' into LICENSE

This commit is contained in:
2026-02-24 10:19:41 +01:00
committed by GitHub
10 changed files with 674 additions and 60 deletions

View File

@@ -4,3 +4,5 @@ DATA_PATH=data/
PORT=2600 PORT=2600
AUDIT_LOG=true AUDIT_LOG=true
ADDRESS=0.0.0.0 ADDRESS=0.0.0.0
GC_INTERVAL=10
GC_ENABLED=true

View File

@@ -29,7 +29,6 @@ An experimental Object Storage written in Go that should be partially compatible
- No authentication/authorization yet. - No authentication/authorization yet.
- Not full S3 API coverage. - Not full S3 API coverage.
- No garbage collection of unreferenced blob chunks yet.
- No versioning or lifecycle policies. - No versioning or lifecycle policies.
- Error and edge-case behavior is still being refined for client compatibility. - Error and edge-case behavior is still being refined for client compatibility.

View File

@@ -3,6 +3,7 @@ package api
import ( import (
"bufio" "bufio"
"context" "context"
"encoding/base64"
"encoding/xml" "encoding/xml"
"errors" "errors"
"fmt" "fmt"
@@ -10,15 +11,13 @@ import (
"fs/metadata" "fs/metadata"
"fs/models" "fs/models"
"fs/service" "fs/service"
"fs/utils"
"io" "io"
"log/slog" "log/slog"
"net/http" "net/http"
"os" "net/url"
"os/signal" "sort"
"strconv" "strconv"
"strings" "strings"
"syscall"
"time" "time"
"github.com/go-chi/chi/v5" "github.com/go-chi/chi/v5"
@@ -100,6 +99,34 @@ func (h *Handler) handleGetObject(w http.ResponseWriter, r *http.Request) {
} }
defer stream.Close() defer stream.Close()
rangeHeader := strings.TrimSpace(r.Header.Get("Range"))
if rangeHeader != "" {
start, end, err := parseSingleByteRange(rangeHeader, manifest.Size)
if err != nil {
w.Header().Set("Content-Range", fmt.Sprintf("bytes */%d", manifest.Size))
writeS3Error(w, r, s3ErrInvalidRange, r.URL.Path)
return
}
if start > 0 {
if _, err := io.CopyN(io.Discard, stream, start); err != nil {
writeMappedS3Error(w, r, err)
return
}
}
length := end - start + 1
w.Header().Set("Content-Type", manifest.ContentType)
w.Header().Set("Content-Length", strconv.FormatInt(length, 10))
w.Header().Set("Content-Range", fmt.Sprintf("bytes %d-%d/%d", start, end, manifest.Size))
w.Header().Set("ETag", `"`+manifest.ETag+`"`)
w.Header().Set("Last-Modified", time.Unix(manifest.CreatedAt, 0).UTC().Format(http.TimeFormat))
w.Header().Set("Accept-Ranges", "bytes")
w.WriteHeader(http.StatusPartialContent)
_, _ = io.CopyN(w, stream, length)
return
}
w.Header().Set("Content-Type", manifest.ContentType) w.Header().Set("Content-Type", manifest.ContentType)
w.Header().Set("Content-Length", strconv.FormatInt(manifest.Size, 10)) w.Header().Set("Content-Length", strconv.FormatInt(manifest.Size, 10))
w.Header().Set("ETag", `"`+manifest.ETag+`"`) w.Header().Set("ETag", `"`+manifest.ETag+`"`)
@@ -418,8 +445,16 @@ func (h *Handler) handlePostBucket(w http.ResponseWriter, r *http.Request) {
} }
keys := make([]string, 0, len(req.Objects)) keys := make([]string, 0, len(req.Objects))
response := models.DeleteObjectsResult{
Xmlns: "http://s3.amazonaws.com/doc/2006-03-01/",
}
for _, obj := range req.Objects { for _, obj := range req.Objects {
if obj.Key == "" { if obj.Key == "" {
response.Errors = append(response.Errors, models.DeleteError{
Key: obj.Key,
Code: s3ErrInvalidObjectKey.Code,
Message: s3ErrInvalidObjectKey.Message,
})
continue continue
} }
keys = append(keys, obj.Key) keys = append(keys, obj.Key)
@@ -431,9 +466,6 @@ func (h *Handler) handlePostBucket(w http.ResponseWriter, r *http.Request) {
return return
} }
response := models.DeleteObjectsResult{
Xmlns: "http://s3.amazonaws.com/doc/2006-03-01/",
}
if !req.Quiet { if !req.Quiet {
response.Deleted = make([]models.DeletedEntry, 0, len(deleted)) response.Deleted = make([]models.DeletedEntry, 0, len(deleted))
for _, key := range deleted { for _, key := range deleted {
@@ -496,25 +528,47 @@ func (h *Handler) handleGetBuckets(w http.ResponseWriter, r *http.Request) {
writeMappedS3Error(w, r, err) writeMappedS3Error(w, r, err)
return return
} }
w.Header().Set("Content-Type", "application/xml")
w.WriteHeader(http.StatusOK) response := models.ListAllMyBucketsResult{
Xmlns: "http://s3.amazonaws.com/doc/2006-03-01/",
Owner: models.BucketsOwner{
ID: "local",
DisplayName: "local",
},
Buckets: models.BucketsElement{
Items: make([]models.BucketItem, 0, len(buckets)),
},
}
for _, bucket := range buckets { for _, bucket := range buckets {
_, err := w.Write([]byte(bucket)) manifest, err := h.svc.GetBucketManifest(bucket)
if err != nil { if err != nil {
h.logger.Warn("bucket_manifest_read_failed", "bucket", bucket, "error", err)
continue
}
response.Buckets.Items = append(response.Buckets.Items, models.BucketItem{
Name: bucket,
CreationDate: manifest.CreatedAt.UTC().Format("2006-01-02T15:04:05.000Z"),
})
}
payload, err := xml.MarshalIndent(response, "", " ")
if err != nil {
writeMappedS3Error(w, r, err)
return return
} }
}
w.Header().Set("Content-Type", "application/xml; charset=utf-8")
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte(xml.Header))
_, _ = w.Write(payload)
} }
func (h *Handler) handleGetBucket(w http.ResponseWriter, r *http.Request) { func (h *Handler) handleGetBucket(w http.ResponseWriter, r *http.Request) {
bucket := chi.URLParam(r, "bucket") bucket := chi.URLParam(r, "bucket")
if r.URL.Query().Get("list-type") == "2" { if r.URL.Query().Get("list-type") == "2" {
prefix := r.URL.Query().Get("prefix") h.handleListObjectsV2(w, r, bucket)
if prefix == "" {
prefix = ""
}
h.handleListObjectsV2(w, r, bucket, prefix)
return return
} }
if r.URL.Query().Has("location") { if r.URL.Query().Has("location") {
@@ -534,30 +588,224 @@ func (h *Handler) handleGetBucket(w http.ResponseWriter, r *http.Request) {
} }
func (h *Handler) handleListObjectsV2(w http.ResponseWriter, r *http.Request, bucket, prefix string) { func (h *Handler) handleListObjectsV2(w http.ResponseWriter, r *http.Request, bucket string) {
prefix := r.URL.Query().Get("prefix")
delimiter := r.URL.Query().Get("delimiter")
startAfter := r.URL.Query().Get("start-after")
encodingType := strings.ToLower(strings.TrimSpace(r.URL.Query().Get("encoding-type")))
if encodingType != "" && encodingType != "url" {
writeS3Error(w, r, s3ErrInvalidArgument, r.URL.Path)
return
}
maxKeys := 1000
if rawMaxKeys := strings.TrimSpace(r.URL.Query().Get("max-keys")); rawMaxKeys != "" {
parsed, err := strconv.Atoi(rawMaxKeys)
if err != nil || parsed < 0 {
writeS3Error(w, r, s3ErrInvalidArgument, r.URL.Path)
return
}
if parsed > 1000 {
parsed = 1000
}
maxKeys = parsed
}
continuationToken := strings.TrimSpace(r.URL.Query().Get("continuation-token"))
continuationMarker := ""
if continuationToken != "" {
decoded, err := base64.StdEncoding.DecodeString(continuationToken)
if err != nil || len(decoded) == 0 {
writeS3Error(w, r, s3ErrInvalidArgument, r.URL.Path)
return
}
continuationMarker = string(decoded)
}
objects, err := h.svc.ListObjects(bucket, prefix) objects, err := h.svc.ListObjects(bucket, prefix)
if err != nil { if err != nil {
writeMappedS3Error(w, r, err) writeMappedS3Error(w, r, err)
return return
} }
xmlResponse, err := utils.ConstructXMLResponseForObjectList(bucket, objects) entries := buildListV2Entries(objects, prefix, delimiter)
startIdx := 0
if continuationMarker != "" {
found := false
for i, entry := range entries {
if entry.Marker == continuationMarker {
startIdx = i + 1
found = true
break
}
}
if !found {
writeS3Error(w, r, s3ErrInvalidArgument, r.URL.Path)
return
}
} else if startAfter != "" {
for startIdx < len(entries) && entries[startIdx].SortKey <= startAfter {
startIdx++
}
}
result := models.ListBucketResultV2{
Xmlns: "http://s3.amazonaws.com/doc/2006-03-01/",
Name: bucket,
Prefix: s3EncodeIfNeeded(prefix, encodingType),
Delimiter: s3EncodeIfNeeded(delimiter, encodingType),
MaxKeys: maxKeys,
ContinuationToken: continuationToken,
StartAfter: s3EncodeIfNeeded(startAfter, encodingType),
EncodingType: encodingType,
}
endIdx := startIdx
for endIdx < len(entries) && result.KeyCount < maxKeys {
entry := entries[endIdx]
if entry.Object != nil {
result.Contents = append(result.Contents, models.Contents{
Key: s3EncodeIfNeeded(entry.Object.Key, encodingType),
LastModified: time.Unix(entry.Object.CreatedAt, 0).UTC().Format("2006-01-02T15:04:05.000Z"),
ETag: `"` + entry.Object.ETag + `"`,
Size: entry.Object.Size,
StorageClass: "STANDARD",
})
} else {
result.CommonPrefixes = append(result.CommonPrefixes, models.CommonPrefixes{
Prefix: s3EncodeIfNeeded(entry.CommonPrefix, encodingType),
})
}
result.KeyCount++
endIdx++
}
result.IsTruncated = endIdx < len(entries)
if result.IsTruncated && result.KeyCount > 0 {
result.NextContinuationToken = base64.StdEncoding.EncodeToString([]byte(entries[endIdx-1].Marker))
}
xmlResponse, err := xml.MarshalIndent(result, "", " ")
if err != nil { if err != nil {
writeMappedS3Error(w, r, err) writeMappedS3Error(w, r, err)
return return
} }
w.Header().Set("Content-Type", "application/xml; charset=utf-8") w.Header().Set("Content-Type", "application/xml; charset=utf-8")
w.Header().Set("Content-Length", strconv.Itoa(len(xmlResponse))) w.Header().Set("Content-Length", strconv.Itoa(len(xml.Header)+len(xmlResponse)))
w.WriteHeader(http.StatusOK) w.WriteHeader(http.StatusOK)
_, err = w.Write([]byte(xmlResponse)) _, _ = w.Write([]byte(xml.Header))
if err != nil { _, _ = w.Write(xmlResponse)
return
}
} }
func (h *Handler) Start(address string) error { type listV2Entry struct {
Marker string
SortKey string
Object *models.ObjectManifest
CommonPrefix string
}
// buildListV2Entries flattens an object listing into the ordered entry
// sequence used by ListObjectsV2 pagination. When a delimiter is set, keys
// whose remainder (after the request prefix) contains the delimiter are
// rolled up into a single CommonPrefix entry, emitted at the position of
// their first member in key order; all other keys become object entries.
// Each entry carries a unique Marker ("C:"-prefixed for common prefixes,
// "K:"-prefixed for keys) used to build continuation tokens, and a SortKey
// used for start-after comparisons.
func buildListV2Entries(objects []*models.ObjectManifest, prefix, delimiter string) []listV2Entry {
	// Filter nil manifests BEFORE sorting: sort.Slice dereferences Key in
	// its comparator, so a nil element would panic there, making a
	// post-sort nil check ineffective.
	sorted := make([]*models.ObjectManifest, 0, len(objects))
	for _, object := range objects {
		if object != nil {
			sorted = append(sorted, object)
		}
	}
	sort.Slice(sorted, func(i, j int) bool {
		return sorted[i].Key < sorted[j].Key
	})
	entries := make([]listV2Entry, 0, len(sorted))
	seenCommonPrefixes := make(map[string]struct{})
	for _, object := range sorted {
		if delimiter != "" {
			relative := strings.TrimPrefix(object.Key, prefix)
			if idx := strings.Index(relative, delimiter); idx >= 0 {
				commonPrefix := prefix + relative[:idx+len(delimiter)]
				// Emit each rolled-up prefix only once, at the position of
				// its first (lowest-sorting) member key.
				if _, exists := seenCommonPrefixes[commonPrefix]; exists {
					continue
				}
				seenCommonPrefixes[commonPrefix] = struct{}{}
				entries = append(entries, listV2Entry{
					Marker:       "C:" + commonPrefix,
					SortKey:      commonPrefix,
					CommonPrefix: commonPrefix,
				})
				continue
			}
		}
		entries = append(entries, listV2Entry{
			Marker:  "K:" + object.Key,
			SortKey: object.Key,
			Object:  object,
		})
	}
	return entries
}
// s3EncodeIfNeeded applies S3 "url" encoding-type semantics to a response
// field. AWS percent-encodes object key characters but keeps "/" literal
// (the ListObjectsV2 documentation shows prefixes such as "photos/2006/"
// unencoded) and renders spaces as "%20" rather than "+", so each
// slash-separated segment is escaped individually. Any encodingType other
// than "url" (the caller has already rejected invalid values) returns the
// value unchanged, as does an empty value.
func s3EncodeIfNeeded(value, encodingType string) string {
	if encodingType != "url" || value == "" {
		return value
	}
	segments := strings.Split(value, "/")
	for i, segment := range segments {
		// QueryEscape uses "+" for spaces; S3 expects "%20".
		segments[i] = strings.ReplaceAll(url.QueryEscape(segment), "+", "%20")
	}
	return strings.Join(segments, "/")
}
// parseSingleByteRange interprets a single-range HTTP Range header value
// ("bytes=first-last", "bytes=first-", or "bytes=-suffix") against an
// object of the given size. It returns the inclusive start and end byte
// offsets. Multi-range specs, malformed specs, a zero-length suffix,
// a start at or past the object size, or a non-positive size all yield an
// error; an end past the last byte is clamped to size-1.
func parseSingleByteRange(rangeHeader string, size int64) (int64, int64, error) {
	errInvalid := errors.New("invalid range")
	if size <= 0 || !strings.HasPrefix(rangeHeader, "bytes=") {
		return 0, 0, errInvalid
	}
	spec := strings.TrimSpace(rangeHeader[len("bytes="):])
	if spec == "" || strings.Contains(spec, ",") {
		// Only a single byte range is supported.
		return 0, 0, errInvalid
	}
	first, last, dashed := strings.Cut(spec, "-")
	if !dashed {
		return 0, 0, errInvalid
	}
	// Suffix form "bytes=-N": the final N bytes of the object.
	if first == "" {
		suffix, parseErr := strconv.ParseInt(last, 10, 64)
		if parseErr != nil || suffix <= 0 {
			return 0, 0, errInvalid
		}
		if suffix > size {
			suffix = size
		}
		return size - suffix, size - 1, nil
	}
	begin, parseErr := strconv.ParseInt(first, 10, 64)
	if parseErr != nil || begin < 0 || begin >= size {
		return 0, 0, errInvalid
	}
	// Open-ended form "bytes=N-" runs to the final byte.
	if last == "" {
		return begin, size - 1, nil
	}
	finish, parseErr := strconv.ParseInt(last, 10, 64)
	if parseErr != nil || finish < begin {
		return 0, 0, errInvalid
	}
	if finish >= size {
		finish = size - 1
	}
	return begin, finish, nil
}
func (h *Handler) Start(ctx context.Context, address string) error {
if ctx == nil {
ctx = context.Background()
}
h.logger.Info("server_starting", h.logger.Info("server_starting",
"address", address, "address", address,
"log_format", h.logConfig.Format, "log_format", h.logConfig.Format,
@@ -565,9 +813,7 @@ func (h *Handler) Start(address string) error {
"audit_log", h.logConfig.Audit, "audit_log", h.logConfig.Audit,
) )
h.setupRoutes() h.setupRoutes()
stop := make(chan os.Signal, 1)
signal.Notify(stop, os.Interrupt, syscall.SIGTERM)
defer signal.Stop(stop)
server := http.Server{ server := http.Server{
Addr: address, Addr: address,
Handler: h.router, Handler: h.router,
@@ -583,10 +829,13 @@ func (h *Handler) Start(address string) error {
}() }()
select { select {
case <-stop: case <-ctx.Done():
h.logger.Info("shutdown_signal_received") h.logger.Info("shutdown_context_done", "reason", ctx.Err())
case err := <-errCh: case err := <-errCh:
h.logger.Error("server_listen_failed", "error", err) h.logger.Error("server_listen_failed", "error", err)
if closeErr := h.svc.Close(); closeErr != nil {
h.logger.Error("service_close_failed", "error", closeErr)
}
return err return err
} }

View File

@@ -41,6 +41,16 @@ var (
Code: "MalformedXML", Code: "MalformedXML",
Message: "The XML you provided was not well-formed or did not validate against our published schema.", Message: "The XML you provided was not well-formed or did not validate against our published schema.",
} }
s3ErrInvalidArgument = s3APIError{
Status: http.StatusBadRequest,
Code: "InvalidArgument",
Message: "Invalid argument.",
}
s3ErrInvalidRange = s3APIError{
Status: http.StatusRequestedRangeNotSatisfiable,
Code: "InvalidRange",
Message: "The requested range is not satisfiable.",
}
s3ErrEntityTooSmall = s3APIError{ s3ErrEntityTooSmall = s3APIError{
Status: http.StatusBadRequest, Status: http.StatusBadRequest,
Code: "EntityTooSmall", Code: "EntityTooSmall",

13
main.go
View File

@@ -1,6 +1,7 @@
package main package main
import ( import (
"context"
"fs/api" "fs/api"
"fs/logging" "fs/logging"
"fs/metadata" "fs/metadata"
@@ -8,8 +9,10 @@ import (
"fs/storage" "fs/storage"
"fs/utils" "fs/utils"
"os" "os"
"os/signal"
"path/filepath" "path/filepath"
"strconv" "strconv"
"syscall"
) )
func main() { func main() {
@@ -44,8 +47,16 @@ func main() {
objectService := service.NewObjectService(metadataHandler, blobHandler) objectService := service.NewObjectService(metadataHandler, blobHandler)
handler := api.NewHandler(objectService, logger, logConfig) handler := api.NewHandler(objectService, logger, logConfig)
addr := config.Address + ":" + strconv.Itoa(config.Port) addr := config.Address + ":" + strconv.Itoa(config.Port)
if err = handler.Start(addr); err != nil {
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
defer stop()
if config.GcEnabled {
go objectService.RunGC(ctx, config.GcInterval)
}
if err = handler.Start(ctx, addr); err != nil {
logger.Error("server_stopped_with_error", "error", err) logger.Error("server_stopped_with_error", "error", err)
return return
} }
} }

View File

@@ -126,6 +126,22 @@ func (h *MetadataHandler) DeleteBucket(bucketName string) error {
if k, _ := metadataBucket.Cursor().First(); k != nil { if k, _ := metadataBucket.Cursor().First(); k != nil {
return fmt.Errorf("%w: %s", ErrBucketNotEmpty, bucketName) return fmt.Errorf("%w: %s", ErrBucketNotEmpty, bucketName)
} }
multipartUploadsBucket, err := getMultipartUploadBucket(tx)
if err != nil {
return err
}
cursor := multipartUploadsBucket.Cursor()
for _, payload := cursor.First(); payload != nil; _, payload = cursor.Next() {
upload := models.MultipartUpload{}
if err := json.Unmarshal(payload, &upload); err != nil {
return err
}
if upload.Bucket == bucketName && upload.State == "pending" {
return fmt.Errorf("%w: %s", ErrBucketNotEmpty, bucketName)
}
}
if err := tx.DeleteBucket([]byte(bucketName)); err != nil && !errors.Is(err, bbolt.ErrBucketNotFound) { if err := tx.DeleteBucket([]byte(bucketName)); err != nil && !errors.Is(err, bbolt.ErrBucketNotFound) {
return fmt.Errorf("error deleting metadata bucket %s: %w", bucketName, err) return fmt.Errorf("error deleting metadata bucket %s: %w", bucketName, err)
} }
@@ -585,3 +601,98 @@ func (h *MetadataHandler) AbortMultipartUpload(uploadID string) error {
} }
return nil return nil
} }
// GetReferencedChunkSet returns the set of chunk IDs that are still reachable
// from metadata, i.e. chunks that must NOT be garbage-collected. A chunk is
// referenced if it belongs to any stored object manifest, or to any uploaded
// part of a multipart upload whose state is "pending". The whole scan runs
// inside a single read-only bbolt transaction for a consistent snapshot.
func (h *MetadataHandler) GetReferencedChunkSet() (map[string]struct{}, error) {
chunkSet := make(map[string]struct{})
// Upload IDs whose state is "pending"; used to decide which parts count.
pendingUploadSet := make(map[string]struct{})
err := h.db.View(func(tx *bbolt.Tx) error {
// The system index lists every per-bucket metadata bucket by name.
systemIndexBucket := tx.Bucket([]byte(systemIndex))
if systemIndexBucket == nil {
return errors.New("system index not found")
}
c := systemIndexBucket.Cursor()
for k, _ := c.First(); k != nil; k, _ = c.Next() {
metadataBucket := tx.Bucket(k)
if metadataBucket == nil {
// Index entry without a backing bucket; nothing to collect from it.
continue
}
// Collect every chunk referenced by each object manifest in the bucket.
err := metadataBucket.ForEach(func(k, v []byte) error {
object := models.ObjectManifest{}
err := json.Unmarshal(v, &object)
if err != nil {
return err
}
for _, chunkID := range object.Chunks {
chunkSet[chunkID] = struct{}{}
}
return nil
})
if err != nil {
return err
}
}
// First pass over uploads: record which upload IDs are still pending.
uploadsBucket := tx.Bucket(multipartUploadIndex)
if uploadsBucket == nil {
return errors.New("multipart upload index not found")
}
if err := uploadsBucket.ForEach(func(k, v []byte) error {
upload := models.MultipartUpload{}
if err := json.Unmarshal(v, &upload); err != nil {
return err
}
if upload.State == "pending" {
pendingUploadSet[string(k)] = struct{}{}
}
return nil
}); err != nil {
return err
}
// Second pass: chunks of parts belonging to pending uploads are also live.
partsBucket := tx.Bucket(multipartUploadPartsIndex)
if partsBucket == nil {
return errors.New("multipart upload parts index not found")
}
if err := partsBucket.ForEach(func(k, v []byte) error {
// Part keys are "<uploadID>:<partNumber>"; skip malformed keys.
uploadID, _, ok := strings.Cut(string(k), ":")
if !ok {
return nil
}
// Parts of completed/aborted uploads do not pin their chunks.
if _, pending := pendingUploadSet[uploadID]; !pending {
return nil
}
part := models.UploadedPart{}
if err := json.Unmarshal(v, &part); err != nil {
return err
}
for _, chunkID := range part.Chunks {
chunkSet[chunkID] = struct{}{}
}
return nil
}); err != nil {
return err
}
return nil
})
if err != nil {
return nil, err
}
return chunkSet, nil
}
// GetReferencedChunks returns the referenced chunk IDs as a slice, in no
// particular order. It is a convenience wrapper around GetReferencedChunkSet.
func (h *MetadataHandler) GetReferencedChunks() ([]string, error) {
	set, err := h.GetReferencedChunkSet()
	if err != nil {
		return nil, err
	}
	ids := make([]string, 0, len(set))
	for id := range set {
		ids = append(ids, id)
	}
	return ids, nil
}

View File

@@ -24,6 +24,27 @@ type BucketManifest struct {
PublicAccessBlock bool `json:"public_access_block"` PublicAccessBlock bool `json:"public_access_block"`
} }
// ListAllMyBucketsResult is the XML document returned by the S3
// ListBuckets API (GET /).
type ListAllMyBucketsResult struct {
XMLName xml.Name `xml:"ListAllMyBucketsResult"`
Xmlns string `xml:"xmlns,attr"`
Owner BucketsOwner `xml:"Owner"`
Buckets BucketsElement `xml:"Buckets"`
}
// BucketsOwner identifies the account reported as owning the listed buckets.
type BucketsOwner struct {
ID string `xml:"ID"`
DisplayName string `xml:"DisplayName,omitempty"`
}
// BucketsElement wraps the repeated <Bucket> elements of the listing.
type BucketsElement struct {
Items []BucketItem `xml:"Bucket"`
}
// BucketItem describes a single bucket entry: its name and creation
// timestamp (formatted by the handler as an ISO-8601 string).
type BucketItem struct {
Name string `xml:"Name"`
CreationDate string `xml:"CreationDate"`
}
type S3ErrorResponse struct { type S3ErrorResponse struct {
XMLName xml.Name `xml:"Error"` XMLName xml.Name `xml:"Error"`
Code string `xml:"Code"` Code string `xml:"Code"`
@@ -47,6 +68,25 @@ type ListBucketResult struct {
CommonPrefixes []CommonPrefixes `xml:"CommonPrefixes,omitempty"` CommonPrefixes []CommonPrefixes `xml:"CommonPrefixes,omitempty"`
} }
// ListBucketResultV2 is the XML response body for the S3 ListObjectsV2 API
// (GET /{bucket}?list-type=2). KeyCount counts Contents plus CommonPrefixes
// entries; IsTruncated with NextContinuationToken drives pagination.
// String fields may be URL-encoded when EncodingType is "url".
type ListBucketResultV2 struct {
XMLName xml.Name `xml:"ListBucketResult"`
Xmlns string `xml:"xmlns,attr"`
Name string `xml:"Name"`
Prefix string `xml:"Prefix"`
Delimiter string `xml:"Delimiter,omitempty"`
MaxKeys int `xml:"MaxKeys"`
KeyCount int `xml:"KeyCount"`
IsTruncated bool `xml:"IsTruncated"`
ContinuationToken string `xml:"ContinuationToken,omitempty"`
NextContinuationToken string `xml:"NextContinuationToken,omitempty"`
StartAfter string `xml:"StartAfter,omitempty"`
EncodingType string `xml:"EncodingType,omitempty"`
Contents []Contents `xml:"Contents,omitempty"`
CommonPrefixes []CommonPrefixes `xml:"CommonPrefixes,omitempty"`
}
type Contents struct { type Contents struct {
Key string `xml:"Key"` Key string `xml:"Key"`
LastModified string `xml:"LastModified"` LastModified string `xml:"LastModified"`
@@ -131,8 +171,15 @@ type DeleteObjectsResult struct {
XMLName xml.Name `xml:"DeleteResult"` XMLName xml.Name `xml:"DeleteResult"`
Xmlns string `xml:"xmlns,attr"` Xmlns string `xml:"xmlns,attr"`
Deleted []DeletedEntry `xml:"Deleted,omitempty"` Deleted []DeletedEntry `xml:"Deleted,omitempty"`
Errors []DeleteError `xml:"Error,omitempty"`
} }
type DeletedEntry struct { type DeletedEntry struct {
Key string `xml:"Key"` Key string `xml:"Key"`
} }
// DeleteError reports a per-key failure inside a DeleteObjects
// (POST /{bucket}?delete) response body.
type DeleteError struct {
Key string `xml:"Key"`
Code string `xml:"Code"`
Message string `xml:"Message"`
}

View File

@@ -1,6 +1,7 @@
package service package service
import ( import (
"context"
"crypto/md5" "crypto/md5"
"encoding/hex" "encoding/hex"
"errors" "errors"
@@ -11,12 +12,14 @@ import (
"io" "io"
"log/slog" "log/slog"
"strings" "strings"
"sync"
"time" "time"
) )
type ObjectService struct { type ObjectService struct {
metadata *metadata.MetadataHandler metadata *metadata.MetadataHandler
blob *storage.BlobStore blob *storage.BlobStore
gcMu sync.RWMutex
} }
var ( var (
@@ -31,6 +34,8 @@ func NewObjectService(metadataHandler *metadata.MetadataHandler, blobHandler *st
} }
func (s *ObjectService) PutObject(bucket, key, contentType string, input io.Reader) (*models.ObjectManifest, error) { func (s *ObjectService) PutObject(bucket, key, contentType string, input io.Reader) (*models.ObjectManifest, error) {
s.gcMu.RLock()
defer s.gcMu.RUnlock()
chunks, size, etag, err := s.blob.IngestStream(input) chunks, size, etag, err := s.blob.IngestStream(input)
if err != nil { if err != nil {
@@ -62,29 +67,30 @@ func (s *ObjectService) PutObject(bucket, key, contentType string, input io.Read
} }
func (s *ObjectService) GetObject(bucket, key string) (io.ReadCloser, *models.ObjectManifest, error) { func (s *ObjectService) GetObject(bucket, key string) (io.ReadCloser, *models.ObjectManifest, error) {
s.gcMu.RLock()
manifest, err := s.metadata.GetManifest(bucket, key) manifest, err := s.metadata.GetManifest(bucket, key)
if err != nil { if err != nil {
s.gcMu.RUnlock()
return nil, nil, err return nil, nil, err
} }
pr, pw := io.Pipe() pr, pw := io.Pipe()
go func() { go func() {
defer func(pw *io.PipeWriter) { defer s.gcMu.RUnlock()
err := pw.Close() if err := s.blob.AssembleStream(manifest.Chunks, pw); err != nil {
if err != nil { _ = pw.CloseWithError(err)
}
}(pw)
err := s.blob.AssembleStream(manifest.Chunks, pw)
if err != nil {
return return
} }
_ = pw.Close()
}() }()
return pr, manifest, nil return pr, manifest, nil
} }
func (s *ObjectService) HeadObject(bucket, key string) (models.ObjectManifest, error) { func (s *ObjectService) HeadObject(bucket, key string) (models.ObjectManifest, error) {
s.gcMu.RLock()
defer s.gcMu.RUnlock()
manifest, err := s.metadata.GetManifest(bucket, key) manifest, err := s.metadata.GetManifest(bucket, key)
if err != nil { if err != nil {
return models.ObjectManifest{}, err return models.ObjectManifest{}, err
@@ -93,39 +99,68 @@ func (s *ObjectService) HeadObject(bucket, key string) (models.ObjectManifest, e
} }
func (s *ObjectService) DeleteObject(bucket, key string) error { func (s *ObjectService) DeleteObject(bucket, key string) error {
s.gcMu.RLock()
defer s.gcMu.RUnlock()
return s.metadata.DeleteManifest(bucket, key) return s.metadata.DeleteManifest(bucket, key)
} }
func (s *ObjectService) ListObjects(bucket, prefix string) ([]*models.ObjectManifest, error) { func (s *ObjectService) ListObjects(bucket, prefix string) ([]*models.ObjectManifest, error) {
s.gcMu.RLock()
defer s.gcMu.RUnlock()
return s.metadata.ListObjects(bucket, prefix) return s.metadata.ListObjects(bucket, prefix)
} }
func (s *ObjectService) CreateBucket(bucket string) error { func (s *ObjectService) CreateBucket(bucket string) error {
s.gcMu.RLock()
defer s.gcMu.RUnlock()
return s.metadata.CreateBucket(bucket) return s.metadata.CreateBucket(bucket)
} }
func (s *ObjectService) HeadBucket(bucket string) error { func (s *ObjectService) HeadBucket(bucket string) error {
s.gcMu.RLock()
defer s.gcMu.RUnlock()
_, err := s.metadata.GetBucketManifest(bucket) _, err := s.metadata.GetBucketManifest(bucket)
return err return err
} }
func (s *ObjectService) GetBucketManifest(bucket string) (*models.BucketManifest, error) {
s.gcMu.RLock()
defer s.gcMu.RUnlock()
return s.metadata.GetBucketManifest(bucket)
}
func (s *ObjectService) DeleteBucket(bucket string) error { func (s *ObjectService) DeleteBucket(bucket string) error {
s.gcMu.RLock()
defer s.gcMu.RUnlock()
return s.metadata.DeleteBucket(bucket) return s.metadata.DeleteBucket(bucket)
} }
func (s *ObjectService) ListBuckets() ([]string, error) { func (s *ObjectService) ListBuckets() ([]string, error) {
s.gcMu.RLock()
defer s.gcMu.RUnlock()
return s.metadata.ListBuckets() return s.metadata.ListBuckets()
} }
func (s *ObjectService) DeleteObjects(bucket string, keys []string) ([]string, error) { func (s *ObjectService) DeleteObjects(bucket string, keys []string) ([]string, error) {
s.gcMu.RLock()
defer s.gcMu.RUnlock()
return s.metadata.DeleteManifests(bucket, keys) return s.metadata.DeleteManifests(bucket, keys)
} }
func (s *ObjectService) CreateMultipartUpload(bucket, key string) (*models.MultipartUpload, error) { func (s *ObjectService) CreateMultipartUpload(bucket, key string) (*models.MultipartUpload, error) {
s.gcMu.RLock()
defer s.gcMu.RUnlock()
return s.metadata.CreateMultipartUpload(bucket, key) return s.metadata.CreateMultipartUpload(bucket, key)
} }
func (s *ObjectService) UploadPart(bucket, key, uploadId string, partNumber int, input io.Reader) (string, error) { func (s *ObjectService) UploadPart(bucket, key, uploadId string, partNumber int, input io.Reader) (string, error) {
s.gcMu.RLock()
defer s.gcMu.RUnlock()
if partNumber < 1 || partNumber > 10000 { if partNumber < 1 || partNumber > 10000 {
return "", ErrInvalidPart return "", ErrInvalidPart
} }
@@ -158,6 +193,9 @@ func (s *ObjectService) UploadPart(bucket, key, uploadId string, partNumber int,
} }
func (s *ObjectService) ListMultipartParts(bucket, key, uploadID string) ([]models.UploadedPart, error) { func (s *ObjectService) ListMultipartParts(bucket, key, uploadID string) ([]models.UploadedPart, error) {
s.gcMu.RLock()
defer s.gcMu.RUnlock()
upload, err := s.metadata.GetMultipartUpload(uploadID) upload, err := s.metadata.GetMultipartUpload(uploadID)
if err != nil { if err != nil {
return nil, err return nil, err
@@ -169,6 +207,9 @@ func (s *ObjectService) ListMultipartParts(bucket, key, uploadID string) ([]mode
} }
func (s *ObjectService) CompleteMultipartUpload(bucket, key, uploadID string, completed []models.CompletedPart) (*models.ObjectManifest, error) { func (s *ObjectService) CompleteMultipartUpload(bucket, key, uploadID string, completed []models.CompletedPart) (*models.ObjectManifest, error) {
s.gcMu.RLock()
defer s.gcMu.RUnlock()
if len(completed) == 0 { if len(completed) == 0 {
return nil, ErrInvalidCompleteRequest return nil, ErrInvalidCompleteRequest
} }
@@ -236,6 +277,9 @@ func (s *ObjectService) CompleteMultipartUpload(bucket, key, uploadID string, co
} }
func (s *ObjectService) AbortMultipartUpload(bucket, key, uploadID string) error { func (s *ObjectService) AbortMultipartUpload(bucket, key, uploadID string) error {
s.gcMu.RLock()
defer s.gcMu.RUnlock()
upload, err := s.metadata.GetMultipartUpload(uploadID) upload, err := s.metadata.GetMultipartUpload(uploadID)
if err != nil { if err != nil {
return err return err
@@ -266,3 +310,55 @@ func buildMultipartETag(parts []models.UploadedPart) string {
func (s *ObjectService) Close() error { func (s *ObjectService) Close() error {
return s.metadata.Close() return s.metadata.Close()
} }
// GarbageCollect deletes blob chunks that are no longer referenced by any
// object manifest or pending multipart upload. It holds the exclusive GC
// lock for the whole sweep, so every other service operation (which takes
// the shared lock) is blocked until it finishes. Individual delete failures
// are logged and counted but do not abort the sweep.
func (s *ObjectService) GarbageCollect() error {
	s.gcMu.Lock()
	defer s.gcMu.Unlock()
	referenced, err := s.metadata.GetReferencedChunkSet()
	if err != nil {
		return err
	}
	var scanned, removed, failed int
	walkErr := s.blob.ForEachChunk(func(chunkID string) error {
		scanned++
		if _, live := referenced[chunkID]; live {
			return nil
		}
		if delErr := s.blob.DeleteBlob(chunkID); delErr != nil {
			// Best effort: record the failure and keep sweeping.
			failed++
			slog.Warn("garbage_collect_delete_failed", "chunk_id", chunkID, "error", delErr)
			return nil
		}
		removed++
		return nil
	})
	if walkErr != nil {
		return walkErr
	}
	slog.Info("garbage_collect_completed",
		"referenced_chunks", len(referenced),
		"total_chunks", scanned,
		"deleted_chunks", removed,
		"delete_errors", failed,
	)
	return nil
}
// RunGC runs GarbageCollect every interval until ctx is cancelled. It is
// intended to be launched as a goroutine from main.
//
// A non-positive interval disables the loop: time.NewTicker panics when
// given a duration <= 0, and the GC_INTERVAL configuration can produce such
// values, so we log and return instead of crashing the process.
func (s *ObjectService) RunGC(ctx context.Context, interval time.Duration) {
	if interval <= 0 {
		slog.Warn("gc_disabled_invalid_interval", "interval", interval)
		return
	}
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			// Errors are already logged inside GarbageCollect; keep the
			// loop alive so a transient failure doesn't stop GC for good.
			_ = s.GarbageCollect()
		}
	}
}

View File

@@ -87,10 +87,46 @@ func (bs *BlobStore) saveBlob(chunkID string, data []byte) error {
} }
fullPath := filepath.Join(dir, chunkID) fullPath := filepath.Join(dir, chunkID)
if _, err := os.Stat(fullPath); os.IsNotExist(err) { if _, err := os.Stat(fullPath); err == nil {
if err := os.WriteFile(fullPath, data, 0644); err != nil { return nil
} else if !os.IsNotExist(err) {
return err return err
} }
tmpFile, err := os.CreateTemp(dir, chunkID+".tmp-*")
if err != nil {
return err
}
tmpPath := tmpFile.Name()
cleanup := true
defer func() {
if cleanup {
_ = os.Remove(tmpPath)
}
}()
if _, err := tmpFile.Write(data); err != nil {
_ = tmpFile.Close()
return err
}
if err := tmpFile.Sync(); err != nil {
_ = tmpFile.Close()
return err
}
if err := tmpFile.Close(); err != nil {
return err
}
if err := os.Rename(tmpPath, fullPath); err != nil {
if _, statErr := os.Stat(fullPath); statErr == nil {
return nil
}
return err
}
cleanup = false
if err := syncDir(dir); err != nil {
return err
} }
return nil return nil
} }
@@ -115,6 +151,45 @@ func (bs *BlobStore) GetBlob(chunkID string) ([]byte, error) {
return os.ReadFile(filepath.Join(bs.dataRoot, blobRoot, chunkID[:2], chunkID[2:4], chunkID)) return os.ReadFile(filepath.Join(bs.dataRoot, blobRoot, chunkID[:2], chunkID[2:4], chunkID))
} }
// DeleteBlob removes the chunk file identified by chunkID from the blob
// store. A chunk that is already absent counts as success, which makes the
// operation idempotent for the garbage collector. A malformed chunk ID is
// rejected before touching the filesystem.
func (bs *BlobStore) DeleteBlob(chunkID string) error {
	if !isValidChunkID(chunkID) {
		return fmt.Errorf("invalid chunk id: %q", chunkID)
	}
	target := filepath.Join(bs.dataRoot, blobRoot, chunkID[:2], chunkID[2:4], chunkID)
	switch removeErr := os.Remove(target); {
	case removeErr == nil:
		return nil
	case os.IsNotExist(removeErr):
		return nil
	default:
		return removeErr
	}
}
// ListChunks collects the IDs of every chunk currently stored on disk.
// If the walk fails partway through, the IDs gathered so far are returned
// together with the error.
func (bs *BlobStore) ListChunks() ([]string, error) {
	var ids []string
	walkErr := bs.ForEachChunk(func(chunkID string) error {
		ids = append(ids, chunkID)
		return nil
	})
	return ids, walkErr
}
// ForEachChunk walks the blob directory tree and calls fn once for each
// file whose name is a well-formed chunk ID; other files (e.g. leftover
// temp files) are skipped. The walk stops at the first error returned by
// fn or by the filesystem traversal.
func (bs *BlobStore) ForEachChunk(fn func(chunkID string) error) error {
	if fn == nil {
		return errors.New("chunk callback is required")
	}
	root := filepath.Join(bs.dataRoot, blobRoot)
	return filepath.Walk(root, func(path string, info os.FileInfo, walkErr error) error {
		if walkErr != nil {
			return walkErr
		}
		if info.IsDir() {
			return nil
		}
		name := info.Name()
		if !isValidChunkID(name) {
			return nil
		}
		return fn(name)
	})
}
func isValidChunkID(chunkID string) bool { func isValidChunkID(chunkID string) bool {
if len(chunkID) != sha256.Size*2 { if len(chunkID) != sha256.Size*2 {
return false return false
@@ -126,3 +201,12 @@ func isValidChunkID(chunkID string) bool {
} }
return true return true
} }
// syncDir fsyncs the directory itself so that a preceding rename inside it
// is durable on disk before we report success.
func syncDir(dirPath string) error {
	handle, err := os.Open(dirPath)
	if err != nil {
		return err
	}
	defer handle.Close()
	return handle.Sync()
}

View File

@@ -5,6 +5,7 @@ import (
"path/filepath" "path/filepath"
"strconv" "strconv"
"strings" "strings"
"time"
"github.com/joho/godotenv" "github.com/joho/godotenv"
) )
@@ -17,6 +18,8 @@ type Config struct {
LogLevel string LogLevel string
LogFormat string LogFormat string
AuditLog bool AuditLog bool
GcInterval time.Duration
GcEnabled bool
} }
func NewConfig() *Config { func NewConfig() *Config {
@@ -30,6 +33,8 @@ func NewConfig() *Config {
LogLevel: strings.ToLower(firstNonEmpty(strings.TrimSpace(os.Getenv("LOG_LEVEL")), "info")), LogLevel: strings.ToLower(firstNonEmpty(strings.TrimSpace(os.Getenv("LOG_LEVEL")), "info")),
LogFormat: strings.ToLower(firstNonEmpty(strings.TrimSpace(os.Getenv("LOG_FORMAT")), strings.TrimSpace(os.Getenv("LOG_TYPE")), "text")), LogFormat: strings.ToLower(firstNonEmpty(strings.TrimSpace(os.Getenv("LOG_FORMAT")), strings.TrimSpace(os.Getenv("LOG_TYPE")), "text")),
AuditLog: envBool("AUDIT_LOG", true), AuditLog: envBool("AUDIT_LOG", true),
GcInterval: time.Duration(envIntRange("GC_INTERVAL", 10, -1, 60)) * time.Minute,
GcEnabled: envBool("GC_ENABLED", true),
} }
if config.LogFormat != "json" && config.LogFormat != "text" { if config.LogFormat != "json" && config.LogFormat != "text" {