From 22a8e4ea2e3136873d6bf1d0436b02ef84153a95 Mon Sep 17 00:00:00 2001 From: Andrej Mickov Date: Sat, 21 Feb 2026 02:04:32 +0100 Subject: [PATCH] Added metadata, IO pipe instead of byte transfer. --- .gitignore | 1 + README.md | 2 ++ api/api.go | 1 + main.go | 32 +++++++++++++------ metadata/metadata.go | 66 +++++++++++++++++++++++++++++++++++++++ service/service.go | 65 ++++++++++++++++++++++++++++++++++++++ {data => storage}/blob.go | 54 +++++++++++++------------------- 7 files changed, 180 insertions(+), 41 deletions(-) create mode 100644 api/api.go create mode 100644 metadata/metadata.go create mode 100644 service/service.go rename {data => storage}/blob.go (64%) diff --git a/.gitignore b/.gitignore index 4228d4e..7a250ef 100644 --- a/.gitignore +++ b/.gitignore @@ -2,3 +2,4 @@ .vscode/ blobs/ *.db +.idea/ diff --git a/README.md b/README.md index 9ecdf1c..cca2a29 100644 --- a/README.md +++ b/README.md @@ -1 +1,3 @@ # fs + +An experimental Object Storage written in Go that should be compatible with S3 \ No newline at end of file diff --git a/api/api.go b/api/api.go new file mode 100644 index 0000000..778f64e --- /dev/null +++ b/api/api.go @@ -0,0 +1 @@ +package api diff --git a/main.go b/main.go index 81b6411..caf974d 100644 --- a/main.go +++ b/main.go @@ -2,9 +2,10 @@ package main import ( "fmt" + "fs/metadata" + "fs/service" + "io" "os" - - "fs/data" ) func main() { @@ -16,25 +17,38 @@ func main() { } defer imageStream.Close() - fmt.Fprint(imageStream) + metadataHandler, err := metadata.NewMetadataHandler("metadata.db") + if err != nil { + fmt.Printf("Error initializing metadata handler: %v\n", err) + return + } - manifest, err := data.IngestStream("test-bucket-ferdzo", "fer.jpg", "image/jpeg", imageStream) + objectService := service.NewObjectService(metadataHandler) + + manifest, err := objectService.PutObject("test-bucket-ferdzo/fer.jpg", "image/jpeg", imageStream) if err != nil { fmt.Printf("Error ingesting stream: %v\n", err) return } fmt.Printf("Manifest: %+v\n", manifest) - objectData, err := data.GetObject(manifest) + objectData, manifest2, err := objectService.GetObject("test-bucket-ferdzo", "fer.jpg") if err != nil { fmt.Printf("Error retrieving object: %v\n", err) return } - fmt.Printf("Retrieved object data length: %d\n", len(objectData)) - - err = os.WriteFile("recovered"+manifest.Key, objectData, 0644) + fmt.Printf("Retrieved manifest: %+v\n", manifest2) + recoveredFile, err := os.Create("recovered_" + manifest2.Key) if err != nil { - fmt.Printf("Error writing recovered file: %v\n", err) + fmt.Printf("Error creating file: %v\n", err) return } + defer recoveredFile.Close() + + bytesWritten, err := io.Copy(recoveredFile, objectData) + if err != nil { + fmt.Printf("Error streaming to recovered file: %v\n", err) + return + } + fmt.Printf("Successfully streamed %d bytes to disk!\n", bytesWritten) } diff --git a/metadata/metadata.go b/metadata/metadata.go new file mode 100644 index 0000000..7e4ef57 --- /dev/null +++ b/metadata/metadata.go @@ -0,0 +1,66 @@ +package metadata + +import ( + "encoding/json" + "fmt" + "fs/models" + + "go.etcd.io/bbolt" +) + +const ManifestBucketName = "object_manifests" + +type MetadataHandler struct { + db *bbolt.DB +} + +func NewMetadataHandler(dbPath string) (*MetadataHandler, error) { + db, err := bbolt.Open(dbPath, 0600, nil) + if err != nil { + return nil, err + } + return &MetadataHandler{db: db}, nil +} + +func (h *MetadataHandler) PutManifest(manifest *models.ObjectManifest) error { + err := h.db.Update(func(tx *bbolt.Tx) error { + metadataBucket, err := tx.CreateBucketIfNotExists([]byte(ManifestBucketName)) + if err != nil { + return err + } + key := fmt.Sprintf("%s/%s", manifest.Bucket, manifest.Key) + data, err := json.Marshal(manifest) + if err != nil { + return err + } + return metadataBucket.Put([]byte(key), data) + }) + if err != nil { + return err + } + return nil +} + +func (h *MetadataHandler) GetManifest(bucket, key string) (*models.ObjectManifest, error) { + var manifest *models.ObjectManifest + + h.db.View(func(tx *bbolt.Tx) error { + metadataBucket := tx.Bucket([]byte(ManifestBucketName)) + if metadataBucket == nil { + return fmt.Errorf("bucket %s not found", ManifestBucketName) + } + key := fmt.Sprintf("%s/%s", bucket, key) + data := metadataBucket.Get([]byte(key)) + if data == nil { + + return fmt.Errorf("manifest not found for bucket %s and key %s", bucket, key) + } + err := json.Unmarshal(data, &manifest) + if err != nil { + return err + } + return nil + }) + + return manifest, nil +} diff --git a/service/service.go b/service/service.go new file mode 100644 index 0000000..1750430 --- /dev/null +++ b/service/service.go @@ -0,0 +1,65 @@ +package service + +import ( + "fmt" + "fs/metadata" + "fs/models" + "fs/storage" + "io" + "strings" + "time" +) + +type ObjectService struct { + metadataHandler *metadata.MetadataHandler +} + +func NewObjectService(metadataHandler *metadata.MetadataHandler) *ObjectService { + return &ObjectService{metadataHandler: metadataHandler} +} + +func (s *ObjectService) PutObject(uri string, contentType string, input io.Reader) (*models.ObjectManifest, error) { + + bucket := strings.Split(uri, "/")[0] + key := strings.Join(strings.Split(uri, "/")[1:], "/") + + chunks, size, etag, err := storage.IngestStream(input) + if err != nil { + return nil, err + } + timestamp := time.Now().Unix() + + manifest := &models.ObjectManifest{ + Bucket: bucket, + Key: key, + Size: size, + ContentType: contentType, + ETag: etag, + Chunks: chunks, + CreatedAt: timestamp, + } + fmt.Println(manifest) + if err = s.metadataHandler.PutManifest(manifest); err != nil { + return nil, err + } + + return manifest, nil +} + +func (s *ObjectService) GetObject(bucket, key string) (io.ReadCloser, *models.ObjectManifest, error) { + manifest, err := s.metadataHandler.GetManifest(bucket, key) + if err != nil { + return nil, nil, err + } + pr, pw := io.Pipe() + + go func() { + defer pw.Close() + + err := storage.AssembleStream(manifest.Chunks, pw) + if err != nil { + return + } + }() + return pr, manifest, nil +} diff --git a/data/blob.go b/storage/blob.go similarity index 64% rename from data/blob.go rename to storage/blob.go index c28d9ac..e56e06f 100644 --- a/data/blob.go +++ b/storage/blob.go @@ -1,11 +1,9 @@ -package data +package storage import ( "crypto/md5" "crypto/sha256" "encoding/hex" - "fmt" - "fs/models" "io" "os" "path/filepath" @@ -14,22 +12,17 @@ import ( const chunkSize = 64 * 1024 const blobRoot = "blobs/" -func IngestStream(bucket, key, contentType string, stream io.Reader) (*models.ObjectManifest, error) { - manifest := &models.ObjectManifest{ - Bucket: bucket, - Key: key, - ContentType: contentType, - } - +func IngestStream(stream io.Reader) ([]string, int64, string, error) { fullFileHasher := md5.New() buffer := make([]byte, chunkSize) var totalSize int64 + var chunkIDs []string for { bytesRead, err := io.ReadFull(stream, buffer) if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF { - return nil, err + return nil, 0, "", err } if bytesRead > 0 { @@ -43,24 +36,21 @@ func IngestStream(bucket, key, contentType string, stream io.Reader) (*models.Ob err := saveBlob(chunkID, chunkData) if err != nil { - return nil, err + return nil, 0, "", err } - manifest.Chunks = append(manifest.Chunks, chunkID) + chunkIDs = append(chunkIDs, chunkID) } if err == io.EOF || err == io.ErrUnexpectedEOF { break } if err != nil { - return nil, err + return nil, 0, "", err } } - - manifest.Size = totalSize - manifest.ETag = fmt.Sprintf(`"%s"`, hex.EncodeToString(fullFileHasher.Sum(nil))) - - return manifest, nil - + + etag := hex.EncodeToString(fullFileHasher.Sum(nil)) + return chunkIDs, totalSize, etag, nil } func saveBlob(chunkID string, data []byte) error { @@ -75,7 +65,19 @@ func saveBlob(chunkID string, data []byte) error { return err } } + return nil +} +func AssembleStream(chunkIDs []string, w *io.PipeWriter) error { + for _, chunkID := range chunkIDs { + chunkData, err := GetBlob(chunkID) + if err != nil { + return err + } + if _, err := w.Write(chunkData); err != nil { + return err + } + } return nil } @@ -83,15 +85,3 @@ func GetBlob(chunkID string) ([]byte, error) { return os.ReadFile(filepath.Join(blobRoot, chunkID[:2], chunkID[2:4], chunkID)) } - -func GetObject(manifest *models.ObjectManifest) ([]byte, error) { - var fullData []byte - for _, chunkID := range manifest.Chunks { - chunkData, err := GetBlob(chunkID) - if err != nil { - return nil, err - } - fullData = append(fullData, chunkData...) - } - return fullData, nil -}