Merge pull request #2 from ferdzo/feature/garbage-collection

Garbage collection and few other things
This commit is contained in:
2026-02-24 10:18:11 +01:00
committed by GitHub
10 changed files with 675 additions and 61 deletions

View File

@@ -4,3 +4,5 @@ DATA_PATH=data/
PORT=2600
AUDIT_LOG=true
ADDRESS=0.0.0.0
GC_INTERVAL=10
GC_ENABLED=true

View File

@@ -29,6 +29,5 @@ An experimental Object Storage written in Go that should be partially compatible
- No authentication/authorization yet.
- Not full S3 API coverage.
- No garbage collection of unreferenced blob chunks yet.
- No versioning or lifecycle policies.
- Error and edge-case behavior is still being refined for client compatibility.
- Error and edge-case behavior is still being refined for client compatibility.

View File

@@ -3,6 +3,7 @@ package api
import (
"bufio"
"context"
"encoding/base64"
"encoding/xml"
"errors"
"fmt"
@@ -10,15 +11,13 @@ import (
"fs/metadata"
"fs/models"
"fs/service"
"fs/utils"
"io"
"log/slog"
"net/http"
"os"
"os/signal"
"net/url"
"sort"
"strconv"
"strings"
"syscall"
"time"
"github.com/go-chi/chi/v5"
@@ -100,6 +99,34 @@ func (h *Handler) handleGetObject(w http.ResponseWriter, r *http.Request) {
}
defer stream.Close()
rangeHeader := strings.TrimSpace(r.Header.Get("Range"))
if rangeHeader != "" {
start, end, err := parseSingleByteRange(rangeHeader, manifest.Size)
if err != nil {
w.Header().Set("Content-Range", fmt.Sprintf("bytes */%d", manifest.Size))
writeS3Error(w, r, s3ErrInvalidRange, r.URL.Path)
return
}
if start > 0 {
if _, err := io.CopyN(io.Discard, stream, start); err != nil {
writeMappedS3Error(w, r, err)
return
}
}
length := end - start + 1
w.Header().Set("Content-Type", manifest.ContentType)
w.Header().Set("Content-Length", strconv.FormatInt(length, 10))
w.Header().Set("Content-Range", fmt.Sprintf("bytes %d-%d/%d", start, end, manifest.Size))
w.Header().Set("ETag", `"`+manifest.ETag+`"`)
w.Header().Set("Last-Modified", time.Unix(manifest.CreatedAt, 0).UTC().Format(http.TimeFormat))
w.Header().Set("Accept-Ranges", "bytes")
w.WriteHeader(http.StatusPartialContent)
_, _ = io.CopyN(w, stream, length)
return
}
w.Header().Set("Content-Type", manifest.ContentType)
w.Header().Set("Content-Length", strconv.FormatInt(manifest.Size, 10))
w.Header().Set("ETag", `"`+manifest.ETag+`"`)
@@ -418,8 +445,16 @@ func (h *Handler) handlePostBucket(w http.ResponseWriter, r *http.Request) {
}
keys := make([]string, 0, len(req.Objects))
response := models.DeleteObjectsResult{
Xmlns: "http://s3.amazonaws.com/doc/2006-03-01/",
}
for _, obj := range req.Objects {
if obj.Key == "" {
response.Errors = append(response.Errors, models.DeleteError{
Key: obj.Key,
Code: s3ErrInvalidObjectKey.Code,
Message: s3ErrInvalidObjectKey.Message,
})
continue
}
keys = append(keys, obj.Key)
@@ -431,9 +466,6 @@ func (h *Handler) handlePostBucket(w http.ResponseWriter, r *http.Request) {
return
}
response := models.DeleteObjectsResult{
Xmlns: "http://s3.amazonaws.com/doc/2006-03-01/",
}
if !req.Quiet {
response.Deleted = make([]models.DeletedEntry, 0, len(deleted))
for _, key := range deleted {
@@ -496,25 +528,47 @@ func (h *Handler) handleGetBuckets(w http.ResponseWriter, r *http.Request) {
writeMappedS3Error(w, r, err)
return
}
w.Header().Set("Content-Type", "application/xml")
w.WriteHeader(http.StatusOK)
for _, bucket := range buckets {
_, err := w.Write([]byte(bucket))
if err != nil {
return
}
response := models.ListAllMyBucketsResult{
Xmlns: "http://s3.amazonaws.com/doc/2006-03-01/",
Owner: models.BucketsOwner{
ID: "local",
DisplayName: "local",
},
Buckets: models.BucketsElement{
Items: make([]models.BucketItem, 0, len(buckets)),
},
}
for _, bucket := range buckets {
manifest, err := h.svc.GetBucketManifest(bucket)
if err != nil {
h.logger.Warn("bucket_manifest_read_failed", "bucket", bucket, "error", err)
continue
}
response.Buckets.Items = append(response.Buckets.Items, models.BucketItem{
Name: bucket,
CreationDate: manifest.CreatedAt.UTC().Format("2006-01-02T15:04:05.000Z"),
})
}
payload, err := xml.MarshalIndent(response, "", " ")
if err != nil {
writeMappedS3Error(w, r, err)
return
}
w.Header().Set("Content-Type", "application/xml; charset=utf-8")
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte(xml.Header))
_, _ = w.Write(payload)
}
func (h *Handler) handleGetBucket(w http.ResponseWriter, r *http.Request) {
bucket := chi.URLParam(r, "bucket")
if r.URL.Query().Get("list-type") == "2" {
prefix := r.URL.Query().Get("prefix")
if prefix == "" {
prefix = ""
}
h.handleListObjectsV2(w, r, bucket, prefix)
h.handleListObjectsV2(w, r, bucket)
return
}
if r.URL.Query().Has("location") {
@@ -534,30 +588,224 @@ func (h *Handler) handleGetBucket(w http.ResponseWriter, r *http.Request) {
}
func (h *Handler) handleListObjectsV2(w http.ResponseWriter, r *http.Request, bucket, prefix string) {
func (h *Handler) handleListObjectsV2(w http.ResponseWriter, r *http.Request, bucket string) {
prefix := r.URL.Query().Get("prefix")
delimiter := r.URL.Query().Get("delimiter")
startAfter := r.URL.Query().Get("start-after")
encodingType := strings.ToLower(strings.TrimSpace(r.URL.Query().Get("encoding-type")))
if encodingType != "" && encodingType != "url" {
writeS3Error(w, r, s3ErrInvalidArgument, r.URL.Path)
return
}
maxKeys := 1000
if rawMaxKeys := strings.TrimSpace(r.URL.Query().Get("max-keys")); rawMaxKeys != "" {
parsed, err := strconv.Atoi(rawMaxKeys)
if err != nil || parsed < 0 {
writeS3Error(w, r, s3ErrInvalidArgument, r.URL.Path)
return
}
if parsed > 1000 {
parsed = 1000
}
maxKeys = parsed
}
continuationToken := strings.TrimSpace(r.URL.Query().Get("continuation-token"))
continuationMarker := ""
if continuationToken != "" {
decoded, err := base64.StdEncoding.DecodeString(continuationToken)
if err != nil || len(decoded) == 0 {
writeS3Error(w, r, s3ErrInvalidArgument, r.URL.Path)
return
}
continuationMarker = string(decoded)
}
objects, err := h.svc.ListObjects(bucket, prefix)
if err != nil {
writeMappedS3Error(w, r, err)
return
}
xmlResponse, err := utils.ConstructXMLResponseForObjectList(bucket, objects)
entries := buildListV2Entries(objects, prefix, delimiter)
startIdx := 0
if continuationMarker != "" {
found := false
for i, entry := range entries {
if entry.Marker == continuationMarker {
startIdx = i + 1
found = true
break
}
}
if !found {
writeS3Error(w, r, s3ErrInvalidArgument, r.URL.Path)
return
}
} else if startAfter != "" {
for startIdx < len(entries) && entries[startIdx].SortKey <= startAfter {
startIdx++
}
}
result := models.ListBucketResultV2{
Xmlns: "http://s3.amazonaws.com/doc/2006-03-01/",
Name: bucket,
Prefix: s3EncodeIfNeeded(prefix, encodingType),
Delimiter: s3EncodeIfNeeded(delimiter, encodingType),
MaxKeys: maxKeys,
ContinuationToken: continuationToken,
StartAfter: s3EncodeIfNeeded(startAfter, encodingType),
EncodingType: encodingType,
}
endIdx := startIdx
for endIdx < len(entries) && result.KeyCount < maxKeys {
entry := entries[endIdx]
if entry.Object != nil {
result.Contents = append(result.Contents, models.Contents{
Key: s3EncodeIfNeeded(entry.Object.Key, encodingType),
LastModified: time.Unix(entry.Object.CreatedAt, 0).UTC().Format("2006-01-02T15:04:05.000Z"),
ETag: `"` + entry.Object.ETag + `"`,
Size: entry.Object.Size,
StorageClass: "STANDARD",
})
} else {
result.CommonPrefixes = append(result.CommonPrefixes, models.CommonPrefixes{
Prefix: s3EncodeIfNeeded(entry.CommonPrefix, encodingType),
})
}
result.KeyCount++
endIdx++
}
result.IsTruncated = endIdx < len(entries)
if result.IsTruncated && result.KeyCount > 0 {
result.NextContinuationToken = base64.StdEncoding.EncodeToString([]byte(entries[endIdx-1].Marker))
}
xmlResponse, err := xml.MarshalIndent(result, "", " ")
if err != nil {
writeMappedS3Error(w, r, err)
return
}
w.Header().Set("Content-Type", "application/xml; charset=utf-8")
w.Header().Set("Content-Length", strconv.Itoa(len(xmlResponse)))
w.Header().Set("Content-Length", strconv.Itoa(len(xml.Header)+len(xmlResponse)))
w.WriteHeader(http.StatusOK)
_, err = w.Write([]byte(xmlResponse))
if err != nil {
return
}
_, _ = w.Write([]byte(xml.Header))
_, _ = w.Write(xmlResponse)
}
func (h *Handler) Start(address string) error {
// listV2Entry is one row of a ListObjectsV2 listing as produced by
// buildListV2Entries: either a single object or a rolled-up common prefix.
type listV2Entry struct {
// Marker identifies the entry inside continuation tokens. buildListV2Entries
// sets it to "K:"+key for objects and "C:"+prefix for common prefixes.
Marker string
// SortKey is the object key or the common prefix; the listing handler
// compares it against the start-after parameter.
SortKey string
// Object is the manifest of an object entry; it is nil for common prefixes.
Object *models.ObjectManifest
// CommonPrefix is the rolled-up prefix; it is set only when Object is nil.
CommonPrefix string
}
// buildListV2Entries converts object manifests into the ordered entry list
// that a ListObjectsV2 response pages over.
//
// Nil manifests are dropped, then the remainder is sorted by key. When
// delimiter is non-empty, every key whose remainder after prefix contains the
// delimiter is rolled up into one common-prefix entry, emitted once at the
// position of the first key that produces it. All other keys become object
// entries. The input slice itself is never reordered or modified.
func buildListV2Entries(objects []*models.ObjectManifest, prefix, delimiter string) []listV2Entry {
	// Filter nils before sorting: the comparator dereferences both elements,
	// so a nil manifest would panic inside sort.Slice before any nil check in
	// the loop below could run.
	sorted := make([]*models.ObjectManifest, 0, len(objects))
	for _, object := range objects {
		if object != nil {
			sorted = append(sorted, object)
		}
	}
	sort.Slice(sorted, func(i, j int) bool {
		return sorted[i].Key < sorted[j].Key
	})

	entries := make([]listV2Entry, 0, len(sorted))
	seenCommonPrefixes := make(map[string]struct{})
	for _, object := range sorted {
		if delimiter != "" {
			relative := strings.TrimPrefix(object.Key, prefix)
			if idx := strings.Index(relative, delimiter); idx >= 0 {
				// Everything up to and including the first delimiter after
				// the prefix collapses into a single CommonPrefixes element.
				commonPrefix := prefix + relative[:idx+len(delimiter)]
				if _, exists := seenCommonPrefixes[commonPrefix]; exists {
					continue
				}
				seenCommonPrefixes[commonPrefix] = struct{}{}
				entries = append(entries, listV2Entry{
					Marker:       "C:" + commonPrefix,
					SortKey:      commonPrefix,
					CommonPrefix: commonPrefix,
				})
				continue
			}
		}
		entries = append(entries, listV2Entry{
			Marker:  "K:" + object.Key,
			SortKey: object.Key,
			Object:  object,
		})
	}
	return entries
}
// s3EncodeIfNeeded returns value unchanged unless encodingType is exactly
// "url" and value is non-empty; in that case it returns value query-escaped
// with spaces written as %20 instead of '+'.
func s3EncodeIfNeeded(value, encodingType string) string {
	if encodingType == "url" && value != "" {
		// QueryEscape writes a space as '+' and a literal '+' as %2B, so any
		// '+' left in its output stands for a space.
		return strings.ReplaceAll(url.QueryEscape(value), "+", "%20")
	}
	return value
}
// parseSingleByteRange parses a Range header value that carries exactly one
// byte range and resolves it against an object of the given size.
//
// Accepted forms are "bytes=first-last", "bytes=first-" and "bytes=-suffix".
// It returns the inclusive start and end offsets. An end or suffix length
// beyond the object is clamped to the last byte. An error is returned when
// size is not positive, the unit is not "bytes", several ranges are listed,
// a position is not a plain run of decimal digits, the suffix length is zero,
// start is at or past size, or end is before start.
func parseSingleByteRange(rangeHeader string, size int64) (int64, int64, error) {
	errInvalidRange := errors.New("invalid range")

	if size <= 0 || !strings.HasPrefix(rangeHeader, "bytes=") {
		return 0, 0, errInvalidRange
	}
	spec := strings.TrimSpace(strings.TrimPrefix(rangeHeader, "bytes="))
	if spec == "" || strings.Contains(spec, ",") {
		return 0, 0, errInvalidRange
	}
	first, last, ok := strings.Cut(spec, "-")
	if !ok {
		return 0, 0, errInvalidRange
	}

	// Suffix form "-N": the final N bytes of the object.
	if first == "" {
		suffixLength, ok := parseRangeNumber(last)
		if !ok || suffixLength == 0 {
			return 0, 0, errInvalidRange
		}
		if suffixLength > size {
			suffixLength = size
		}
		return size - suffixLength, size - 1, nil
	}

	start, ok := parseRangeNumber(first)
	if !ok || start >= size {
		return 0, 0, errInvalidRange
	}

	// Open-ended form "N-": from start through the last byte.
	if last == "" {
		return start, size - 1, nil
	}

	end, ok := parseRangeNumber(last)
	if !ok || end < start {
		return 0, 0, errInvalidRange
	}
	if end >= size {
		end = size - 1
	}
	return start, end, nil
}

// parseRangeNumber parses a byte position made up solely of ASCII digits.
// strconv.ParseInt alone would also accept a leading '+' or '-', which let
// malformed specs such as "0--0" or "+1-2" through as valid ranges.
func parseRangeNumber(s string) (int64, bool) {
	if s == "" {
		return 0, false
	}
	for i := 0; i < len(s); i++ {
		if s[i] < '0' || s[i] > '9' {
			return 0, false
		}
	}
	n, err := strconv.ParseInt(s, 10, 64)
	if err != nil {
		return 0, false
	}
	return n, true
}
func (h *Handler) Start(ctx context.Context, address string) error {
if ctx == nil {
ctx = context.Background()
}
h.logger.Info("server_starting",
"address", address,
"log_format", h.logConfig.Format,
@@ -565,9 +813,7 @@ func (h *Handler) Start(address string) error {
"audit_log", h.logConfig.Audit,
)
h.setupRoutes()
stop := make(chan os.Signal, 1)
signal.Notify(stop, os.Interrupt, syscall.SIGTERM)
defer signal.Stop(stop)
server := http.Server{
Addr: address,
Handler: h.router,
@@ -583,10 +829,13 @@ func (h *Handler) Start(address string) error {
}()
select {
case <-stop:
h.logger.Info("shutdown_signal_received")
case <-ctx.Done():
h.logger.Info("shutdown_context_done", "reason", ctx.Err())
case err := <-errCh:
h.logger.Error("server_listen_failed", "error", err)
if closeErr := h.svc.Close(); closeErr != nil {
h.logger.Error("service_close_failed", "error", closeErr)
}
return err
}

View File

@@ -41,6 +41,16 @@ var (
Code: "MalformedXML",
Message: "The XML you provided was not well-formed or did not validate against our published schema.",
}
s3ErrInvalidArgument = s3APIError{
Status: http.StatusBadRequest,
Code: "InvalidArgument",
Message: "Invalid argument.",
}
s3ErrInvalidRange = s3APIError{
Status: http.StatusRequestedRangeNotSatisfiable,
Code: "InvalidRange",
Message: "The requested range is not satisfiable.",
}
s3ErrEntityTooSmall = s3APIError{
Status: http.StatusBadRequest,
Code: "EntityTooSmall",

13
main.go
View File

@@ -1,6 +1,7 @@
package main
import (
"context"
"fs/api"
"fs/logging"
"fs/metadata"
@@ -8,8 +9,10 @@ import (
"fs/storage"
"fs/utils"
"os"
"os/signal"
"path/filepath"
"strconv"
"syscall"
)
func main() {
@@ -44,8 +47,16 @@ func main() {
objectService := service.NewObjectService(metadataHandler, blobHandler)
handler := api.NewHandler(objectService, logger, logConfig)
addr := config.Address + ":" + strconv.Itoa(config.Port)
if err = handler.Start(addr); err != nil {
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
defer stop()
if config.GcEnabled {
go objectService.RunGC(ctx, config.GcInterval)
}
if err = handler.Start(ctx, addr); err != nil {
logger.Error("server_stopped_with_error", "error", err)
return
}
}

View File

@@ -126,6 +126,22 @@ func (h *MetadataHandler) DeleteBucket(bucketName string) error {
if k, _ := metadataBucket.Cursor().First(); k != nil {
return fmt.Errorf("%w: %s", ErrBucketNotEmpty, bucketName)
}
multipartUploadsBucket, err := getMultipartUploadBucket(tx)
if err != nil {
return err
}
cursor := multipartUploadsBucket.Cursor()
for _, payload := cursor.First(); payload != nil; _, payload = cursor.Next() {
upload := models.MultipartUpload{}
if err := json.Unmarshal(payload, &upload); err != nil {
return err
}
if upload.Bucket == bucketName && upload.State == "pending" {
return fmt.Errorf("%w: %s", ErrBucketNotEmpty, bucketName)
}
}
if err := tx.DeleteBucket([]byte(bucketName)); err != nil && !errors.Is(err, bbolt.ErrBucketNotFound) {
return fmt.Errorf("error deleting metadata bucket %s: %w", bucketName, err)
}
@@ -585,3 +601,98 @@ func (h *MetadataHandler) AbortMultipartUpload(uploadID string) error {
}
return nil
}
// GetReferencedChunkSet returns the IDs of every chunk that metadata still
// points at, gathered inside one read-only transaction:
//
//   - chunks listed by every object manifest in each bucket named in the
//     system index, and
//   - chunks listed by uploaded parts whose multipart upload is in the
//     "pending" state.
//
// It fails if the system index, the multipart upload index or the parts index
// is missing, or if any stored record cannot be decoded as JSON.
func (h *MetadataHandler) GetReferencedChunkSet() (map[string]struct{}, error) {
	referenced := make(map[string]struct{})
	pending := make(map[string]struct{})

	viewErr := h.db.View(func(tx *bbolt.Tx) error {
		// Step 1: chunks referenced by stored objects.
		index := tx.Bucket([]byte(systemIndex))
		if index == nil {
			return errors.New("system index not found")
		}
		cur := index.Cursor()
		for name, _ := cur.First(); name != nil; name, _ = cur.Next() {
			objects := tx.Bucket(name)
			if objects == nil {
				// Index entry without a matching metadata bucket: nothing to scan.
				continue
			}
			scanErr := objects.ForEach(func(_, raw []byte) error {
				var manifest models.ObjectManifest
				if decodeErr := json.Unmarshal(raw, &manifest); decodeErr != nil {
					return decodeErr
				}
				for _, id := range manifest.Chunks {
					referenced[id] = struct{}{}
				}
				return nil
			})
			if scanErr != nil {
				return scanErr
			}
		}

		// Step 2: remember which multipart uploads are still pending.
		uploads := tx.Bucket(multipartUploadIndex)
		if uploads == nil {
			return errors.New("multipart upload index not found")
		}
		uploadsErr := uploads.ForEach(func(id, raw []byte) error {
			var upload models.MultipartUpload
			if decodeErr := json.Unmarshal(raw, &upload); decodeErr != nil {
				return decodeErr
			}
			if upload.State == "pending" {
				pending[string(id)] = struct{}{}
			}
			return nil
		})
		if uploadsErr != nil {
			return uploadsErr
		}

		// Step 3: chunks referenced by parts of those pending uploads.
		parts := tx.Bucket(multipartUploadPartsIndex)
		if parts == nil {
			return errors.New("multipart upload parts index not found")
		}
		return parts.ForEach(func(partKey, raw []byte) error {
			// The upload ID is the part key's text before the first ':'.
			// Keys without a ':' are ignored.
			uploadID, _, hasSep := strings.Cut(string(partKey), ":")
			if !hasSep {
				return nil
			}
			if _, isPending := pending[uploadID]; !isPending {
				return nil
			}
			var part models.UploadedPart
			if decodeErr := json.Unmarshal(raw, &part); decodeErr != nil {
				return decodeErr
			}
			for _, id := range part.Chunks {
				referenced[id] = struct{}{}
			}
			return nil
		})
	})
	if viewErr != nil {
		return nil, viewErr
	}
	return referenced, nil
}
// GetReferencedChunks returns the chunk IDs from GetReferencedChunkSet as a
// slice. The order follows map iteration and is therefore unspecified.
func (h *MetadataHandler) GetReferencedChunks() ([]string, error) {
	referenced, err := h.GetReferencedChunkSet()
	if err != nil {
		return nil, err
	}

	ids := make([]string, len(referenced))
	next := 0
	for id := range referenced {
		ids[next] = id
		next++
	}
	return ids, nil
}

View File

@@ -24,6 +24,27 @@ type BucketManifest struct {
PublicAccessBlock bool `json:"public_access_block"`
}
// ListAllMyBucketsResult is the root element of the bucket-listing XML
// response; it marshals as <ListAllMyBucketsResult>.
type ListAllMyBucketsResult struct {
XMLName xml.Name `xml:"ListAllMyBucketsResult"`
// Xmlns is written as the xmlns attribute of the root element.
Xmlns string `xml:"xmlns,attr"`
// Owner is written as the <Owner> child element.
Owner BucketsOwner `xml:"Owner"`
// Buckets is written as the <Buckets> wrapper element.
Buckets BucketsElement `xml:"Buckets"`
}
// BucketsOwner is the <Owner> element of a ListAllMyBucketsResult.
type BucketsOwner struct {
ID string `xml:"ID"`
// DisplayName is omitted from the XML when empty.
DisplayName string `xml:"DisplayName,omitempty"`
}
// BucketsElement is the <Buckets> wrapper of a ListAllMyBucketsResult; each
// item is written as one <Bucket> child element.
type BucketsElement struct {
Items []BucketItem `xml:"Bucket"`
}
// BucketItem is a single <Bucket> entry in a bucket listing.
type BucketItem struct {
Name string `xml:"Name"`
// CreationDate is a pre-formatted timestamp string; the struct itself does
// no formatting.
CreationDate string `xml:"CreationDate"`
}
type S3ErrorResponse struct {
XMLName xml.Name `xml:"Error"`
Code string `xml:"Code"`
@@ -47,6 +68,25 @@ type ListBucketResult struct {
CommonPrefixes []CommonPrefixes `xml:"CommonPrefixes,omitempty"`
}
// ListBucketResultV2 is the XML body of a ListObjectsV2 response. It marshals
// under the same <ListBucketResult> root element name as ListBucketResult.
// Fields tagged omitempty are left out of the XML when they hold their zero
// value; Name, Prefix, MaxKeys, KeyCount and IsTruncated are always written.
type ListBucketResultV2 struct {
XMLName xml.Name `xml:"ListBucketResult"`
// Xmlns is written as the xmlns attribute of the root element.
Xmlns string `xml:"xmlns,attr"`
Name string `xml:"Name"`
Prefix string `xml:"Prefix"`
Delimiter string `xml:"Delimiter,omitempty"`
MaxKeys int `xml:"MaxKeys"`
// KeyCount is the number of entries carried by this response.
KeyCount int `xml:"KeyCount"`
IsTruncated bool `xml:"IsTruncated"`
ContinuationToken string `xml:"ContinuationToken,omitempty"`
NextContinuationToken string `xml:"NextContinuationToken,omitempty"`
StartAfter string `xml:"StartAfter,omitempty"`
EncodingType string `xml:"EncodingType,omitempty"`
// Contents and CommonPrefixes each marshal as repeated child elements.
Contents []Contents `xml:"Contents,omitempty"`
CommonPrefixes []CommonPrefixes `xml:"CommonPrefixes,omitempty"`
}
type Contents struct {
Key string `xml:"Key"`
LastModified string `xml:"LastModified"`
@@ -131,8 +171,15 @@ type DeleteObjectsResult struct {
XMLName xml.Name `xml:"DeleteResult"`
Xmlns string `xml:"xmlns,attr"`
Deleted []DeletedEntry `xml:"Deleted,omitempty"`
Errors []DeleteError `xml:"Error,omitempty"`
}
// DeletedEntry is one <Deleted> element of a DeleteObjectsResult, naming the
// key it refers to.
type DeletedEntry struct {
Key string `xml:"Key"`
}
// DeleteError is one <Error> element of a DeleteObjectsResult: the key that
// was rejected together with an error code and message.
type DeleteError struct {
Key string `xml:"Key"`
Code string `xml:"Code"`
Message string `xml:"Message"`
}

View File

@@ -1,6 +1,7 @@
package service
import (
"context"
"crypto/md5"
"encoding/hex"
"errors"
@@ -11,12 +12,14 @@ import (
"io"
"log/slog"
"strings"
"sync"
"time"
)
// ObjectService ties the metadata handler and the blob store together behind
// the operations the API layer calls.
type ObjectService struct {
metadata *metadata.MetadataHandler
blob *storage.BlobStore
// gcMu coordinates requests with garbage collection: the request methods
// hold it for reading, while GarbageCollect holds it for writing.
gcMu sync.RWMutex
}
var (
@@ -31,6 +34,8 @@ func NewObjectService(metadataHandler *metadata.MetadataHandler, blobHandler *st
}
func (s *ObjectService) PutObject(bucket, key, contentType string, input io.Reader) (*models.ObjectManifest, error) {
s.gcMu.RLock()
defer s.gcMu.RUnlock()
chunks, size, etag, err := s.blob.IngestStream(input)
if err != nil {
@@ -62,29 +67,30 @@ func (s *ObjectService) PutObject(bucket, key, contentType string, input io.Read
}
func (s *ObjectService) GetObject(bucket, key string) (io.ReadCloser, *models.ObjectManifest, error) {
s.gcMu.RLock()
manifest, err := s.metadata.GetManifest(bucket, key)
if err != nil {
s.gcMu.RUnlock()
return nil, nil, err
}
pr, pw := io.Pipe()
go func() {
defer func(pw *io.PipeWriter) {
err := pw.Close()
if err != nil {
}
}(pw)
err := s.blob.AssembleStream(manifest.Chunks, pw)
if err != nil {
defer s.gcMu.RUnlock()
if err := s.blob.AssembleStream(manifest.Chunks, pw); err != nil {
_ = pw.CloseWithError(err)
return
}
_ = pw.Close()
}()
return pr, manifest, nil
}
func (s *ObjectService) HeadObject(bucket, key string) (models.ObjectManifest, error) {
s.gcMu.RLock()
defer s.gcMu.RUnlock()
manifest, err := s.metadata.GetManifest(bucket, key)
if err != nil {
return models.ObjectManifest{}, err
@@ -93,39 +99,68 @@ func (s *ObjectService) HeadObject(bucket, key string) (models.ObjectManifest, e
}
// DeleteObject deletes the manifest for bucket/key through the metadata
// handler while holding the GC read lock. No blob data is removed here.
func (s *ObjectService) DeleteObject(bucket, key string) error {
s.gcMu.RLock()
defer s.gcMu.RUnlock()
return s.metadata.DeleteManifest(bucket, key)
}
// ListObjects returns the metadata handler's ListObjects result for bucket
// and prefix, read while holding the GC read lock.
func (s *ObjectService) ListObjects(bucket, prefix string) ([]*models.ObjectManifest, error) {
s.gcMu.RLock()
defer s.gcMu.RUnlock()
return s.metadata.ListObjects(bucket, prefix)
}
// CreateBucket creates bucket through the metadata handler while holding the
// GC read lock.
func (s *ObjectService) CreateBucket(bucket string) error {
s.gcMu.RLock()
defer s.gcMu.RUnlock()
return s.metadata.CreateBucket(bucket)
}
// HeadBucket reports whether the bucket manifest can be read: it fetches the
// manifest under the GC read lock, discards it and returns only the error.
func (s *ObjectService) HeadBucket(bucket string) error {
s.gcMu.RLock()
defer s.gcMu.RUnlock()
_, err := s.metadata.GetBucketManifest(bucket)
return err
}
// GetBucketManifest returns the metadata handler's manifest for bucket, read
// while holding the GC read lock.
func (s *ObjectService) GetBucketManifest(bucket string) (*models.BucketManifest, error) {
s.gcMu.RLock()
defer s.gcMu.RUnlock()
return s.metadata.GetBucketManifest(bucket)
}
// DeleteBucket deletes bucket through the metadata handler while holding the
// GC read lock; any refusal comes back as the handler's error.
func (s *ObjectService) DeleteBucket(bucket string) error {
s.gcMu.RLock()
defer s.gcMu.RUnlock()
return s.metadata.DeleteBucket(bucket)
}
// ListBuckets returns the metadata handler's ListBuckets result, read while
// holding the GC read lock.
func (s *ObjectService) ListBuckets() ([]string, error) {
s.gcMu.RLock()
defer s.gcMu.RUnlock()
return s.metadata.ListBuckets()
}
// DeleteObjects deletes the manifests for keys in bucket through the metadata
// handler's DeleteManifests, under the GC read lock, and returns that call's
// result unchanged.
func (s *ObjectService) DeleteObjects(bucket string, keys []string) ([]string, error) {
s.gcMu.RLock()
defer s.gcMu.RUnlock()
return s.metadata.DeleteManifests(bucket, keys)
}
// CreateMultipartUpload registers a multipart upload for bucket/key through
// the metadata handler while holding the GC read lock.
func (s *ObjectService) CreateMultipartUpload(bucket, key string) (*models.MultipartUpload, error) {
s.gcMu.RLock()
defer s.gcMu.RUnlock()
return s.metadata.CreateMultipartUpload(bucket, key)
}
func (s *ObjectService) UploadPart(bucket, key, uploadId string, partNumber int, input io.Reader) (string, error) {
s.gcMu.RLock()
defer s.gcMu.RUnlock()
if partNumber < 1 || partNumber > 10000 {
return "", ErrInvalidPart
}
@@ -158,6 +193,9 @@ func (s *ObjectService) UploadPart(bucket, key, uploadId string, partNumber int,
}
func (s *ObjectService) ListMultipartParts(bucket, key, uploadID string) ([]models.UploadedPart, error) {
s.gcMu.RLock()
defer s.gcMu.RUnlock()
upload, err := s.metadata.GetMultipartUpload(uploadID)
if err != nil {
return nil, err
@@ -169,6 +207,9 @@ func (s *ObjectService) ListMultipartParts(bucket, key, uploadID string) ([]mode
}
func (s *ObjectService) CompleteMultipartUpload(bucket, key, uploadID string, completed []models.CompletedPart) (*models.ObjectManifest, error) {
s.gcMu.RLock()
defer s.gcMu.RUnlock()
if len(completed) == 0 {
return nil, ErrInvalidCompleteRequest
}
@@ -236,6 +277,9 @@ func (s *ObjectService) CompleteMultipartUpload(bucket, key, uploadID string, co
}
func (s *ObjectService) AbortMultipartUpload(bucket, key, uploadID string) error {
s.gcMu.RLock()
defer s.gcMu.RUnlock()
upload, err := s.metadata.GetMultipartUpload(uploadID)
if err != nil {
return err
@@ -266,3 +310,55 @@ func buildMultipartETag(parts []models.UploadedPart) string {
// Close closes the metadata handler and returns its error. It does not take
// gcMu.
func (s *ObjectService) Close() error {
return s.metadata.Close()
}
// GarbageCollect deletes every stored chunk that metadata no longer
// references.
//
// It holds gcMu for writing for the whole run, so it does not overlap with
// the methods that take the read lock. A failed delete of a single chunk is
// logged and counted but does not stop the sweep. Only a failure to load the
// referenced set or to walk the blob store is returned.
func (s *ObjectService) GarbageCollect() error {
	s.gcMu.Lock()
	defer s.gcMu.Unlock()

	live, err := s.metadata.GetReferencedChunkSet()
	if err != nil {
		return err
	}

	var scanned, removed, failed int
	sweep := func(chunkID string) error {
		scanned++
		if _, inUse := live[chunkID]; inUse {
			return nil
		}
		if delErr := s.blob.DeleteBlob(chunkID); delErr != nil {
			// Best effort: record the failure and keep sweeping.
			failed++
			slog.Warn("garbage_collect_delete_failed", "chunk_id", chunkID, "error", delErr)
			return nil
		}
		removed++
		return nil
	}
	if walkErr := s.blob.ForEachChunk(sweep); walkErr != nil {
		return walkErr
	}

	slog.Info("garbage_collect_completed",
		"referenced_chunks", len(live),
		"total_chunks", scanned,
		"deleted_chunks", removed,
		"delete_errors", failed,
	)
	return nil
}
// RunGC runs GarbageCollect once per interval until ctx is cancelled. It
// blocks, so callers normally start it in its own goroutine.
//
// A non-positive interval disables periodic collection: the function logs a
// warning and returns immediately, because time.NewTicker panics when given a
// duration that is not positive. A failed collection is logged and the loop
// carries on to the next tick.
func (s *ObjectService) RunGC(ctx context.Context, interval time.Duration) {
	if interval <= 0 {
		slog.Warn("garbage_collect_disabled", "reason", "non-positive interval", "interval", interval)
		return
	}

	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			if err := s.GarbageCollect(); err != nil {
				slog.Error("garbage_collect_failed", "error", err)
			}
		}
	}
}

View File

@@ -87,10 +87,46 @@ func (bs *BlobStore) saveBlob(chunkID string, data []byte) error {
}
fullPath := filepath.Join(dir, chunkID)
if _, err := os.Stat(fullPath); os.IsNotExist(err) {
if err := os.WriteFile(fullPath, data, 0644); err != nil {
return err
if _, err := os.Stat(fullPath); err == nil {
return nil
} else if !os.IsNotExist(err) {
return err
}
tmpFile, err := os.CreateTemp(dir, chunkID+".tmp-*")
if err != nil {
return err
}
tmpPath := tmpFile.Name()
cleanup := true
defer func() {
if cleanup {
_ = os.Remove(tmpPath)
}
}()
if _, err := tmpFile.Write(data); err != nil {
_ = tmpFile.Close()
return err
}
if err := tmpFile.Sync(); err != nil {
_ = tmpFile.Close()
return err
}
if err := tmpFile.Close(); err != nil {
return err
}
if err := os.Rename(tmpPath, fullPath); err != nil {
if _, statErr := os.Stat(fullPath); statErr == nil {
return nil
}
return err
}
cleanup = false
if err := syncDir(dir); err != nil {
return err
}
return nil
}
@@ -115,6 +151,45 @@ func (bs *BlobStore) GetBlob(chunkID string) ([]byte, error) {
return os.ReadFile(filepath.Join(bs.dataRoot, blobRoot, chunkID[:2], chunkID[2:4], chunkID))
}
// DeleteBlob removes the file that stores chunkID. An ID that fails
// isValidChunkID is rejected with an error, and a file that is already
// missing counts as success.
func (bs *BlobStore) DeleteBlob(chunkID string) error {
	if !isValidChunkID(chunkID) {
		return fmt.Errorf("invalid chunk id: %q", chunkID)
	}

	// Blobs are stored two directory levels deep, named after the first two
	// and the next two characters of the chunk ID.
	blobPath := filepath.Join(bs.dataRoot, blobRoot, chunkID[:2], chunkID[2:4], chunkID)
	if removeErr := os.Remove(blobPath); removeErr != nil && !os.IsNotExist(removeErr) {
		return removeErr
	}
	return nil
}
// ListChunks collects every chunk ID that ForEachChunk visits. If the walk
// fails, the IDs gathered so far are returned together with the error.
func (bs *BlobStore) ListChunks() ([]string, error) {
	var ids []string
	collect := func(id string) error {
		ids = append(ids, id)
		return nil
	}
	walkErr := bs.ForEachChunk(collect)
	return ids, walkErr
}
// ForEachChunk walks the blob directory tree and calls fn with the name of
// every non-directory entry that passes isValidChunkID. Other files are
// skipped silently. The walk stops at the first error, whether it comes from
// the walk itself or from fn, and that error is returned. A nil fn is
// rejected up front.
func (bs *BlobStore) ForEachChunk(fn func(chunkID string) error) error {
	if fn == nil {
		return errors.New("chunk callback is required")
	}

	visit := func(_ string, info os.FileInfo, walkErr error) error {
		switch {
		case walkErr != nil:
			// Checked first: info may be unusable when the walk reports an error.
			return walkErr
		case info.IsDir():
			return nil
		case !isValidChunkID(info.Name()):
			return nil
		}
		return fn(info.Name())
	}
	return filepath.Walk(filepath.Join(bs.dataRoot, blobRoot), visit)
}
func isValidChunkID(chunkID string) bool {
if len(chunkID) != sha256.Size*2 {
return false
@@ -126,3 +201,12 @@ func isValidChunkID(chunkID string) bool {
}
return true
}
// syncDir opens the directory at dirPath, calls Sync on it and closes it
// again. It returns the open error or the sync error; the close error is
// ignored.
func syncDir(dirPath string) error {
	handle, openErr := os.Open(dirPath)
	if openErr != nil {
		return openErr
	}
	syncErr := handle.Sync()
	_ = handle.Close()
	return syncErr
}

View File

@@ -5,31 +5,36 @@ import (
"path/filepath"
"strconv"
"strings"
"time"
"github.com/joho/godotenv"
)
type Config struct {
DataPath string
Address string
Port int
ChunkSize int
LogLevel string
LogFormat string
AuditLog bool
DataPath string
Address string
Port int
ChunkSize int
LogLevel string
LogFormat string
AuditLog bool
GcInterval time.Duration
GcEnabled bool
}
func NewConfig() *Config {
_ = godotenv.Load()
config := &Config{
DataPath: sanitizeDataPath(os.Getenv("DATA_PATH")),
Address: firstNonEmpty(strings.TrimSpace(os.Getenv("ADDRESS")), "0.0.0.0"),
Port: envIntRange("PORT", 3000, 1, 65535),
ChunkSize: envIntRange("CHUNK_SIZE", 8192000, 1, 64*1024*1024),
LogLevel: strings.ToLower(firstNonEmpty(strings.TrimSpace(os.Getenv("LOG_LEVEL")), "info")),
LogFormat: strings.ToLower(firstNonEmpty(strings.TrimSpace(os.Getenv("LOG_FORMAT")), strings.TrimSpace(os.Getenv("LOG_TYPE")), "text")),
AuditLog: envBool("AUDIT_LOG", true),
DataPath: sanitizeDataPath(os.Getenv("DATA_PATH")),
Address: firstNonEmpty(strings.TrimSpace(os.Getenv("ADDRESS")), "0.0.0.0"),
Port: envIntRange("PORT", 3000, 1, 65535),
ChunkSize: envIntRange("CHUNK_SIZE", 8192000, 1, 64*1024*1024),
LogLevel: strings.ToLower(firstNonEmpty(strings.TrimSpace(os.Getenv("LOG_LEVEL")), "info")),
LogFormat: strings.ToLower(firstNonEmpty(strings.TrimSpace(os.Getenv("LOG_FORMAT")), strings.TrimSpace(os.Getenv("LOG_TYPE")), "text")),
AuditLog: envBool("AUDIT_LOG", true),
GcInterval: time.Duration(envIntRange("GC_INTERVAL", 10, -1, 60)) * time.Minute,
GcEnabled: envBool("GC_ENABLED", true),
}
if config.LogFormat != "json" && config.LogFormat != "text" {