diff --git a/.env.example b/.env.example index e6c7567..3b4db17 100644 --- a/.env.example +++ b/.env.example @@ -4,3 +4,5 @@ DATA_PATH=data/ PORT=2600 AUDIT_LOG=true ADDRESS=0.0.0.0 +GC_INTERVAL=10 +GC_ENABLED=true \ No newline at end of file diff --git a/README.md b/README.md index 6512bc1..7e0eb8e 100644 --- a/README.md +++ b/README.md @@ -29,6 +29,5 @@ An experimental Object Storage written in Go that should be partially compatible - No authentication/authorization yet. - Not full S3 API coverage. -- No garbage collection of unreferenced blob chunks yet. - No versioning or lifecycle policies. -- Error and edge-case behavior is still being refined for client compatibility. \ No newline at end of file +- Error and edge-case behavior is still being refined for client compatibility. diff --git a/api/api.go b/api/api.go index ee70fc7..ef7b4e9 100644 --- a/api/api.go +++ b/api/api.go @@ -3,6 +3,7 @@ package api import ( "bufio" "context" + "encoding/base64" "encoding/xml" "errors" "fmt" @@ -10,15 +11,13 @@ import ( "fs/metadata" "fs/models" "fs/service" - "fs/utils" "io" "log/slog" "net/http" - "os" - "os/signal" + "net/url" + "sort" "strconv" "strings" - "syscall" "time" "github.com/go-chi/chi/v5" @@ -100,6 +99,34 @@ func (h *Handler) handleGetObject(w http.ResponseWriter, r *http.Request) { } defer stream.Close() + rangeHeader := strings.TrimSpace(r.Header.Get("Range")) + if rangeHeader != "" { + start, end, err := parseSingleByteRange(rangeHeader, manifest.Size) + if err != nil { + w.Header().Set("Content-Range", fmt.Sprintf("bytes */%d", manifest.Size)) + writeS3Error(w, r, s3ErrInvalidRange, r.URL.Path) + return + } + + if start > 0 { + if _, err := io.CopyN(io.Discard, stream, start); err != nil { + writeMappedS3Error(w, r, err) + return + } + } + + length := end - start + 1 + w.Header().Set("Content-Type", manifest.ContentType) + w.Header().Set("Content-Length", strconv.FormatInt(length, 10)) + w.Header().Set("Content-Range", fmt.Sprintf("bytes %d-%d/%d", 
start, end, manifest.Size)) + w.Header().Set("ETag", `"`+manifest.ETag+`"`) + w.Header().Set("Last-Modified", time.Unix(manifest.CreatedAt, 0).UTC().Format(http.TimeFormat)) + w.Header().Set("Accept-Ranges", "bytes") + w.WriteHeader(http.StatusPartialContent) + _, _ = io.CopyN(w, stream, length) + return + } + w.Header().Set("Content-Type", manifest.ContentType) w.Header().Set("Content-Length", strconv.FormatInt(manifest.Size, 10)) w.Header().Set("ETag", `"`+manifest.ETag+`"`) @@ -418,8 +445,16 @@ func (h *Handler) handlePostBucket(w http.ResponseWriter, r *http.Request) { } keys := make([]string, 0, len(req.Objects)) + response := models.DeleteObjectsResult{ + Xmlns: "http://s3.amazonaws.com/doc/2006-03-01/", + } for _, obj := range req.Objects { if obj.Key == "" { + response.Errors = append(response.Errors, models.DeleteError{ + Key: obj.Key, + Code: s3ErrInvalidObjectKey.Code, + Message: s3ErrInvalidObjectKey.Message, + }) continue } keys = append(keys, obj.Key) @@ -431,9 +466,6 @@ func (h *Handler) handlePostBucket(w http.ResponseWriter, r *http.Request) { return } - response := models.DeleteObjectsResult{ - Xmlns: "http://s3.amazonaws.com/doc/2006-03-01/", - } if !req.Quiet { response.Deleted = make([]models.DeletedEntry, 0, len(deleted)) for _, key := range deleted { @@ -496,25 +528,47 @@ func (h *Handler) handleGetBuckets(w http.ResponseWriter, r *http.Request) { writeMappedS3Error(w, r, err) return } - w.Header().Set("Content-Type", "application/xml") - w.WriteHeader(http.StatusOK) + + response := models.ListAllMyBucketsResult{ + Xmlns: "http://s3.amazonaws.com/doc/2006-03-01/", + Owner: models.BucketsOwner{ + ID: "local", + DisplayName: "local", + }, + Buckets: models.BucketsElement{ + Items: make([]models.BucketItem, 0, len(buckets)), + }, + } + for _, bucket := range buckets { - _, err := w.Write([]byte(bucket)) + manifest, err := h.svc.GetBucketManifest(bucket) if err != nil { + writeMappedS3Error(w, r, err) return } + response.Buckets.Items = 
append(response.Buckets.Items, models.BucketItem{ + Name: bucket, + CreationDate: manifest.CreatedAt.UTC().Format("2006-01-02T15:04:05.000Z"), + }) } + + payload, err := xml.MarshalIndent(response, "", " ") + if err != nil { + writeMappedS3Error(w, r, err) + return + } + + w.Header().Set("Content-Type", "application/xml; charset=utf-8") + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte(xml.Header)) + _, _ = w.Write(payload) } func (h *Handler) handleGetBucket(w http.ResponseWriter, r *http.Request) { bucket := chi.URLParam(r, "bucket") if r.URL.Query().Get("list-type") == "2" { - prefix := r.URL.Query().Get("prefix") - if prefix == "" { - prefix = "" - } - h.handleListObjectsV2(w, r, bucket, prefix) + h.handleListObjectsV2(w, r, bucket) return } if r.URL.Query().Has("location") { @@ -534,30 +588,224 @@ func (h *Handler) handleGetBucket(w http.ResponseWriter, r *http.Request) { } -func (h *Handler) handleListObjectsV2(w http.ResponseWriter, r *http.Request, bucket, prefix string) { +func (h *Handler) handleListObjectsV2(w http.ResponseWriter, r *http.Request, bucket string) { + prefix := r.URL.Query().Get("prefix") + delimiter := r.URL.Query().Get("delimiter") + startAfter := r.URL.Query().Get("start-after") + encodingType := strings.ToLower(strings.TrimSpace(r.URL.Query().Get("encoding-type"))) + if encodingType != "" && encodingType != "url" { + writeS3Error(w, r, s3ErrInvalidArgument, r.URL.Path) + return + } + + maxKeys := 1000 + if rawMaxKeys := strings.TrimSpace(r.URL.Query().Get("max-keys")); rawMaxKeys != "" { + parsed, err := strconv.Atoi(rawMaxKeys) + if err != nil || parsed < 0 { + writeS3Error(w, r, s3ErrInvalidArgument, r.URL.Path) + return + } + if parsed > 1000 { + parsed = 1000 + } + maxKeys = parsed + } + + continuationToken := strings.TrimSpace(r.URL.Query().Get("continuation-token")) + continuationMarker := "" + if continuationToken != "" { + decoded, err := base64.StdEncoding.DecodeString(continuationToken) + if err != nil || len(decoded) == 
0 { + writeS3Error(w, r, s3ErrInvalidArgument, r.URL.Path) + return + } + continuationMarker = string(decoded) + } + objects, err := h.svc.ListObjects(bucket, prefix) if err != nil { writeMappedS3Error(w, r, err) return } - xmlResponse, err := utils.ConstructXMLResponseForObjectList(bucket, objects) + entries := buildListV2Entries(objects, prefix, delimiter) + startIdx := 0 + if continuationMarker != "" { + found := false + for i, entry := range entries { + if entry.Marker == continuationMarker { + startIdx = i + 1 + found = true + break + } + } + if !found { + writeS3Error(w, r, s3ErrInvalidArgument, r.URL.Path) + return + } + } else if startAfter != "" { + for startIdx < len(entries) && entries[startIdx].SortKey <= startAfter { + startIdx++ + } + } + + result := models.ListBucketResultV2{ + Xmlns: "http://s3.amazonaws.com/doc/2006-03-01/", + Name: bucket, + Prefix: s3EncodeIfNeeded(prefix, encodingType), + Delimiter: s3EncodeIfNeeded(delimiter, encodingType), + MaxKeys: maxKeys, + ContinuationToken: continuationToken, + StartAfter: s3EncodeIfNeeded(startAfter, encodingType), + EncodingType: encodingType, + } + + endIdx := startIdx + for endIdx < len(entries) && result.KeyCount < maxKeys { + entry := entries[endIdx] + if entry.Object != nil { + result.Contents = append(result.Contents, models.Contents{ + Key: s3EncodeIfNeeded(entry.Object.Key, encodingType), + LastModified: time.Unix(entry.Object.CreatedAt, 0).UTC().Format("2006-01-02T15:04:05.000Z"), + ETag: `"` + entry.Object.ETag + `"`, + Size: entry.Object.Size, + StorageClass: "STANDARD", + }) + } else { + result.CommonPrefixes = append(result.CommonPrefixes, models.CommonPrefixes{ + Prefix: s3EncodeIfNeeded(entry.CommonPrefix, encodingType), + }) + } + result.KeyCount++ + endIdx++ + } + + result.IsTruncated = endIdx < len(entries) + if result.IsTruncated && result.KeyCount > 0 { + result.NextContinuationToken = base64.StdEncoding.EncodeToString([]byte(entries[endIdx-1].Marker)) + } + + xmlResponse, err := 
xml.MarshalIndent(result, "", " ") if err != nil { writeMappedS3Error(w, r, err) return } w.Header().Set("Content-Type", "application/xml; charset=utf-8") - w.Header().Set("Content-Length", strconv.Itoa(len(xmlResponse))) + w.Header().Set("Content-Length", strconv.Itoa(len(xml.Header)+len(xmlResponse))) w.WriteHeader(http.StatusOK) - _, err = w.Write([]byte(xmlResponse)) - if err != nil { - return - } + _, _ = w.Write([]byte(xml.Header)) + _, _ = w.Write(xmlResponse) } -func (h *Handler) Start(address string) error { +type listV2Entry struct { + Marker string + SortKey string + Object *models.ObjectManifest + CommonPrefix string +} + +func buildListV2Entries(objects []*models.ObjectManifest, prefix, delimiter string) []listV2Entry { + sorted := make([]*models.ObjectManifest, 0, len(objects)) + sorted = append(sorted, objects...) + sort.Slice(sorted, func(i, j int) bool { + return sorted[i].Key < sorted[j].Key + }) + + entries := make([]listV2Entry, 0, len(sorted)) + seenCommonPrefixes := make(map[string]struct{}) + for _, object := range sorted { + if object == nil { + continue + } + if delimiter != "" { + relative := strings.TrimPrefix(object.Key, prefix) + if idx := strings.Index(relative, delimiter); idx >= 0 { + commonPrefix := prefix + relative[:idx+len(delimiter)] + if _, exists := seenCommonPrefixes[commonPrefix]; exists { + continue + } + seenCommonPrefixes[commonPrefix] = struct{}{} + entries = append(entries, listV2Entry{ + Marker: "C:" + commonPrefix, + SortKey: commonPrefix, + CommonPrefix: commonPrefix, + }) + continue + } + } + + entries = append(entries, listV2Entry{ + Marker: "K:" + object.Key, + SortKey: object.Key, + Object: object, + }) + } + return entries +} + +func s3EncodeIfNeeded(value, encodingType string) string { + if encodingType != "url" || value == "" { + return value + } + encoded := url.QueryEscape(value) + return strings.ReplaceAll(encoded, "+", "%20") +} + +func parseSingleByteRange(rangeHeader string, size int64) (int64, int64, 
error) { + if size <= 0 || !strings.HasPrefix(rangeHeader, "bytes=") { + return 0, 0, errors.New("invalid range") + } + spec := strings.TrimSpace(strings.TrimPrefix(rangeHeader, "bytes=")) + if spec == "" || strings.Contains(spec, ",") { + return 0, 0, errors.New("invalid range") + } + + parts := strings.SplitN(spec, "-", 2) + if len(parts) != 2 { + return 0, 0, errors.New("invalid range") + } + + if parts[0] == "" { + suffixLength, err := strconv.ParseInt(parts[1], 10, 64) + if err != nil || suffixLength <= 0 { + return 0, 0, errors.New("invalid range") + } + if suffixLength > size { + suffixLength = size + } + start := size - suffixLength + end := size - 1 + return start, end, nil + } + + start, err := strconv.ParseInt(parts[0], 10, 64) + if err != nil || start < 0 || start >= size { + return 0, 0, errors.New("invalid range") + } + + var end int64 + if parts[1] == "" { + end = size - 1 + } else { + end, err = strconv.ParseInt(parts[1], 10, 64) + if err != nil || end < start { + return 0, 0, errors.New("invalid range") + } + if end >= size { + end = size - 1 + } + } + + return start, end, nil +} + +func (h *Handler) Start(ctx context.Context, address string) error { + if ctx == nil { + ctx = context.Background() + } + h.logger.Info("server_starting", "address", address, "log_format", h.logConfig.Format, @@ -565,9 +813,7 @@ func (h *Handler) Start(address string) error { "audit_log", h.logConfig.Audit, ) h.setupRoutes() - stop := make(chan os.Signal, 1) - signal.Notify(stop, os.Interrupt, syscall.SIGTERM) - defer signal.Stop(stop) + server := http.Server{ Addr: address, Handler: h.router, @@ -583,8 +829,8 @@ func (h *Handler) Start(address string) error { }() select { - case <-stop: - h.logger.Info("shutdown_signal_received") + case <-ctx.Done(): + h.logger.Info("shutdown_context_done", "reason", ctx.Err()) case err := <-errCh: h.logger.Error("server_listen_failed", "error", err) return err diff --git a/api/s3_errors.go b/api/s3_errors.go index c446e4d..f61afe1 
100644 --- a/api/s3_errors.go +++ b/api/s3_errors.go @@ -41,6 +41,16 @@ var ( Code: "MalformedXML", Message: "The XML you provided was not well-formed or did not validate against our published schema.", } + s3ErrInvalidArgument = s3APIError{ + Status: http.StatusBadRequest, + Code: "InvalidArgument", + Message: "Invalid argument.", + } + s3ErrInvalidRange = s3APIError{ + Status: http.StatusRequestedRangeNotSatisfiable, + Code: "InvalidRange", + Message: "The requested range is not satisfiable.", + } s3ErrEntityTooSmall = s3APIError{ Status: http.StatusBadRequest, Code: "EntityTooSmall", diff --git a/main.go b/main.go index 53a28ed..e10a9bd 100644 --- a/main.go +++ b/main.go @@ -1,6 +1,7 @@ package main import ( + "context" "fs/api" "fs/logging" "fs/metadata" @@ -8,8 +9,10 @@ import ( "fs/storage" "fs/utils" "os" + "os/signal" "path/filepath" "strconv" + "syscall" ) func main() { @@ -44,8 +47,16 @@ func main() { objectService := service.NewObjectService(metadataHandler, blobHandler) handler := api.NewHandler(objectService, logger, logConfig) addr := config.Address + ":" + strconv.Itoa(config.Port) - if err = handler.Start(addr); err != nil { + + ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) + defer stop() + if config.GcEnabled { + go objectService.RunGC(ctx, config.GcInterval) + } + + if err = handler.Start(ctx, addr); err != nil { logger.Error("server_stopped_with_error", "error", err) return } + } diff --git a/metadata/metadata.go b/metadata/metadata.go index 6a2df20..08c276a 100644 --- a/metadata/metadata.go +++ b/metadata/metadata.go @@ -126,6 +126,22 @@ func (h *MetadataHandler) DeleteBucket(bucketName string) error { if k, _ := metadataBucket.Cursor().First(); k != nil { return fmt.Errorf("%w: %s", ErrBucketNotEmpty, bucketName) } + + multipartUploadsBucket, err := getMultipartUploadBucket(tx) + if err != nil { + return err + } + cursor := multipartUploadsBucket.Cursor() + for _, payload := cursor.First(); payload != 
nil; _, payload = cursor.Next() { + upload := models.MultipartUpload{} + if err := json.Unmarshal(payload, &upload); err != nil { + return err + } + if upload.Bucket == bucketName && upload.State == "pending" { + return fmt.Errorf("%w: %s", ErrBucketNotEmpty, bucketName) + } + } + if err := tx.DeleteBucket([]byte(bucketName)); err != nil && !errors.Is(err, bbolt.ErrBucketNotFound) { return fmt.Errorf("error deleting metadata bucket %s: %w", bucketName, err) } @@ -585,3 +601,72 @@ func (h *MetadataHandler) AbortMultipartUpload(uploadID string) error { } return nil } + +func (h *MetadataHandler) GetReferencedChunkSet() (map[string]struct{}, error) { + chunkSet := make(map[string]struct{}) + + err := h.db.View(func(tx *bbolt.Tx) error { + systemIndexBucket := tx.Bucket([]byte(systemIndex)) + if systemIndexBucket == nil { + return errors.New("system index not found") + } + c := systemIndexBucket.Cursor() + for k, _ := c.First(); k != nil; k, _ = c.Next() { + metadataBucket := tx.Bucket(k) + if metadataBucket == nil { + continue + } + err := metadataBucket.ForEach(func(k, v []byte) error { + object := models.ObjectManifest{} + err := json.Unmarshal(v, &object) + if err != nil { + return err + } + for _, chunkID := range object.Chunks { + chunkSet[chunkID] = struct{}{} + } + return nil + }) + if err != nil { + return err + } + } + + partsBucket := tx.Bucket(multipartUploadPartsIndex) + if partsBucket == nil { + return errors.New("multipart upload parts index not found") + } + if err := partsBucket.ForEach(func(_, v []byte) error { + part := models.UploadedPart{} + if err := json.Unmarshal(v, &part); err != nil { + return err + } + for _, chunkID := range part.Chunks { + chunkSet[chunkID] = struct{}{} + } + return nil + }); err != nil { + return err + } + return nil + }) + if err != nil { + return nil, err + } + + return chunkSet, nil +} + +func (h *MetadataHandler) GetReferencedChunks() ([]string, error) { + chunkSet, err := h.GetReferencedChunkSet() + if err != nil { + 
return nil, err + } + + chunks := make([]string, 0, len(chunkSet)) + for chunkID := range chunkSet { + chunks = append(chunks, chunkID) + } + return chunks, nil + +} diff --git a/models/models.go b/models/models.go index fe6d74f..df363e1 100644 --- a/models/models.go +++ b/models/models.go @@ -24,6 +24,27 @@ type BucketManifest struct { PublicAccessBlock bool `json:"public_access_block"` } +type ListAllMyBucketsResult struct { + XMLName xml.Name `xml:"ListAllMyBucketsResult"` + Xmlns string `xml:"xmlns,attr"` + Owner BucketsOwner `xml:"Owner"` + Buckets BucketsElement `xml:"Buckets"` +} + +type BucketsOwner struct { + ID string `xml:"ID"` + DisplayName string `xml:"DisplayName,omitempty"` +} + +type BucketsElement struct { + Items []BucketItem `xml:"Bucket"` +} + +type BucketItem struct { + Name string `xml:"Name"` + CreationDate string `xml:"CreationDate"` +} + type S3ErrorResponse struct { XMLName xml.Name `xml:"Error"` Code string `xml:"Code"` @@ -47,6 +68,25 @@ type ListBucketResult struct { CommonPrefixes []CommonPrefixes `xml:"CommonPrefixes,omitempty"` } +type ListBucketResultV2 struct { + XMLName xml.Name `xml:"ListBucketResult"` + Xmlns string `xml:"xmlns,attr"` + + Name string `xml:"Name"` + Prefix string `xml:"Prefix"` + Delimiter string `xml:"Delimiter,omitempty"` + MaxKeys int `xml:"MaxKeys"` + KeyCount int `xml:"KeyCount"` + IsTruncated bool `xml:"IsTruncated"` + ContinuationToken string `xml:"ContinuationToken,omitempty"` + NextContinuationToken string `xml:"NextContinuationToken,omitempty"` + StartAfter string `xml:"StartAfter,omitempty"` + EncodingType string `xml:"EncodingType,omitempty"` + + Contents []Contents `xml:"Contents,omitempty"` + CommonPrefixes []CommonPrefixes `xml:"CommonPrefixes,omitempty"` +} + type Contents struct { Key string `xml:"Key"` LastModified string `xml:"LastModified"` @@ -131,8 +171,15 @@ type DeleteObjectsResult struct { XMLName xml.Name `xml:"DeleteResult"` Xmlns string `xml:"xmlns,attr"` Deleted []DeletedEntry 
`xml:"Deleted,omitempty"` + Errors []DeleteError `xml:"Error,omitempty"` } type DeletedEntry struct { Key string `xml:"Key"` } + +type DeleteError struct { + Key string `xml:"Key"` + Code string `xml:"Code"` + Message string `xml:"Message"` +} diff --git a/service/service.go b/service/service.go index 560816f..2b9d193 100644 --- a/service/service.go +++ b/service/service.go @@ -1,6 +1,7 @@ package service import ( + "context" "crypto/md5" "encoding/hex" "errors" @@ -11,12 +12,14 @@ import ( "io" "log/slog" "strings" + "sync" "time" ) type ObjectService struct { metadata *metadata.MetadataHandler blob *storage.BlobStore + gcMu sync.RWMutex } var ( @@ -31,6 +34,8 @@ func NewObjectService(metadataHandler *metadata.MetadataHandler, blobHandler *st } func (s *ObjectService) PutObject(bucket, key, contentType string, input io.Reader) (*models.ObjectManifest, error) { + s.gcMu.RLock() + defer s.gcMu.RUnlock() chunks, size, etag, err := s.blob.IngestStream(input) if err != nil { @@ -69,17 +74,11 @@ func (s *ObjectService) GetObject(bucket, key string) (io.ReadCloser, *models.Ob pr, pw := io.Pipe() go func() { - defer func(pw *io.PipeWriter) { - err := pw.Close() - if err != nil { - - } - }(pw) - - err := s.blob.AssembleStream(manifest.Chunks, pw) - if err != nil { + if err := s.blob.AssembleStream(manifest.Chunks, pw); err != nil { + _ = pw.CloseWithError(err) return } + _ = pw.Close() }() return pr, manifest, nil } @@ -93,6 +92,8 @@ func (s *ObjectService) HeadObject(bucket, key string) (models.ObjectManifest, e } func (s *ObjectService) DeleteObject(bucket, key string) error { + s.gcMu.RLock() + defer s.gcMu.RUnlock() return s.metadata.DeleteManifest(bucket, key) } @@ -101,6 +102,8 @@ func (s *ObjectService) ListObjects(bucket, prefix string) ([]*models.ObjectMani } func (s *ObjectService) CreateBucket(bucket string) error { + s.gcMu.RLock() + defer s.gcMu.RUnlock() return s.metadata.CreateBucket(bucket) } @@ -109,7 +112,13 @@ func (s *ObjectService) HeadBucket(bucket 
string) error {
 	return err
 }
 
+func (s *ObjectService) GetBucketManifest(bucket string) (*models.BucketManifest, error) {
+	return s.metadata.GetBucketManifest(bucket)
+}
+
 func (s *ObjectService) DeleteBucket(bucket string) error {
+	s.gcMu.RLock()
+	defer s.gcMu.RUnlock()
 	return s.metadata.DeleteBucket(bucket)
 }
 
@@ -118,14 +127,21 @@ func (s *ObjectService) ListBuckets() ([]string, error) {
 }
 
 func (s *ObjectService) DeleteObjects(bucket string, keys []string) ([]string, error) {
+	s.gcMu.RLock()
+	defer s.gcMu.RUnlock()
 	return s.metadata.DeleteManifests(bucket, keys)
 }
 
 func (s *ObjectService) CreateMultipartUpload(bucket, key string) (*models.MultipartUpload, error) {
+	s.gcMu.RLock()
+	defer s.gcMu.RUnlock()
 	return s.metadata.CreateMultipartUpload(bucket, key)
 }
 
 func (s *ObjectService) UploadPart(bucket, key, uploadId string, partNumber int, input io.Reader) (string, error) {
+	s.gcMu.RLock()
+	defer s.gcMu.RUnlock()
+
 	if partNumber < 1 || partNumber > 10000 {
 		return "", ErrInvalidPart
 	}
@@ -169,6 +185,9 @@ func (s *ObjectService) ListMultipartParts(bucket, key, uploadID string) ([]mode
 }
 
 func (s *ObjectService) CompleteMultipartUpload(bucket, key, uploadID string, completed []models.CompletedPart) (*models.ObjectManifest, error) {
+	s.gcMu.RLock()
+	defer s.gcMu.RUnlock()
+
 	if len(completed) == 0 {
 		return nil, ErrInvalidCompleteRequest
 	}
@@ -236,6 +255,9 @@ func (s *ObjectService) CompleteMultipartUpload(bucket, key, uploadID string, co
 }
 
 func (s *ObjectService) AbortMultipartUpload(bucket, key, uploadID string) error {
+	s.gcMu.RLock()
+	defer s.gcMu.RUnlock()
+
 	upload, err := s.metadata.GetMultipartUpload(uploadID)
 	if err != nil {
 		return err
@@ -266,3 +288,66 @@ func buildMultipartETag(parts []models.UploadedPart) string {
 func (s *ObjectService) Close() error {
 	return s.metadata.Close()
 }
+
+// GarbageCollect deletes every stored chunk that is not in the referenced
+// chunk set reported by the metadata store. It holds gcMu exclusively for the
+// whole sweep, so service methods that take the read lock wait until it ends.
+func (s *ObjectService) GarbageCollect() error {
+	s.gcMu.Lock()
+	defer s.gcMu.Unlock()
+
+	referencedChunkSet, err := s.metadata.GetReferencedChunkSet()
+	if err != nil {
+		return err
+	}
+
+	totalChunks := 0
+	deletedChunks := 0
+
+	if err := s.blob.ForEachChunk(func(chunkID string) error {
+		totalChunks++
+		if _, found := referencedChunkSet[chunkID]; found {
+			return nil
+		}
+		if err := s.blob.DeleteBlob(chunkID); err != nil {
+			return err
+		}
+		deletedChunks++
+		return nil
+	}); err != nil {
+		return err
+	}
+
+	slog.Info("garbage_collect_completed",
+		"referenced_chunks", len(referencedChunkSet),
+		"total_chunks", totalChunks,
+		"deleted_chunks", deletedChunks,
+	)
+	return nil
+}
+
+// RunGC calls GarbageCollect once per interval until ctx is cancelled. It
+// blocks, so callers normally start it in its own goroutine. A non-positive
+// interval disables collection instead of reaching time.NewTicker, which
+// panics on such values.
+func (s *ObjectService) RunGC(ctx context.Context, interval time.Duration) {
+	if interval <= 0 {
+		slog.Warn("garbage_collect_disabled", "reason", "non-positive interval", "interval", interval)
+		return
+	}
+
+	ticker := time.NewTicker(interval)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case <-ticker.C:
+			// A failed sweep is retried on the next tick; log it so it is not lost.
+			if err := s.GarbageCollect(); err != nil {
+				slog.Error("garbage_collect_failed", "error", err)
+			}
+		}
+	}
+}
diff --git a/storage/blob.go b/storage/blob.go
index 23d21f6..957429f 100644
--- a/storage/blob.go
+++ b/storage/blob.go
@@ -87,10 +87,46 @@ func (bs *BlobStore) saveBlob(chunkID string, data []byte) error {
 	}
 
 	fullPath := filepath.Join(dir, chunkID)
-	if _, err := os.Stat(fullPath); os.IsNotExist(err) {
-		if err := os.WriteFile(fullPath, data, 0644); err != nil {
-			return err
+	if _, err := os.Stat(fullPath); err == nil {
+		return nil
+	} else if !os.IsNotExist(err) {
+		return err
+	}
+
+	tmpFile, err := os.CreateTemp(dir, chunkID+".tmp-*")
+	if err != nil {
+		return err
+	}
+	tmpPath := tmpFile.Name()
+	cleanup := true
+	defer func() {
+		if cleanup {
+			_ = os.Remove(tmpPath)
 		}
+	}()
+
+	if _, err := tmpFile.Write(data); err != nil {
+		_ = tmpFile.Close()
+		return err
+	}
+	if err := tmpFile.Sync(); err != nil {
+		_ = tmpFile.Close()
+		return err
+	}
+	if err := tmpFile.Close(); err != nil {
+		return err
+	}
+
+	if err := os.Rename(tmpPath, fullPath); err != nil {
+		if _, statErr := os.Stat(fullPath); statErr == nil {
+			return nil
+		}
+		return err
+	}
+	cleanup = false
+
+	if err := syncDir(dir); err != nil {
+		return err
 	}
 	return nil
 }
@@ -115,6 +151,41 @@ func (bs *BlobStore) GetBlob(chunkID string) ([]byte, 
error) { return os.ReadFile(filepath.Join(bs.dataRoot, blobRoot, chunkID[:2], chunkID[2:4], chunkID)) } +func (bs *BlobStore) DeleteBlob(chunkID string) error { + if !isValidChunkID(chunkID) { + return fmt.Errorf("invalid chunk id: %q", chunkID) + } + return os.Remove(filepath.Join(bs.dataRoot, blobRoot, chunkID[:2], chunkID[2:4], chunkID)) +} + +func (bs *BlobStore) ListChunks() ([]string, error) { + var chunkIDs []string + err := bs.ForEachChunk(func(chunkID string) error { + chunkIDs = append(chunkIDs, chunkID) + return nil + }) + return chunkIDs, err +} + +func (bs *BlobStore) ForEachChunk(fn func(chunkID string) error) error { + if fn == nil { + return errors.New("chunk callback is required") + } + return filepath.Walk(filepath.Join(bs.dataRoot, blobRoot), func(path string, info os.FileInfo, err error) error { + if err != nil { + return err + } + if !info.IsDir() { + chunkID := info.Name() + if isValidChunkID(chunkID) { + return fn(chunkID) + } + } + return nil + + }) +} + func isValidChunkID(chunkID string) bool { if len(chunkID) != sha256.Size*2 { return false @@ -126,3 +197,12 @@ func isValidChunkID(chunkID string) bool { } return true } + +func syncDir(dirPath string) error { + dir, err := os.Open(dirPath) + if err != nil { + return err + } + defer dir.Close() + return dir.Sync() +} diff --git a/utils/config.go b/utils/config.go index a73e195..76d9247 100644 --- a/utils/config.go +++ b/utils/config.go @@ -5,31 +5,36 @@ import ( "path/filepath" "strconv" "strings" + "time" "github.com/joho/godotenv" ) type Config struct { - DataPath string - Address string - Port int - ChunkSize int - LogLevel string - LogFormat string - AuditLog bool + DataPath string + Address string + Port int + ChunkSize int + LogLevel string + LogFormat string + AuditLog bool + GcInterval time.Duration + GcEnabled bool } func NewConfig() *Config { _ = godotenv.Load() config := &Config{ - DataPath: sanitizeDataPath(os.Getenv("DATA_PATH")), - Address: 
firstNonEmpty(strings.TrimSpace(os.Getenv("ADDRESS")), "0.0.0.0"), - Port: envIntRange("PORT", 3000, 1, 65535), - ChunkSize: envIntRange("CHUNK_SIZE", 8192000, 1, 64*1024*1024), - LogLevel: strings.ToLower(firstNonEmpty(strings.TrimSpace(os.Getenv("LOG_LEVEL")), "info")), - LogFormat: strings.ToLower(firstNonEmpty(strings.TrimSpace(os.Getenv("LOG_FORMAT")), strings.TrimSpace(os.Getenv("LOG_TYPE")), "text")), - AuditLog: envBool("AUDIT_LOG", true), + DataPath: sanitizeDataPath(os.Getenv("DATA_PATH")), + Address: firstNonEmpty(strings.TrimSpace(os.Getenv("ADDRESS")), "0.0.0.0"), + Port: envIntRange("PORT", 3000, 1, 65535), + ChunkSize: envIntRange("CHUNK_SIZE", 8192000, 1, 64*1024*1024), + LogLevel: strings.ToLower(firstNonEmpty(strings.TrimSpace(os.Getenv("LOG_LEVEL")), "info")), + LogFormat: strings.ToLower(firstNonEmpty(strings.TrimSpace(os.Getenv("LOG_FORMAT")), strings.TrimSpace(os.Getenv("LOG_TYPE")), "text")), + AuditLog: envBool("AUDIT_LOG", true), + GcInterval: time.Duration(envIntRange("GC_INTERVAL", 10, -1, 60)) * time.Minute, + GcEnabled: envBool("GC_ENABLED", true), } if config.LogFormat != "json" && config.LogFormat != "text" {