mirror of
https://github.com/ferdzo/fs.git
synced 2026-04-05 01:36:25 +00:00
Fixed copilot suggestions.
This commit is contained in:
@@ -543,8 +543,8 @@ func (h *Handler) handleGetBuckets(w http.ResponseWriter, r *http.Request) {
|
|||||||
for _, bucket := range buckets {
|
for _, bucket := range buckets {
|
||||||
manifest, err := h.svc.GetBucketManifest(bucket)
|
manifest, err := h.svc.GetBucketManifest(bucket)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
writeMappedS3Error(w, r, err)
|
h.logger.Warn("bucket_manifest_read_failed", "bucket", bucket, "error", err)
|
||||||
return
|
continue
|
||||||
}
|
}
|
||||||
response.Buckets.Items = append(response.Buckets.Items, models.BucketItem{
|
response.Buckets.Items = append(response.Buckets.Items, models.BucketItem{
|
||||||
Name: bucket,
|
Name: bucket,
|
||||||
@@ -833,6 +833,9 @@ func (h *Handler) Start(ctx context.Context, address string) error {
|
|||||||
h.logger.Info("shutdown_context_done", "reason", ctx.Err())
|
h.logger.Info("shutdown_context_done", "reason", ctx.Err())
|
||||||
case err := <-errCh:
|
case err := <-errCh:
|
||||||
h.logger.Error("server_listen_failed", "error", err)
|
h.logger.Error("server_listen_failed", "error", err)
|
||||||
|
if closeErr := h.svc.Close(); closeErr != nil {
|
||||||
|
h.logger.Error("service_close_failed", "error", closeErr)
|
||||||
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -604,6 +604,7 @@ func (h *MetadataHandler) AbortMultipartUpload(uploadID string) error {
|
|||||||
|
|
||||||
func (h *MetadataHandler) GetReferencedChunkSet() (map[string]struct{}, error) {
|
func (h *MetadataHandler) GetReferencedChunkSet() (map[string]struct{}, error) {
|
||||||
chunkSet := make(map[string]struct{})
|
chunkSet := make(map[string]struct{})
|
||||||
|
pendingUploadSet := make(map[string]struct{})
|
||||||
|
|
||||||
err := h.db.View(func(tx *bbolt.Tx) error {
|
err := h.db.View(func(tx *bbolt.Tx) error {
|
||||||
systemIndexBucket := tx.Bucket([]byte(systemIndex))
|
systemIndexBucket := tx.Bucket([]byte(systemIndex))
|
||||||
@@ -632,11 +633,36 @@ func (h *MetadataHandler) GetReferencedChunkSet() (map[string]struct{}, error) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
uploadsBucket := tx.Bucket(multipartUploadIndex)
|
||||||
|
if uploadsBucket == nil {
|
||||||
|
return errors.New("multipart upload index not found")
|
||||||
|
}
|
||||||
|
if err := uploadsBucket.ForEach(func(k, v []byte) error {
|
||||||
|
upload := models.MultipartUpload{}
|
||||||
|
if err := json.Unmarshal(v, &upload); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if upload.State == "pending" {
|
||||||
|
pendingUploadSet[string(k)] = struct{}{}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
partsBucket := tx.Bucket(multipartUploadPartsIndex)
|
partsBucket := tx.Bucket(multipartUploadPartsIndex)
|
||||||
if partsBucket == nil {
|
if partsBucket == nil {
|
||||||
return errors.New("multipart upload parts index not found")
|
return errors.New("multipart upload parts index not found")
|
||||||
}
|
}
|
||||||
if err := partsBucket.ForEach(func(_, v []byte) error {
|
if err := partsBucket.ForEach(func(k, v []byte) error {
|
||||||
|
uploadID, _, ok := strings.Cut(string(k), ":")
|
||||||
|
if !ok {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
if _, pending := pendingUploadSet[uploadID]; !pending {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
part := models.UploadedPart{}
|
part := models.UploadedPart{}
|
||||||
if err := json.Unmarshal(v, &part); err != nil {
|
if err := json.Unmarshal(v, &part); err != nil {
|
||||||
return err
|
return err
|
||||||
|
|||||||
@@ -67,13 +67,17 @@ func (s *ObjectService) PutObject(bucket, key, contentType string, input io.Read
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *ObjectService) GetObject(bucket, key string) (io.ReadCloser, *models.ObjectManifest, error) {
|
func (s *ObjectService) GetObject(bucket, key string) (io.ReadCloser, *models.ObjectManifest, error) {
|
||||||
|
s.gcMu.RLock()
|
||||||
|
|
||||||
manifest, err := s.metadata.GetManifest(bucket, key)
|
manifest, err := s.metadata.GetManifest(bucket, key)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
s.gcMu.RUnlock()
|
||||||
return nil, nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
pr, pw := io.Pipe()
|
pr, pw := io.Pipe()
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
|
defer s.gcMu.RUnlock()
|
||||||
if err := s.blob.AssembleStream(manifest.Chunks, pw); err != nil {
|
if err := s.blob.AssembleStream(manifest.Chunks, pw); err != nil {
|
||||||
_ = pw.CloseWithError(err)
|
_ = pw.CloseWithError(err)
|
||||||
return
|
return
|
||||||
@@ -84,6 +88,9 @@ func (s *ObjectService) GetObject(bucket, key string) (io.ReadCloser, *models.Ob
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *ObjectService) HeadObject(bucket, key string) (models.ObjectManifest, error) {
|
func (s *ObjectService) HeadObject(bucket, key string) (models.ObjectManifest, error) {
|
||||||
|
s.gcMu.RLock()
|
||||||
|
defer s.gcMu.RUnlock()
|
||||||
|
|
||||||
manifest, err := s.metadata.GetManifest(bucket, key)
|
manifest, err := s.metadata.GetManifest(bucket, key)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return models.ObjectManifest{}, err
|
return models.ObjectManifest{}, err
|
||||||
@@ -98,6 +105,9 @@ func (s *ObjectService) DeleteObject(bucket, key string) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *ObjectService) ListObjects(bucket, prefix string) ([]*models.ObjectManifest, error) {
|
func (s *ObjectService) ListObjects(bucket, prefix string) ([]*models.ObjectManifest, error) {
|
||||||
|
s.gcMu.RLock()
|
||||||
|
defer s.gcMu.RUnlock()
|
||||||
|
|
||||||
return s.metadata.ListObjects(bucket, prefix)
|
return s.metadata.ListObjects(bucket, prefix)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -108,11 +118,17 @@ func (s *ObjectService) CreateBucket(bucket string) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *ObjectService) HeadBucket(bucket string) error {
|
func (s *ObjectService) HeadBucket(bucket string) error {
|
||||||
|
s.gcMu.RLock()
|
||||||
|
defer s.gcMu.RUnlock()
|
||||||
|
|
||||||
_, err := s.metadata.GetBucketManifest(bucket)
|
_, err := s.metadata.GetBucketManifest(bucket)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *ObjectService) GetBucketManifest(bucket string) (*models.BucketManifest, error) {
|
func (s *ObjectService) GetBucketManifest(bucket string) (*models.BucketManifest, error) {
|
||||||
|
s.gcMu.RLock()
|
||||||
|
defer s.gcMu.RUnlock()
|
||||||
|
|
||||||
return s.metadata.GetBucketManifest(bucket)
|
return s.metadata.GetBucketManifest(bucket)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -123,6 +139,9 @@ func (s *ObjectService) DeleteBucket(bucket string) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *ObjectService) ListBuckets() ([]string, error) {
|
func (s *ObjectService) ListBuckets() ([]string, error) {
|
||||||
|
s.gcMu.RLock()
|
||||||
|
defer s.gcMu.RUnlock()
|
||||||
|
|
||||||
return s.metadata.ListBuckets()
|
return s.metadata.ListBuckets()
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -174,6 +193,9 @@ func (s *ObjectService) UploadPart(bucket, key, uploadId string, partNumber int,
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *ObjectService) ListMultipartParts(bucket, key, uploadID string) ([]models.UploadedPart, error) {
|
func (s *ObjectService) ListMultipartParts(bucket, key, uploadID string) ([]models.UploadedPart, error) {
|
||||||
|
s.gcMu.RLock()
|
||||||
|
defer s.gcMu.RUnlock()
|
||||||
|
|
||||||
upload, err := s.metadata.GetMultipartUpload(uploadID)
|
upload, err := s.metadata.GetMultipartUpload(uploadID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@@ -300,6 +322,7 @@ func (s *ObjectService) GarbageCollect() error {
|
|||||||
|
|
||||||
totalChunks := 0
|
totalChunks := 0
|
||||||
deletedChunks := 0
|
deletedChunks := 0
|
||||||
|
deleteErrors := 0
|
||||||
|
|
||||||
if err := s.blob.ForEachChunk(func(chunkID string) error {
|
if err := s.blob.ForEachChunk(func(chunkID string) error {
|
||||||
totalChunks++
|
totalChunks++
|
||||||
@@ -307,7 +330,9 @@ func (s *ObjectService) GarbageCollect() error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
if err := s.blob.DeleteBlob(chunkID); err != nil {
|
if err := s.blob.DeleteBlob(chunkID); err != nil {
|
||||||
return err
|
deleteErrors++
|
||||||
|
slog.Warn("garbage_collect_delete_failed", "chunk_id", chunkID, "error", err)
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
deletedChunks++
|
deletedChunks++
|
||||||
return nil
|
return nil
|
||||||
@@ -319,6 +344,7 @@ func (s *ObjectService) GarbageCollect() error {
|
|||||||
"referenced_chunks", len(referencedChunkSet),
|
"referenced_chunks", len(referencedChunkSet),
|
||||||
"total_chunks", totalChunks,
|
"total_chunks", totalChunks,
|
||||||
"deleted_chunks", deletedChunks,
|
"deleted_chunks", deletedChunks,
|
||||||
|
"delete_errors", deleteErrors,
|
||||||
)
|
)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -155,7 +155,11 @@ func (bs *BlobStore) DeleteBlob(chunkID string) error {
|
|||||||
if !isValidChunkID(chunkID) {
|
if !isValidChunkID(chunkID) {
|
||||||
return fmt.Errorf("invalid chunk id: %q", chunkID)
|
return fmt.Errorf("invalid chunk id: %q", chunkID)
|
||||||
}
|
}
|
||||||
return os.Remove(filepath.Join(bs.dataRoot, blobRoot, chunkID[:2], chunkID[2:4], chunkID))
|
err := os.Remove(filepath.Join(bs.dataRoot, blobRoot, chunkID[:2], chunkID[2:4], chunkID))
|
||||||
|
if err != nil && os.IsNotExist(err) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (bs *BlobStore) ListChunks() ([]string, error) {
|
func (bs *BlobStore) ListChunks() ([]string, error) {
|
||||||
|
|||||||
Reference in New Issue
Block a user