From 06c90be50f892d913bffa059e053885d28aba275 Mon Sep 17 00:00:00 2001 From: Andrej Mickov Date: Tue, 24 Feb 2026 00:28:33 +0100 Subject: [PATCH] Fixed copilot suggestions. --- api/api.go | 7 +++++-- metadata/metadata.go | 28 +++++++++++++++++++++++++++- service/service.go | 28 +++++++++++++++++++++++++++- storage/blob.go | 6 +++++- 4 files changed, 64 insertions(+), 5 deletions(-) diff --git a/api/api.go b/api/api.go index ef7b4e9..4dabc74 100644 --- a/api/api.go +++ b/api/api.go @@ -543,8 +543,8 @@ func (h *Handler) handleGetBuckets(w http.ResponseWriter, r *http.Request) { for _, bucket := range buckets { manifest, err := h.svc.GetBucketManifest(bucket) if err != nil { - writeMappedS3Error(w, r, err) - return + h.logger.Warn("bucket_manifest_read_failed", "bucket", bucket, "error", err) + continue } response.Buckets.Items = append(response.Buckets.Items, models.BucketItem{ Name: bucket, @@ -833,6 +833,9 @@ func (h *Handler) Start(ctx context.Context, address string) error { h.logger.Info("shutdown_context_done", "reason", ctx.Err()) case err := <-errCh: h.logger.Error("server_listen_failed", "error", err) + if closeErr := h.svc.Close(); closeErr != nil { + h.logger.Error("service_close_failed", "error", closeErr) + } return err } diff --git a/metadata/metadata.go b/metadata/metadata.go index 08c276a..9e50a3a 100644 --- a/metadata/metadata.go +++ b/metadata/metadata.go @@ -604,6 +604,7 @@ func (h *MetadataHandler) AbortMultipartUpload(uploadID string) error { func (h *MetadataHandler) GetReferencedChunkSet() (map[string]struct{}, error) { chunkSet := make(map[string]struct{}) + pendingUploadSet := make(map[string]struct{}) err := h.db.View(func(tx *bbolt.Tx) error { systemIndexBucket := tx.Bucket([]byte(systemIndex)) @@ -632,11 +633,36 @@ func (h *MetadataHandler) GetReferencedChunkSet() (map[string]struct{}, error) { } } + uploadsBucket := tx.Bucket(multipartUploadIndex) + if uploadsBucket == nil { + return errors.New("multipart upload index not found") + } + if err := uploadsBucket.ForEach(func(k, v []byte) error { + upload := models.MultipartUpload{} + if err := json.Unmarshal(v, &upload); err != nil { + return err + } + if upload.State == "pending" { + pendingUploadSet[string(k)] = struct{}{} + } + return nil + }); err != nil { + return err + } + partsBucket := tx.Bucket(multipartUploadPartsIndex) if partsBucket == nil { return errors.New("multipart upload parts index not found") } - if err := partsBucket.ForEach(func(_, v []byte) error { + if err := partsBucket.ForEach(func(k, v []byte) error { + uploadID, _, ok := strings.Cut(string(k), ":") + if !ok { + return nil + } + if _, pending := pendingUploadSet[uploadID]; !pending { + return nil + } + part := models.UploadedPart{} if err := json.Unmarshal(v, &part); err != nil { return err diff --git a/service/service.go b/service/service.go index 2b9d193..c2553ad 100644 --- a/service/service.go +++ b/service/service.go @@ -67,13 +67,17 @@ func (s *ObjectService) PutObject(bucket, key, contentType string, input io.Read } func (s *ObjectService) GetObject(bucket, key string) (io.ReadCloser, *models.ObjectManifest, error) { + s.gcMu.RLock() + manifest, err := s.metadata.GetManifest(bucket, key) if err != nil { + s.gcMu.RUnlock() return nil, nil, err } pr, pw := io.Pipe() go func() { + defer s.gcMu.RUnlock() if err := s.blob.AssembleStream(manifest.Chunks, pw); err != nil { _ = pw.CloseWithError(err) return @@ -84,6 +88,9 @@ func (s *ObjectService) GetObject(bucket, key string) (io.ReadCloser, *models.Ob } func (s *ObjectService) HeadObject(bucket, key string) (models.ObjectManifest, error) { + s.gcMu.RLock() + defer s.gcMu.RUnlock() + manifest, err := s.metadata.GetManifest(bucket, key) if err != nil { return models.ObjectManifest{}, err @@ -98,6 +105,9 @@ func (s *ObjectService) DeleteObject(bucket, key string) error { } func (s *ObjectService) ListObjects(bucket, prefix string) ([]*models.ObjectManifest, error) { + s.gcMu.RLock() + defer s.gcMu.RUnlock() + return s.metadata.ListObjects(bucket, prefix) } @@ -108,11 +118,17 @@ func (s *ObjectService) CreateBucket(bucket string) error { } func (s *ObjectService) HeadBucket(bucket string) error { + s.gcMu.RLock() + defer s.gcMu.RUnlock() + _, err := s.metadata.GetBucketManifest(bucket) return err } func (s *ObjectService) GetBucketManifest(bucket string) (*models.BucketManifest, error) { + s.gcMu.RLock() + defer s.gcMu.RUnlock() + return s.metadata.GetBucketManifest(bucket) } @@ -123,6 +139,9 @@ func (s *ObjectService) DeleteBucket(bucket string) error { } func (s *ObjectService) ListBuckets() ([]string, error) { + s.gcMu.RLock() + defer s.gcMu.RUnlock() + return s.metadata.ListBuckets() } @@ -174,6 +193,9 @@ func (s *ObjectService) UploadPart(bucket, key, uploadId string, partNumber int, } func (s *ObjectService) ListMultipartParts(bucket, key, uploadID string) ([]models.UploadedPart, error) { + s.gcMu.RLock() + defer s.gcMu.RUnlock() + upload, err := s.metadata.GetMultipartUpload(uploadID) if err != nil { return nil, err @@ -300,6 +322,7 @@ func (s *ObjectService) GarbageCollect() error { totalChunks := 0 deletedChunks := 0 + deleteErrors := 0 if err := s.blob.ForEachChunk(func(chunkID string) error { totalChunks++ @@ -307,7 +330,9 @@ func (s *ObjectService) GarbageCollect() error { return nil } if err := s.blob.DeleteBlob(chunkID); err != nil { - return err + deleteErrors++ + slog.Warn("garbage_collect_delete_failed", "chunk_id", chunkID, "error", err) + return nil } deletedChunks++ return nil @@ -319,6 +344,7 @@ func (s *ObjectService) GarbageCollect() error { "referenced_chunks", len(referencedChunkSet), "total_chunks", totalChunks, "deleted_chunks", deletedChunks, + "delete_errors", deleteErrors, ) return nil } diff --git a/storage/blob.go b/storage/blob.go index 957429f..6215a6f 100644 --- a/storage/blob.go +++ b/storage/blob.go @@ -155,7 +155,11 @@ func (bs *BlobStore) DeleteBlob(chunkID string) error { if !isValidChunkID(chunkID) { return fmt.Errorf("invalid chunk id: %q", chunkID) } - return os.Remove(filepath.Join(bs.dataRoot, blobRoot, chunkID[:2], chunkID[2:4], chunkID)) + err := os.Remove(filepath.Join(bs.dataRoot, blobRoot, chunkID[:2], chunkID[2:4], chunkID)) + if err != nil && os.IsNotExist(err) { + return nil + } + return err } func (bs *BlobStore) ListChunks() ([]string, error) {