mirror of
https://github.com/ferdzo/fs.git
synced 2026-04-04 20:16:26 +00:00
Added metadata handling; use an IO pipe instead of a byte transfer.
This commit is contained in:
1
.gitignore
vendored
1
.gitignore
vendored
@@ -2,3 +2,4 @@
|
||||
.vscode/
|
||||
blobs/
|
||||
*.db
|
||||
.idea/
|
||||
|
||||
@@ -1 +1,3 @@
|
||||
# fs
|
||||
|
||||
An experimental Object Storage written in Go that should be compatible with S3
|
||||
1
api/api.go
Normal file
1
api/api.go
Normal file
@@ -0,0 +1 @@
|
||||
package api
|
||||
32
main.go
32
main.go
@@ -2,9 +2,10 @@ package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"fs/metadata"
|
||||
"fs/service"
|
||||
"io"
|
||||
"os"
|
||||
|
||||
"fs/data"
|
||||
)
|
||||
|
||||
func main() {
|
||||
@@ -16,25 +17,38 @@ func main() {
|
||||
}
|
||||
defer imageStream.Close()
|
||||
|
||||
fmt.Fprint(imageStream)
|
||||
metadataHandler, err := metadata.NewMetadataHandler("metadata.db")
|
||||
if err != nil {
|
||||
fmt.Printf("Error initializing metadata handler: %v\n", err)
|
||||
return
|
||||
}
|
||||
|
||||
manifest, err := data.IngestStream("test-bucket-ferdzo", "fer.jpg", "image/jpeg", imageStream)
|
||||
objectService := service.NewObjectService(metadataHandler)
|
||||
|
||||
manifest, err := objectService.PutObject("test-bucket-ferdzo/fer.jpg", "image/jpeg", imageStream)
|
||||
if err != nil {
|
||||
fmt.Printf("Error ingesting stream: %v\n", err)
|
||||
return
|
||||
}
|
||||
fmt.Printf("Manifest: %+v\n", manifest)
|
||||
|
||||
objectData, err := data.GetObject(manifest)
|
||||
objectData, manifest2, err := objectService.GetObject("test-bucket-ferdzo", "fer.jpg")
|
||||
if err != nil {
|
||||
fmt.Printf("Error retrieving object: %v\n", err)
|
||||
return
|
||||
}
|
||||
fmt.Printf("Retrieved object data length: %d\n", len(objectData))
|
||||
|
||||
err = os.WriteFile("recovered"+manifest.Key, objectData, 0644)
|
||||
fmt.Printf("Retrieved manifest: %+v\n", manifest2)
|
||||
recoveredFile, err := os.Create("recovered_" + manifest2.Key)
|
||||
if err != nil {
|
||||
fmt.Printf("Error writing recovered file: %v\n", err)
|
||||
fmt.Printf("Error creating file: %v\n", err)
|
||||
return
|
||||
}
|
||||
defer recoveredFile.Close()
|
||||
|
||||
bytesWritten, err := io.Copy(recoveredFile, objectData)
|
||||
if err != nil {
|
||||
fmt.Printf("Error streaming to recovered file: %v\n", err)
|
||||
return
|
||||
}
|
||||
fmt.Printf("Successfully streamed %d bytes to disk!\n", bytesWritten)
|
||||
}
|
||||
|
||||
66
metadata/metadata.go
Normal file
66
metadata/metadata.go
Normal file
@@ -0,0 +1,66 @@
|
||||
package metadata
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"fs/models"
|
||||
|
||||
"go.etcd.io/bbolt"
|
||||
)
|
||||
|
||||
const ManifestBucketName = "object_manifests"
|
||||
|
||||
type MetadataHandler struct {
|
||||
db *bbolt.DB
|
||||
}
|
||||
|
||||
func NewMetadataHandler(dbPath string) (*MetadataHandler, error) {
|
||||
db, err := bbolt.Open(dbPath, 0600, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &MetadataHandler{db: db}, nil
|
||||
}
|
||||
|
||||
func (h *MetadataHandler) PutManifest(manifest *models.ObjectManifest) error {
|
||||
err := h.db.Update(func(tx *bbolt.Tx) error {
|
||||
metadataBucket, err := tx.CreateBucketIfNotExists([]byte(ManifestBucketName))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
key := fmt.Sprintf("%s/%s", manifest.Bucket, manifest.Key)
|
||||
data, err := json.Marshal(manifest)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return metadataBucket.Put([]byte(key), data)
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (h *MetadataHandler) GetManifest(bucket, key string) (*models.ObjectManifest, error) {
|
||||
var manifest *models.ObjectManifest
|
||||
|
||||
h.db.View(func(tx *bbolt.Tx) error {
|
||||
metadataBucket := tx.Bucket([]byte(ManifestBucketName))
|
||||
if metadataBucket == nil {
|
||||
return fmt.Errorf("bucket %s not found", ManifestBucketName)
|
||||
}
|
||||
key := fmt.Sprintf("%s/%s", bucket, key)
|
||||
data := metadataBucket.Get([]byte(key))
|
||||
if data == nil {
|
||||
|
||||
return fmt.Errorf("manifest not found for bucket %s and key %s", bucket, key)
|
||||
}
|
||||
err := json.Unmarshal(data, &manifest)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
return manifest, nil
|
||||
}
|
||||
65
service/service.go
Normal file
65
service/service.go
Normal file
@@ -0,0 +1,65 @@
|
||||
package service
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"fs/metadata"
|
||||
"fs/models"
|
||||
"fs/storage"
|
||||
"io"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
type ObjectService struct {
|
||||
metadataHandler *metadata.MetadataHandler
|
||||
}
|
||||
|
||||
func NewObjectService(metadataHandler *metadata.MetadataHandler) *ObjectService {
|
||||
return &ObjectService{metadataHandler: metadataHandler}
|
||||
}
|
||||
|
||||
func (s *ObjectService) PutObject(uri string, contentType string, input io.Reader) (*models.ObjectManifest, error) {
|
||||
|
||||
bucket := strings.Split(uri, "/")[0]
|
||||
key := strings.Join(strings.Split(uri, "/")[1:], "/")
|
||||
|
||||
chunks, size, etag, err := storage.IngestStream(input)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
timestamp := time.Now().Unix()
|
||||
|
||||
manifest := &models.ObjectManifest{
|
||||
Bucket: bucket,
|
||||
Key: key,
|
||||
Size: size,
|
||||
ContentType: contentType,
|
||||
ETag: etag,
|
||||
Chunks: chunks,
|
||||
CreatedAt: timestamp,
|
||||
}
|
||||
fmt.Println(manifest)
|
||||
if err = s.metadataHandler.PutManifest(manifest); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return manifest, nil
|
||||
}
|
||||
|
||||
func (s *ObjectService) GetObject(bucket, key string) (io.ReadCloser, *models.ObjectManifest, error) {
|
||||
manifest, err := s.metadataHandler.GetManifest(bucket, key)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
pr, pw := io.Pipe()
|
||||
|
||||
go func() {
|
||||
defer pw.Close()
|
||||
|
||||
err := storage.AssembleStream(manifest.Chunks, pw)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
}()
|
||||
return pr, manifest, nil
|
||||
}
|
||||
@@ -1,11 +1,9 @@
|
||||
package data
|
||||
package storage
|
||||
|
||||
import (
|
||||
"crypto/md5"
|
||||
"crypto/sha256"
|
||||
"encoding/hex"
|
||||
"fmt"
|
||||
"fs/models"
|
||||
"io"
|
||||
"os"
|
||||
"path/filepath"
|
||||
@@ -14,22 +12,17 @@ import (
|
||||
const chunkSize = 64 * 1024
|
||||
const blobRoot = "blobs/"
|
||||
|
||||
func IngestStream(bucket, key, contentType string, stream io.Reader) (*models.ObjectManifest, error) {
|
||||
manifest := &models.ObjectManifest{
|
||||
Bucket: bucket,
|
||||
Key: key,
|
||||
ContentType: contentType,
|
||||
}
|
||||
|
||||
func IngestStream(stream io.Reader) ([]string, int64, string, error) {
|
||||
fullFileHasher := md5.New()
|
||||
|
||||
buffer := make([]byte, chunkSize)
|
||||
var totalSize int64
|
||||
var chunkIDs []string
|
||||
|
||||
for {
|
||||
bytesRead, err := io.ReadFull(stream, buffer)
|
||||
if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
|
||||
return nil, err
|
||||
return nil, 0, "", err
|
||||
}
|
||||
|
||||
if bytesRead > 0 {
|
||||
@@ -43,24 +36,21 @@ func IngestStream(bucket, key, contentType string, stream io.Reader) (*models.Ob
|
||||
|
||||
err := saveBlob(chunkID, chunkData)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, 0, "", err
|
||||
}
|
||||
manifest.Chunks = append(manifest.Chunks, chunkID)
|
||||
chunkIDs = append(chunkIDs, chunkID)
|
||||
}
|
||||
if err == io.EOF || err == io.ErrUnexpectedEOF {
|
||||
break
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, 0, "", err
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
manifest.Size = totalSize
|
||||
manifest.ETag = fmt.Sprintf(`"%s"`, hex.EncodeToString(fullFileHasher.Sum(nil)))
|
||||
|
||||
return manifest, nil
|
||||
|
||||
|
||||
etag := hex.EncodeToString(fullFileHasher.Sum(nil))
|
||||
return chunkIDs, totalSize, etag, nil
|
||||
}
|
||||
|
||||
func saveBlob(chunkID string, data []byte) error {
|
||||
@@ -75,7 +65,19 @@ func saveBlob(chunkID string, data []byte) error {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func AssembleStream(chunkIDs []string, w *io.PipeWriter) error {
|
||||
for _, chunkID := range chunkIDs {
|
||||
chunkData, err := GetBlob(chunkID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if _, err := w.Write(chunkData); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -83,15 +85,3 @@ func GetBlob(chunkID string) ([]byte, error) {
|
||||
|
||||
return os.ReadFile(filepath.Join(blobRoot, chunkID[:2], chunkID[2:4], chunkID))
|
||||
}
|
||||
|
||||
func GetObject(manifest *models.ObjectManifest) ([]byte, error) {
|
||||
var fullData []byte
|
||||
for _, chunkID := range manifest.Chunks {
|
||||
chunkData, err := GetBlob(chunkID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
fullData = append(fullData, chunkData...)
|
||||
}
|
||||
return fullData, nil
|
||||
}
|
||||
Reference in New Issue
Block a user