mirror of https://github.com/ferdzo/fs.git
synced 2026-04-04 20:36:25 +00:00
252 lines · 5.3 KiB · Go
package storage
|
|
|
|
import (
|
|
"crypto/md5"
|
|
"crypto/sha256"
|
|
"encoding/hex"
|
|
"errors"
|
|
"fmt"
|
|
"fs/metrics"
|
|
"io"
|
|
"os"
|
|
"path/filepath"
|
|
"strings"
|
|
"time"
|
|
)
|
|
|
|
// blobRoot is the subdirectory under the store root that holds all chunk files.
const blobRoot = "blobs"

// maxChunkSize caps the chunkSize accepted by NewBlobStore (64 MiB).
const maxChunkSize = 64 * 1024 * 1024
// BlobStore stores content-addressed blobs ("chunks") as files under a
// two-level directory fan-out rooted at dataRoot/blobs.
type BlobStore struct {
	dataRoot  string // cleaned filesystem root containing the blobRoot subdirectory
	chunkSize int    // maximum bytes per chunk when ingesting a stream
}
func NewBlobStore(root string, chunkSize int) (*BlobStore, error) {
|
|
root = strings.TrimSpace(root)
|
|
if root == "" {
|
|
return nil, errors.New("blob root is required")
|
|
}
|
|
if chunkSize <= 0 || chunkSize > maxChunkSize {
|
|
return nil, fmt.Errorf("chunk size must be between 1 and %d bytes", maxChunkSize)
|
|
}
|
|
|
|
cleanRoot := filepath.Clean(root)
|
|
if err := os.MkdirAll(filepath.Join(cleanRoot, blobRoot), 0o755); err != nil {
|
|
return nil, err
|
|
}
|
|
return &BlobStore{chunkSize: chunkSize, dataRoot: cleanRoot}, nil
|
|
}
|
|
|
|
func (bs *BlobStore) IngestStream(stream io.Reader) ([]string, int64, string, error) {
|
|
start := time.Now()
|
|
fullFileHasher := md5.New()
|
|
|
|
buffer := make([]byte, bs.chunkSize)
|
|
var totalSize int64
|
|
var chunkIDs []string
|
|
success := false
|
|
defer func() {
|
|
metrics.Default.ObserveBlob("ingest_stream", time.Since(start), 0, success)
|
|
}()
|
|
|
|
for {
|
|
bytesRead, err := io.ReadFull(stream, buffer)
|
|
if err != nil && err != io.EOF && !errors.Is(err, io.ErrUnexpectedEOF) {
|
|
return nil, 0, "", err
|
|
}
|
|
|
|
if bytesRead > 0 {
|
|
chunkData := buffer[:bytesRead]
|
|
totalSize += int64(bytesRead)
|
|
|
|
fullFileHasher.Write(chunkData)
|
|
|
|
chunkHash := sha256.Sum256(chunkData)
|
|
chunkID := hex.EncodeToString(chunkHash[:])
|
|
|
|
err := bs.saveBlob(chunkID, chunkData)
|
|
if err != nil {
|
|
return nil, 0, "", err
|
|
}
|
|
chunkIDs = append(chunkIDs, chunkID)
|
|
}
|
|
if err == io.EOF || errors.Is(err, io.ErrUnexpectedEOF) {
|
|
break
|
|
}
|
|
if err != nil {
|
|
return nil, 0, "", err
|
|
}
|
|
|
|
}
|
|
|
|
etag := hex.EncodeToString(fullFileHasher.Sum(nil))
|
|
success = true
|
|
return chunkIDs, totalSize, etag, nil
|
|
}
|
|
|
|
// saveBlob writes data to the content-addressed path for chunkID using a
// write-to-temp-then-rename sequence so that a chunk file is never observed
// partially written. Saving a chunk that already exists is a successful no-op
// (content addressing makes writes idempotent).
func (bs *BlobStore) saveBlob(chunkID string, data []byte) error {
	start := time.Now()
	success := false
	writtenBytes := int64(0)
	defer func() {
		// writtenBytes stays 0 on failure and on dedupe hits, so the
		// metric counts only bytes physically written.
		metrics.Default.ObserveBlob("write_chunk", time.Since(start), writtenBytes, success)
	}()

	if !isValidChunkID(chunkID) {
		return fmt.Errorf("invalid chunk id: %q", chunkID)
	}
	// Two-level fan-out (aa/bb/<hash>) keeps per-directory entry counts
	// bounded.
	dir := filepath.Join(bs.dataRoot, blobRoot, chunkID[:2], chunkID[2:4])
	if err := os.MkdirAll(dir, 0755); err != nil {
		return err
	}

	fullPath := filepath.Join(dir, chunkID)
	// Dedupe: if the chunk is already present, skip the write entirely.
	if _, err := os.Stat(fullPath); err == nil {
		success = true
		return nil
	} else if !os.IsNotExist(err) {
		return err
	}

	// Create a uniquely named temp file in the destination directory so the
	// final rename stays on the same filesystem (rename is atomic there).
	tmpFile, err := os.CreateTemp(dir, chunkID+".tmp-*")
	if err != nil {
		return err
	}
	tmpPath := tmpFile.Name()
	cleanup := true
	defer func() {
		// Remove the temp file on any failure path; disabled once the
		// rename has succeeded.
		if cleanup {
			_ = os.Remove(tmpPath)
		}
	}()

	if _, err := tmpFile.Write(data); err != nil {
		_ = tmpFile.Close()
		return err
	}
	// Flush file contents to stable storage before renaming into place.
	if err := tmpFile.Sync(); err != nil {
		_ = tmpFile.Close()
		return err
	}
	if err := tmpFile.Close(); err != nil {
		return err
	}

	if err := os.Rename(tmpPath, fullPath); err != nil {
		// A concurrent writer may have installed the same chunk first;
		// since the content is identical, treat that as success.
		if _, statErr := os.Stat(fullPath); statErr == nil {
			success = true
			return nil
		}
		return err
	}
	cleanup = false

	// Sync the directory so the rename itself is durable.
	if err := syncDir(dir); err != nil {
		return err
	}
	writtenBytes = int64(len(data))
	success = true
	return nil
}
func (bs *BlobStore) AssembleStream(chunkIDs []string, w *io.PipeWriter) error {
|
|
start := time.Now()
|
|
success := false
|
|
defer func() {
|
|
metrics.Default.ObserveBlob("assemble_stream", time.Since(start), 0, success)
|
|
}()
|
|
|
|
for _, chunkID := range chunkIDs {
|
|
chunkData, err := bs.GetBlob(chunkID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if _, err := w.Write(chunkData); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
success = true
|
|
return nil
|
|
}
|
|
|
|
func (bs *BlobStore) GetBlob(chunkID string) ([]byte, error) {
|
|
start := time.Now()
|
|
success := false
|
|
var size int64
|
|
defer func() {
|
|
metrics.Default.ObserveBlob("read_chunk", time.Since(start), size, success)
|
|
}()
|
|
|
|
if !isValidChunkID(chunkID) {
|
|
return nil, fmt.Errorf("invalid chunk id: %q", chunkID)
|
|
}
|
|
data, err := os.ReadFile(filepath.Join(bs.dataRoot, blobRoot, chunkID[:2], chunkID[2:4], chunkID))
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
size = int64(len(data))
|
|
success = true
|
|
return data, nil
|
|
}
|
|
|
|
func (bs *BlobStore) DeleteBlob(chunkID string) error {
|
|
if !isValidChunkID(chunkID) {
|
|
return fmt.Errorf("invalid chunk id: %q", chunkID)
|
|
}
|
|
err := os.Remove(filepath.Join(bs.dataRoot, blobRoot, chunkID[:2], chunkID[2:4], chunkID))
|
|
if err != nil && os.IsNotExist(err) {
|
|
return nil
|
|
}
|
|
return err
|
|
}
|
|
|
|
func (bs *BlobStore) ListChunks() ([]string, error) {
|
|
var chunkIDs []string
|
|
err := bs.ForEachChunk(func(chunkID string) error {
|
|
chunkIDs = append(chunkIDs, chunkID)
|
|
return nil
|
|
})
|
|
return chunkIDs, err
|
|
}
|
|
|
|
func (bs *BlobStore) ForEachChunk(fn func(chunkID string) error) error {
|
|
if fn == nil {
|
|
return errors.New("chunk callback is required")
|
|
}
|
|
return filepath.Walk(filepath.Join(bs.dataRoot, blobRoot), func(path string, info os.FileInfo, err error) error {
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if !info.IsDir() {
|
|
chunkID := info.Name()
|
|
if isValidChunkID(chunkID) {
|
|
return fn(chunkID)
|
|
}
|
|
}
|
|
return nil
|
|
|
|
})
|
|
}
|
|
|
|
// isValidChunkID reports whether chunkID is a lowercase hex-encoded SHA-256
// digest: exactly 64 characters drawn from [0-9a-f].
func isValidChunkID(chunkID string) bool {
	if len(chunkID) != sha256.Size*2 {
		return false
	}
	for _, r := range chunkID {
		isDigit := r >= '0' && r <= '9'
		isLowerHex := r >= 'a' && r <= 'f'
		if !isDigit && !isLowerHex {
			return false
		}
	}
	return true
}
// syncDir fsyncs the directory at dirPath so that recently created or renamed
// entries inside it become durable.
func syncDir(dirPath string) error {
	handle, err := os.Open(dirPath)
	if err != nil {
		return err
	}
	defer handle.Close()
	return handle.Sync()
}