mirror of
https://github.com/ferdzo/fs.git
synced 2026-04-05 08:26:28 +00:00
Compare commits
9 Commits
a4990dae01
...
edfb5f5b2a
| Author | SHA1 | Date | |
|---|---|---|---|
| edfb5f5b2a | |||
| c997fe8471 | |||
| fca553028c | |||
| 3630aad584 | |||
| 93296ff74e | |||
| 1b7393a545 | |||
|
|
a3fad34272 | ||
| 06c90be50f | |||
| 5e87247087 |
@@ -4,3 +4,5 @@ DATA_PATH=data/
|
||||
PORT=2600
|
||||
AUDIT_LOG=true
|
||||
ADDRESS=0.0.0.0
|
||||
GC_INTERVAL=10
|
||||
GC_ENABLED=true
|
||||
8
LICENSE.md
Normal file
8
LICENSE.md
Normal file
@@ -0,0 +1,8 @@
|
||||
Copyright 2025 ferdzo
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the “Software”), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
||||
|
||||
19
README.md
19
README.md
@@ -4,31 +4,38 @@ An experimental Object Storage written in Go that should be partially compatible
|
||||
|
||||
## Features
|
||||
|
||||
- Bucket operations:
|
||||
Bucket operations:
|
||||
- `PUT /{bucket}`
|
||||
- `HEAD /{bucket}`
|
||||
- `DELETE /{bucket}`
|
||||
- `GET /` (list buckets)
|
||||
- Object operations:
|
||||
-
|
||||
Object operations:
|
||||
- `PUT /{bucket}/{key}`
|
||||
- `GET /{bucket}/{key}`
|
||||
- `HEAD /{bucket}/{key}`
|
||||
- `DELETE /{bucket}/{key}`
|
||||
- `GET /{bucket}?list-type=2&prefix=...` (ListObjectsV2-style)
|
||||
- Multipart upload:
|
||||
|
||||
Multipart upload:
|
||||
- `POST /{bucket}/{key}?uploads` (initiate)
|
||||
- `PUT /{bucket}/{key}?uploadId=...&partNumber=N` (upload part)
|
||||
- `GET /{bucket}/{key}?uploadId=...` (list parts)
|
||||
- `POST /{bucket}/{key}?uploadId=...` (complete)
|
||||
- `DELETE /{bucket}/{key}?uploadId=...` (abort)
|
||||
- Multi-object delete:
|
||||
|
||||
Multi-object delete:
|
||||
- `POST /{bucket}?delete` with S3-style XML body
|
||||
- AWS SigV4 streaming payload decoding for uploads (`aws-chunked` request bodies)
|
||||
|
||||
AWS SigV4 streaming payload decoding for uploads (`aws-chunked` request bodies)
|
||||
|
||||
## Limitations
|
||||
|
||||
- No authentication/authorization yet.
|
||||
- Not full S3 API coverage.
|
||||
- No garbage collection of unreferenced blob chunks yet.
|
||||
- No versioning or lifecycle policies.
|
||||
- Error and edge-case behavior is still being refined for client compatibility.
|
||||
|
||||
## License
|
||||
|
||||
MIT License
|
||||
|
||||
313
api/api.go
313
api/api.go
@@ -3,6 +3,7 @@ package api
|
||||
import (
|
||||
"bufio"
|
||||
"context"
|
||||
"encoding/base64"
|
||||
"encoding/xml"
|
||||
"errors"
|
||||
"fmt"
|
||||
@@ -10,15 +11,13 @@ import (
|
||||
"fs/metadata"
|
||||
"fs/models"
|
||||
"fs/service"
|
||||
"fs/utils"
|
||||
"io"
|
||||
"log/slog"
|
||||
"net/http"
|
||||
"os"
|
||||
"os/signal"
|
||||
"net/url"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"github.com/go-chi/chi/v5"
|
||||
@@ -100,6 +99,34 @@ func (h *Handler) handleGetObject(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
defer stream.Close()
|
||||
|
||||
rangeHeader := strings.TrimSpace(r.Header.Get("Range"))
|
||||
if rangeHeader != "" {
|
||||
start, end, err := parseSingleByteRange(rangeHeader, manifest.Size)
|
||||
if err != nil {
|
||||
w.Header().Set("Content-Range", fmt.Sprintf("bytes */%d", manifest.Size))
|
||||
writeS3Error(w, r, s3ErrInvalidRange, r.URL.Path)
|
||||
return
|
||||
}
|
||||
|
||||
if start > 0 {
|
||||
if _, err := io.CopyN(io.Discard, stream, start); err != nil {
|
||||
writeMappedS3Error(w, r, err)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
length := end - start + 1
|
||||
w.Header().Set("Content-Type", manifest.ContentType)
|
||||
w.Header().Set("Content-Length", strconv.FormatInt(length, 10))
|
||||
w.Header().Set("Content-Range", fmt.Sprintf("bytes %d-%d/%d", start, end, manifest.Size))
|
||||
w.Header().Set("ETag", `"`+manifest.ETag+`"`)
|
||||
w.Header().Set("Last-Modified", time.Unix(manifest.CreatedAt, 0).UTC().Format(http.TimeFormat))
|
||||
w.Header().Set("Accept-Ranges", "bytes")
|
||||
w.WriteHeader(http.StatusPartialContent)
|
||||
_, _ = io.CopyN(w, stream, length)
|
||||
return
|
||||
}
|
||||
|
||||
w.Header().Set("Content-Type", manifest.ContentType)
|
||||
w.Header().Set("Content-Length", strconv.FormatInt(manifest.Size, 10))
|
||||
w.Header().Set("ETag", `"`+manifest.ETag+`"`)
|
||||
@@ -418,8 +445,16 @@ func (h *Handler) handlePostBucket(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
|
||||
keys := make([]string, 0, len(req.Objects))
|
||||
response := models.DeleteObjectsResult{
|
||||
Xmlns: "http://s3.amazonaws.com/doc/2006-03-01/",
|
||||
}
|
||||
for _, obj := range req.Objects {
|
||||
if obj.Key == "" {
|
||||
response.Errors = append(response.Errors, models.DeleteError{
|
||||
Key: obj.Key,
|
||||
Code: s3ErrInvalidObjectKey.Code,
|
||||
Message: s3ErrInvalidObjectKey.Message,
|
||||
})
|
||||
continue
|
||||
}
|
||||
keys = append(keys, obj.Key)
|
||||
@@ -431,9 +466,6 @@ func (h *Handler) handlePostBucket(w http.ResponseWriter, r *http.Request) {
|
||||
return
|
||||
}
|
||||
|
||||
response := models.DeleteObjectsResult{
|
||||
Xmlns: "http://s3.amazonaws.com/doc/2006-03-01/",
|
||||
}
|
||||
if !req.Quiet {
|
||||
response.Deleted = make([]models.DeletedEntry, 0, len(deleted))
|
||||
for _, key := range deleted {
|
||||
@@ -496,25 +528,47 @@ func (h *Handler) handleGetBuckets(w http.ResponseWriter, r *http.Request) {
|
||||
writeMappedS3Error(w, r, err)
|
||||
return
|
||||
}
|
||||
w.Header().Set("Content-Type", "application/xml")
|
||||
w.WriteHeader(http.StatusOK)
|
||||
for _, bucket := range buckets {
|
||||
_, err := w.Write([]byte(bucket))
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
response := models.ListAllMyBucketsResult{
|
||||
Xmlns: "http://s3.amazonaws.com/doc/2006-03-01/",
|
||||
Owner: models.BucketsOwner{
|
||||
ID: "local",
|
||||
DisplayName: "local",
|
||||
},
|
||||
Buckets: models.BucketsElement{
|
||||
Items: make([]models.BucketItem, 0, len(buckets)),
|
||||
},
|
||||
}
|
||||
|
||||
for _, bucket := range buckets {
|
||||
manifest, err := h.svc.GetBucketManifest(bucket)
|
||||
if err != nil {
|
||||
h.logger.Warn("bucket_manifest_read_failed", "bucket", bucket, "error", err)
|
||||
continue
|
||||
}
|
||||
response.Buckets.Items = append(response.Buckets.Items, models.BucketItem{
|
||||
Name: bucket,
|
||||
CreationDate: manifest.CreatedAt.UTC().Format("2006-01-02T15:04:05.000Z"),
|
||||
})
|
||||
}
|
||||
|
||||
payload, err := xml.MarshalIndent(response, "", " ")
|
||||
if err != nil {
|
||||
writeMappedS3Error(w, r, err)
|
||||
return
|
||||
}
|
||||
|
||||
w.Header().Set("Content-Type", "application/xml; charset=utf-8")
|
||||
w.WriteHeader(http.StatusOK)
|
||||
_, _ = w.Write([]byte(xml.Header))
|
||||
_, _ = w.Write(payload)
|
||||
}
|
||||
|
||||
func (h *Handler) handleGetBucket(w http.ResponseWriter, r *http.Request) {
|
||||
bucket := chi.URLParam(r, "bucket")
|
||||
|
||||
if r.URL.Query().Get("list-type") == "2" {
|
||||
prefix := r.URL.Query().Get("prefix")
|
||||
if prefix == "" {
|
||||
prefix = ""
|
||||
}
|
||||
h.handleListObjectsV2(w, r, bucket, prefix)
|
||||
h.handleListObjectsV2(w, r, bucket)
|
||||
return
|
||||
}
|
||||
if r.URL.Query().Has("location") {
|
||||
@@ -534,30 +588,224 @@ func (h *Handler) handleGetBucket(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
}
|
||||
|
||||
func (h *Handler) handleListObjectsV2(w http.ResponseWriter, r *http.Request, bucket, prefix string) {
|
||||
func (h *Handler) handleListObjectsV2(w http.ResponseWriter, r *http.Request, bucket string) {
|
||||
prefix := r.URL.Query().Get("prefix")
|
||||
delimiter := r.URL.Query().Get("delimiter")
|
||||
startAfter := r.URL.Query().Get("start-after")
|
||||
encodingType := strings.ToLower(strings.TrimSpace(r.URL.Query().Get("encoding-type")))
|
||||
if encodingType != "" && encodingType != "url" {
|
||||
writeS3Error(w, r, s3ErrInvalidArgument, r.URL.Path)
|
||||
return
|
||||
}
|
||||
|
||||
maxKeys := 1000
|
||||
if rawMaxKeys := strings.TrimSpace(r.URL.Query().Get("max-keys")); rawMaxKeys != "" {
|
||||
parsed, err := strconv.Atoi(rawMaxKeys)
|
||||
if err != nil || parsed < 0 {
|
||||
writeS3Error(w, r, s3ErrInvalidArgument, r.URL.Path)
|
||||
return
|
||||
}
|
||||
if parsed > 1000 {
|
||||
parsed = 1000
|
||||
}
|
||||
maxKeys = parsed
|
||||
}
|
||||
|
||||
continuationToken := strings.TrimSpace(r.URL.Query().Get("continuation-token"))
|
||||
continuationMarker := ""
|
||||
if continuationToken != "" {
|
||||
decoded, err := base64.StdEncoding.DecodeString(continuationToken)
|
||||
if err != nil || len(decoded) == 0 {
|
||||
writeS3Error(w, r, s3ErrInvalidArgument, r.URL.Path)
|
||||
return
|
||||
}
|
||||
continuationMarker = string(decoded)
|
||||
}
|
||||
|
||||
objects, err := h.svc.ListObjects(bucket, prefix)
|
||||
if err != nil {
|
||||
writeMappedS3Error(w, r, err)
|
||||
return
|
||||
}
|
||||
|
||||
xmlResponse, err := utils.ConstructXMLResponseForObjectList(bucket, objects)
|
||||
entries := buildListV2Entries(objects, prefix, delimiter)
|
||||
startIdx := 0
|
||||
if continuationMarker != "" {
|
||||
found := false
|
||||
for i, entry := range entries {
|
||||
if entry.Marker == continuationMarker {
|
||||
startIdx = i + 1
|
||||
found = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !found {
|
||||
writeS3Error(w, r, s3ErrInvalidArgument, r.URL.Path)
|
||||
return
|
||||
}
|
||||
} else if startAfter != "" {
|
||||
for startIdx < len(entries) && entries[startIdx].SortKey <= startAfter {
|
||||
startIdx++
|
||||
}
|
||||
}
|
||||
|
||||
result := models.ListBucketResultV2{
|
||||
Xmlns: "http://s3.amazonaws.com/doc/2006-03-01/",
|
||||
Name: bucket,
|
||||
Prefix: s3EncodeIfNeeded(prefix, encodingType),
|
||||
Delimiter: s3EncodeIfNeeded(delimiter, encodingType),
|
||||
MaxKeys: maxKeys,
|
||||
ContinuationToken: continuationToken,
|
||||
StartAfter: s3EncodeIfNeeded(startAfter, encodingType),
|
||||
EncodingType: encodingType,
|
||||
}
|
||||
|
||||
endIdx := startIdx
|
||||
for endIdx < len(entries) && result.KeyCount < maxKeys {
|
||||
entry := entries[endIdx]
|
||||
if entry.Object != nil {
|
||||
result.Contents = append(result.Contents, models.Contents{
|
||||
Key: s3EncodeIfNeeded(entry.Object.Key, encodingType),
|
||||
LastModified: time.Unix(entry.Object.CreatedAt, 0).UTC().Format("2006-01-02T15:04:05.000Z"),
|
||||
ETag: `"` + entry.Object.ETag + `"`,
|
||||
Size: entry.Object.Size,
|
||||
StorageClass: "STANDARD",
|
||||
})
|
||||
} else {
|
||||
result.CommonPrefixes = append(result.CommonPrefixes, models.CommonPrefixes{
|
||||
Prefix: s3EncodeIfNeeded(entry.CommonPrefix, encodingType),
|
||||
})
|
||||
}
|
||||
result.KeyCount++
|
||||
endIdx++
|
||||
}
|
||||
|
||||
result.IsTruncated = endIdx < len(entries)
|
||||
if result.IsTruncated && result.KeyCount > 0 {
|
||||
result.NextContinuationToken = base64.StdEncoding.EncodeToString([]byte(entries[endIdx-1].Marker))
|
||||
}
|
||||
|
||||
xmlResponse, err := xml.MarshalIndent(result, "", " ")
|
||||
if err != nil {
|
||||
writeMappedS3Error(w, r, err)
|
||||
return
|
||||
}
|
||||
|
||||
w.Header().Set("Content-Type", "application/xml; charset=utf-8")
|
||||
w.Header().Set("Content-Length", strconv.Itoa(len(xmlResponse)))
|
||||
w.Header().Set("Content-Length", strconv.Itoa(len(xml.Header)+len(xmlResponse)))
|
||||
w.WriteHeader(http.StatusOK)
|
||||
_, err = w.Write([]byte(xmlResponse))
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
_, _ = w.Write([]byte(xml.Header))
|
||||
_, _ = w.Write(xmlResponse)
|
||||
|
||||
}
|
||||
|
||||
func (h *Handler) Start(address string) error {
|
||||
type listV2Entry struct {
|
||||
Marker string
|
||||
SortKey string
|
||||
Object *models.ObjectManifest
|
||||
CommonPrefix string
|
||||
}
|
||||
|
||||
func buildListV2Entries(objects []*models.ObjectManifest, prefix, delimiter string) []listV2Entry {
|
||||
sorted := make([]*models.ObjectManifest, 0, len(objects))
|
||||
sorted = append(sorted, objects...)
|
||||
sort.Slice(sorted, func(i, j int) bool {
|
||||
return sorted[i].Key < sorted[j].Key
|
||||
})
|
||||
|
||||
entries := make([]listV2Entry, 0, len(sorted))
|
||||
seenCommonPrefixes := make(map[string]struct{})
|
||||
for _, object := range sorted {
|
||||
if object == nil {
|
||||
continue
|
||||
}
|
||||
if delimiter != "" {
|
||||
relative := strings.TrimPrefix(object.Key, prefix)
|
||||
if idx := strings.Index(relative, delimiter); idx >= 0 {
|
||||
commonPrefix := prefix + relative[:idx+len(delimiter)]
|
||||
if _, exists := seenCommonPrefixes[commonPrefix]; exists {
|
||||
continue
|
||||
}
|
||||
seenCommonPrefixes[commonPrefix] = struct{}{}
|
||||
entries = append(entries, listV2Entry{
|
||||
Marker: "C:" + commonPrefix,
|
||||
SortKey: commonPrefix,
|
||||
CommonPrefix: commonPrefix,
|
||||
})
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
entries = append(entries, listV2Entry{
|
||||
Marker: "K:" + object.Key,
|
||||
SortKey: object.Key,
|
||||
Object: object,
|
||||
})
|
||||
}
|
||||
return entries
|
||||
}
|
||||
|
||||
func s3EncodeIfNeeded(value, encodingType string) string {
|
||||
if encodingType != "url" || value == "" {
|
||||
return value
|
||||
}
|
||||
encoded := url.QueryEscape(value)
|
||||
return strings.ReplaceAll(encoded, "+", "%20")
|
||||
}
|
||||
|
||||
func parseSingleByteRange(rangeHeader string, size int64) (int64, int64, error) {
|
||||
if size <= 0 || !strings.HasPrefix(rangeHeader, "bytes=") {
|
||||
return 0, 0, errors.New("invalid range")
|
||||
}
|
||||
spec := strings.TrimSpace(strings.TrimPrefix(rangeHeader, "bytes="))
|
||||
if spec == "" || strings.Contains(spec, ",") {
|
||||
return 0, 0, errors.New("invalid range")
|
||||
}
|
||||
|
||||
parts := strings.SplitN(spec, "-", 2)
|
||||
if len(parts) != 2 {
|
||||
return 0, 0, errors.New("invalid range")
|
||||
}
|
||||
|
||||
if parts[0] == "" {
|
||||
suffixLength, err := strconv.ParseInt(parts[1], 10, 64)
|
||||
if err != nil || suffixLength <= 0 {
|
||||
return 0, 0, errors.New("invalid range")
|
||||
}
|
||||
if suffixLength > size {
|
||||
suffixLength = size
|
||||
}
|
||||
start := size - suffixLength
|
||||
end := size - 1
|
||||
return start, end, nil
|
||||
}
|
||||
|
||||
start, err := strconv.ParseInt(parts[0], 10, 64)
|
||||
if err != nil || start < 0 || start >= size {
|
||||
return 0, 0, errors.New("invalid range")
|
||||
}
|
||||
|
||||
var end int64
|
||||
if parts[1] == "" {
|
||||
end = size - 1
|
||||
} else {
|
||||
end, err = strconv.ParseInt(parts[1], 10, 64)
|
||||
if err != nil || end < start {
|
||||
return 0, 0, errors.New("invalid range")
|
||||
}
|
||||
if end >= size {
|
||||
end = size - 1
|
||||
}
|
||||
}
|
||||
|
||||
return start, end, nil
|
||||
}
|
||||
|
||||
func (h *Handler) Start(ctx context.Context, address string) error {
|
||||
if ctx == nil {
|
||||
ctx = context.Background()
|
||||
}
|
||||
|
||||
h.logger.Info("server_starting",
|
||||
"address", address,
|
||||
"log_format", h.logConfig.Format,
|
||||
@@ -565,9 +813,7 @@ func (h *Handler) Start(address string) error {
|
||||
"audit_log", h.logConfig.Audit,
|
||||
)
|
||||
h.setupRoutes()
|
||||
stop := make(chan os.Signal, 1)
|
||||
signal.Notify(stop, os.Interrupt, syscall.SIGTERM)
|
||||
defer signal.Stop(stop)
|
||||
|
||||
server := http.Server{
|
||||
Addr: address,
|
||||
Handler: h.router,
|
||||
@@ -583,10 +829,13 @@ func (h *Handler) Start(address string) error {
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-stop:
|
||||
h.logger.Info("shutdown_signal_received")
|
||||
case <-ctx.Done():
|
||||
h.logger.Info("shutdown_context_done", "reason", ctx.Err())
|
||||
case err := <-errCh:
|
||||
h.logger.Error("server_listen_failed", "error", err)
|
||||
if closeErr := h.svc.Close(); closeErr != nil {
|
||||
h.logger.Error("service_close_failed", "error", closeErr)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
|
||||
@@ -41,6 +41,16 @@ var (
|
||||
Code: "MalformedXML",
|
||||
Message: "The XML you provided was not well-formed or did not validate against our published schema.",
|
||||
}
|
||||
s3ErrInvalidArgument = s3APIError{
|
||||
Status: http.StatusBadRequest,
|
||||
Code: "InvalidArgument",
|
||||
Message: "Invalid argument.",
|
||||
}
|
||||
s3ErrInvalidRange = s3APIError{
|
||||
Status: http.StatusRequestedRangeNotSatisfiable,
|
||||
Code: "InvalidRange",
|
||||
Message: "The requested range is not satisfiable.",
|
||||
}
|
||||
s3ErrEntityTooSmall = s3APIError{
|
||||
Status: http.StatusBadRequest,
|
||||
Code: "EntityTooSmall",
|
||||
|
||||
13
main.go
13
main.go
@@ -1,6 +1,7 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fs/api"
|
||||
"fs/logging"
|
||||
"fs/metadata"
|
||||
@@ -8,8 +9,10 @@ import (
|
||||
"fs/storage"
|
||||
"fs/utils"
|
||||
"os"
|
||||
"os/signal"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
"syscall"
|
||||
)
|
||||
|
||||
func main() {
|
||||
@@ -44,8 +47,16 @@ func main() {
|
||||
objectService := service.NewObjectService(metadataHandler, blobHandler)
|
||||
handler := api.NewHandler(objectService, logger, logConfig)
|
||||
addr := config.Address + ":" + strconv.Itoa(config.Port)
|
||||
if err = handler.Start(addr); err != nil {
|
||||
|
||||
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
|
||||
defer stop()
|
||||
if config.GcEnabled {
|
||||
go objectService.RunGC(ctx, config.GcInterval)
|
||||
}
|
||||
|
||||
if err = handler.Start(ctx, addr); err != nil {
|
||||
logger.Error("server_stopped_with_error", "error", err)
|
||||
return
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -126,6 +126,22 @@ func (h *MetadataHandler) DeleteBucket(bucketName string) error {
|
||||
if k, _ := metadataBucket.Cursor().First(); k != nil {
|
||||
return fmt.Errorf("%w: %s", ErrBucketNotEmpty, bucketName)
|
||||
}
|
||||
|
||||
multipartUploadsBucket, err := getMultipartUploadBucket(tx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
cursor := multipartUploadsBucket.Cursor()
|
||||
for _, payload := cursor.First(); payload != nil; _, payload = cursor.Next() {
|
||||
upload := models.MultipartUpload{}
|
||||
if err := json.Unmarshal(payload, &upload); err != nil {
|
||||
return err
|
||||
}
|
||||
if upload.Bucket == bucketName && upload.State == "pending" {
|
||||
return fmt.Errorf("%w: %s", ErrBucketNotEmpty, bucketName)
|
||||
}
|
||||
}
|
||||
|
||||
if err := tx.DeleteBucket([]byte(bucketName)); err != nil && !errors.Is(err, bbolt.ErrBucketNotFound) {
|
||||
return fmt.Errorf("error deleting metadata bucket %s: %w", bucketName, err)
|
||||
}
|
||||
@@ -585,3 +601,98 @@ func (h *MetadataHandler) AbortMultipartUpload(uploadID string) error {
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (h *MetadataHandler) GetReferencedChunkSet() (map[string]struct{}, error) {
|
||||
chunkSet := make(map[string]struct{})
|
||||
pendingUploadSet := make(map[string]struct{})
|
||||
|
||||
err := h.db.View(func(tx *bbolt.Tx) error {
|
||||
systemIndexBucket := tx.Bucket([]byte(systemIndex))
|
||||
if systemIndexBucket == nil {
|
||||
return errors.New("system index not found")
|
||||
}
|
||||
c := systemIndexBucket.Cursor()
|
||||
for k, _ := c.First(); k != nil; k, _ = c.Next() {
|
||||
metadataBucket := tx.Bucket(k)
|
||||
if metadataBucket == nil {
|
||||
continue
|
||||
}
|
||||
err := metadataBucket.ForEach(func(k, v []byte) error {
|
||||
object := models.ObjectManifest{}
|
||||
err := json.Unmarshal(v, &object)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, chunkID := range object.Chunks {
|
||||
chunkSet[chunkID] = struct{}{}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
uploadsBucket := tx.Bucket(multipartUploadIndex)
|
||||
if uploadsBucket == nil {
|
||||
return errors.New("multipart upload index not found")
|
||||
}
|
||||
if err := uploadsBucket.ForEach(func(k, v []byte) error {
|
||||
upload := models.MultipartUpload{}
|
||||
if err := json.Unmarshal(v, &upload); err != nil {
|
||||
return err
|
||||
}
|
||||
if upload.State == "pending" {
|
||||
pendingUploadSet[string(k)] = struct{}{}
|
||||
}
|
||||
return nil
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
partsBucket := tx.Bucket(multipartUploadPartsIndex)
|
||||
if partsBucket == nil {
|
||||
return errors.New("multipart upload parts index not found")
|
||||
}
|
||||
if err := partsBucket.ForEach(func(k, v []byte) error {
|
||||
uploadID, _, ok := strings.Cut(string(k), ":")
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
if _, pending := pendingUploadSet[uploadID]; !pending {
|
||||
return nil
|
||||
}
|
||||
|
||||
part := models.UploadedPart{}
|
||||
if err := json.Unmarshal(v, &part); err != nil {
|
||||
return err
|
||||
}
|
||||
for _, chunkID := range part.Chunks {
|
||||
chunkSet[chunkID] = struct{}{}
|
||||
}
|
||||
return nil
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return chunkSet, nil
|
||||
}
|
||||
|
||||
func (h *MetadataHandler) GetReferencedChunks() ([]string, error) {
|
||||
chunkSet, err := h.GetReferencedChunkSet()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
chunks := make([]string, 0, len(chunkSet))
|
||||
for chunkID := range chunkSet {
|
||||
chunks = append(chunks, chunkID)
|
||||
}
|
||||
return chunks, nil
|
||||
|
||||
}
|
||||
|
||||
@@ -24,6 +24,27 @@ type BucketManifest struct {
|
||||
PublicAccessBlock bool `json:"public_access_block"`
|
||||
}
|
||||
|
||||
type ListAllMyBucketsResult struct {
|
||||
XMLName xml.Name `xml:"ListAllMyBucketsResult"`
|
||||
Xmlns string `xml:"xmlns,attr"`
|
||||
Owner BucketsOwner `xml:"Owner"`
|
||||
Buckets BucketsElement `xml:"Buckets"`
|
||||
}
|
||||
|
||||
type BucketsOwner struct {
|
||||
ID string `xml:"ID"`
|
||||
DisplayName string `xml:"DisplayName,omitempty"`
|
||||
}
|
||||
|
||||
type BucketsElement struct {
|
||||
Items []BucketItem `xml:"Bucket"`
|
||||
}
|
||||
|
||||
type BucketItem struct {
|
||||
Name string `xml:"Name"`
|
||||
CreationDate string `xml:"CreationDate"`
|
||||
}
|
||||
|
||||
type S3ErrorResponse struct {
|
||||
XMLName xml.Name `xml:"Error"`
|
||||
Code string `xml:"Code"`
|
||||
@@ -47,6 +68,25 @@ type ListBucketResult struct {
|
||||
CommonPrefixes []CommonPrefixes `xml:"CommonPrefixes,omitempty"`
|
||||
}
|
||||
|
||||
type ListBucketResultV2 struct {
|
||||
XMLName xml.Name `xml:"ListBucketResult"`
|
||||
Xmlns string `xml:"xmlns,attr"`
|
||||
|
||||
Name string `xml:"Name"`
|
||||
Prefix string `xml:"Prefix"`
|
||||
Delimiter string `xml:"Delimiter,omitempty"`
|
||||
MaxKeys int `xml:"MaxKeys"`
|
||||
KeyCount int `xml:"KeyCount"`
|
||||
IsTruncated bool `xml:"IsTruncated"`
|
||||
ContinuationToken string `xml:"ContinuationToken,omitempty"`
|
||||
NextContinuationToken string `xml:"NextContinuationToken,omitempty"`
|
||||
StartAfter string `xml:"StartAfter,omitempty"`
|
||||
EncodingType string `xml:"EncodingType,omitempty"`
|
||||
|
||||
Contents []Contents `xml:"Contents,omitempty"`
|
||||
CommonPrefixes []CommonPrefixes `xml:"CommonPrefixes,omitempty"`
|
||||
}
|
||||
|
||||
type Contents struct {
|
||||
Key string `xml:"Key"`
|
||||
LastModified string `xml:"LastModified"`
|
||||
@@ -131,8 +171,15 @@ type DeleteObjectsResult struct {
|
||||
XMLName xml.Name `xml:"DeleteResult"`
|
||||
Xmlns string `xml:"xmlns,attr"`
|
||||
Deleted []DeletedEntry `xml:"Deleted,omitempty"`
|
||||
Errors []DeleteError `xml:"Error,omitempty"`
|
||||
}
|
||||
|
||||
type DeletedEntry struct {
|
||||
Key string `xml:"Key"`
|
||||
}
|
||||
|
||||
type DeleteError struct {
|
||||
Key string `xml:"Key"`
|
||||
Code string `xml:"Code"`
|
||||
Message string `xml:"Message"`
|
||||
}
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package service
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/md5"
|
||||
"encoding/hex"
|
||||
"errors"
|
||||
@@ -11,12 +12,14 @@ import (
|
||||
"io"
|
||||
"log/slog"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
type ObjectService struct {
|
||||
metadata *metadata.MetadataHandler
|
||||
blob *storage.BlobStore
|
||||
gcMu sync.RWMutex
|
||||
}
|
||||
|
||||
var (
|
||||
@@ -31,6 +34,8 @@ func NewObjectService(metadataHandler *metadata.MetadataHandler, blobHandler *st
|
||||
}
|
||||
|
||||
func (s *ObjectService) PutObject(bucket, key, contentType string, input io.Reader) (*models.ObjectManifest, error) {
|
||||
s.gcMu.RLock()
|
||||
defer s.gcMu.RUnlock()
|
||||
|
||||
chunks, size, etag, err := s.blob.IngestStream(input)
|
||||
if err != nil {
|
||||
@@ -62,29 +67,30 @@ func (s *ObjectService) PutObject(bucket, key, contentType string, input io.Read
|
||||
}
|
||||
|
||||
func (s *ObjectService) GetObject(bucket, key string) (io.ReadCloser, *models.ObjectManifest, error) {
|
||||
s.gcMu.RLock()
|
||||
|
||||
manifest, err := s.metadata.GetManifest(bucket, key)
|
||||
if err != nil {
|
||||
s.gcMu.RUnlock()
|
||||
return nil, nil, err
|
||||
}
|
||||
pr, pw := io.Pipe()
|
||||
|
||||
go func() {
|
||||
defer func(pw *io.PipeWriter) {
|
||||
err := pw.Close()
|
||||
if err != nil {
|
||||
|
||||
}
|
||||
}(pw)
|
||||
|
||||
err := s.blob.AssembleStream(manifest.Chunks, pw)
|
||||
if err != nil {
|
||||
defer s.gcMu.RUnlock()
|
||||
if err := s.blob.AssembleStream(manifest.Chunks, pw); err != nil {
|
||||
_ = pw.CloseWithError(err)
|
||||
return
|
||||
}
|
||||
_ = pw.Close()
|
||||
}()
|
||||
return pr, manifest, nil
|
||||
}
|
||||
|
||||
func (s *ObjectService) HeadObject(bucket, key string) (models.ObjectManifest, error) {
|
||||
s.gcMu.RLock()
|
||||
defer s.gcMu.RUnlock()
|
||||
|
||||
manifest, err := s.metadata.GetManifest(bucket, key)
|
||||
if err != nil {
|
||||
return models.ObjectManifest{}, err
|
||||
@@ -93,39 +99,68 @@ func (s *ObjectService) HeadObject(bucket, key string) (models.ObjectManifest, e
|
||||
}
|
||||
|
||||
func (s *ObjectService) DeleteObject(bucket, key string) error {
|
||||
s.gcMu.RLock()
|
||||
defer s.gcMu.RUnlock()
|
||||
return s.metadata.DeleteManifest(bucket, key)
|
||||
}
|
||||
|
||||
func (s *ObjectService) ListObjects(bucket, prefix string) ([]*models.ObjectManifest, error) {
|
||||
s.gcMu.RLock()
|
||||
defer s.gcMu.RUnlock()
|
||||
|
||||
return s.metadata.ListObjects(bucket, prefix)
|
||||
}
|
||||
|
||||
func (s *ObjectService) CreateBucket(bucket string) error {
|
||||
s.gcMu.RLock()
|
||||
defer s.gcMu.RUnlock()
|
||||
return s.metadata.CreateBucket(bucket)
|
||||
}
|
||||
|
||||
func (s *ObjectService) HeadBucket(bucket string) error {
|
||||
s.gcMu.RLock()
|
||||
defer s.gcMu.RUnlock()
|
||||
|
||||
_, err := s.metadata.GetBucketManifest(bucket)
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *ObjectService) GetBucketManifest(bucket string) (*models.BucketManifest, error) {
|
||||
s.gcMu.RLock()
|
||||
defer s.gcMu.RUnlock()
|
||||
|
||||
return s.metadata.GetBucketManifest(bucket)
|
||||
}
|
||||
|
||||
func (s *ObjectService) DeleteBucket(bucket string) error {
|
||||
s.gcMu.RLock()
|
||||
defer s.gcMu.RUnlock()
|
||||
return s.metadata.DeleteBucket(bucket)
|
||||
}
|
||||
|
||||
func (s *ObjectService) ListBuckets() ([]string, error) {
|
||||
s.gcMu.RLock()
|
||||
defer s.gcMu.RUnlock()
|
||||
|
||||
return s.metadata.ListBuckets()
|
||||
}
|
||||
|
||||
func (s *ObjectService) DeleteObjects(bucket string, keys []string) ([]string, error) {
|
||||
s.gcMu.RLock()
|
||||
defer s.gcMu.RUnlock()
|
||||
return s.metadata.DeleteManifests(bucket, keys)
|
||||
}
|
||||
|
||||
func (s *ObjectService) CreateMultipartUpload(bucket, key string) (*models.MultipartUpload, error) {
|
||||
s.gcMu.RLock()
|
||||
defer s.gcMu.RUnlock()
|
||||
return s.metadata.CreateMultipartUpload(bucket, key)
|
||||
}
|
||||
|
||||
func (s *ObjectService) UploadPart(bucket, key, uploadId string, partNumber int, input io.Reader) (string, error) {
|
||||
s.gcMu.RLock()
|
||||
defer s.gcMu.RUnlock()
|
||||
|
||||
if partNumber < 1 || partNumber > 10000 {
|
||||
return "", ErrInvalidPart
|
||||
}
|
||||
@@ -158,6 +193,9 @@ func (s *ObjectService) UploadPart(bucket, key, uploadId string, partNumber int,
|
||||
}
|
||||
|
||||
func (s *ObjectService) ListMultipartParts(bucket, key, uploadID string) ([]models.UploadedPart, error) {
|
||||
s.gcMu.RLock()
|
||||
defer s.gcMu.RUnlock()
|
||||
|
||||
upload, err := s.metadata.GetMultipartUpload(uploadID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -169,6 +207,9 @@ func (s *ObjectService) ListMultipartParts(bucket, key, uploadID string) ([]mode
|
||||
}
|
||||
|
||||
func (s *ObjectService) CompleteMultipartUpload(bucket, key, uploadID string, completed []models.CompletedPart) (*models.ObjectManifest, error) {
|
||||
s.gcMu.RLock()
|
||||
defer s.gcMu.RUnlock()
|
||||
|
||||
if len(completed) == 0 {
|
||||
return nil, ErrInvalidCompleteRequest
|
||||
}
|
||||
@@ -236,6 +277,9 @@ func (s *ObjectService) CompleteMultipartUpload(bucket, key, uploadID string, co
|
||||
}
|
||||
|
||||
func (s *ObjectService) AbortMultipartUpload(bucket, key, uploadID string) error {
|
||||
s.gcMu.RLock()
|
||||
defer s.gcMu.RUnlock()
|
||||
|
||||
upload, err := s.metadata.GetMultipartUpload(uploadID)
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -266,3 +310,55 @@ func buildMultipartETag(parts []models.UploadedPart) string {
|
||||
func (s *ObjectService) Close() error {
|
||||
return s.metadata.Close()
|
||||
}
|
||||
|
||||
func (s *ObjectService) GarbageCollect() error {
|
||||
s.gcMu.Lock()
|
||||
defer s.gcMu.Unlock()
|
||||
|
||||
referencedChunkSet, err := s.metadata.GetReferencedChunkSet()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
totalChunks := 0
|
||||
deletedChunks := 0
|
||||
deleteErrors := 0
|
||||
|
||||
if err := s.blob.ForEachChunk(func(chunkID string) error {
|
||||
totalChunks++
|
||||
if _, found := referencedChunkSet[chunkID]; found {
|
||||
return nil
|
||||
}
|
||||
if err := s.blob.DeleteBlob(chunkID); err != nil {
|
||||
deleteErrors++
|
||||
slog.Warn("garbage_collect_delete_failed", "chunk_id", chunkID, "error", err)
|
||||
return nil
|
||||
}
|
||||
deletedChunks++
|
||||
return nil
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
slog.Info("garbage_collect_completed",
|
||||
"referenced_chunks", len(referencedChunkSet),
|
||||
"total_chunks", totalChunks,
|
||||
"deleted_chunks", deletedChunks,
|
||||
"delete_errors", deleteErrors,
|
||||
)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *ObjectService) RunGC(ctx context.Context, interval time.Duration) {
|
||||
ticker := time.NewTicker(interval)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-ticker.C:
|
||||
_ = s.GarbageCollect()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -87,10 +87,46 @@ func (bs *BlobStore) saveBlob(chunkID string, data []byte) error {
|
||||
}
|
||||
|
||||
fullPath := filepath.Join(dir, chunkID)
|
||||
if _, err := os.Stat(fullPath); os.IsNotExist(err) {
|
||||
if err := os.WriteFile(fullPath, data, 0644); err != nil {
|
||||
return err
|
||||
if _, err := os.Stat(fullPath); err == nil {
|
||||
return nil
|
||||
} else if !os.IsNotExist(err) {
|
||||
return err
|
||||
}
|
||||
|
||||
tmpFile, err := os.CreateTemp(dir, chunkID+".tmp-*")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
tmpPath := tmpFile.Name()
|
||||
cleanup := true
|
||||
defer func() {
|
||||
if cleanup {
|
||||
_ = os.Remove(tmpPath)
|
||||
}
|
||||
}()
|
||||
|
||||
if _, err := tmpFile.Write(data); err != nil {
|
||||
_ = tmpFile.Close()
|
||||
return err
|
||||
}
|
||||
if err := tmpFile.Sync(); err != nil {
|
||||
_ = tmpFile.Close()
|
||||
return err
|
||||
}
|
||||
if err := tmpFile.Close(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := os.Rename(tmpPath, fullPath); err != nil {
|
||||
if _, statErr := os.Stat(fullPath); statErr == nil {
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
}
|
||||
cleanup = false
|
||||
|
||||
if err := syncDir(dir); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@@ -115,6 +151,45 @@ func (bs *BlobStore) GetBlob(chunkID string) ([]byte, error) {
|
||||
return os.ReadFile(filepath.Join(bs.dataRoot, blobRoot, chunkID[:2], chunkID[2:4], chunkID))
|
||||
}
|
||||
|
||||
func (bs *BlobStore) DeleteBlob(chunkID string) error {
|
||||
if !isValidChunkID(chunkID) {
|
||||
return fmt.Errorf("invalid chunk id: %q", chunkID)
|
||||
}
|
||||
err := os.Remove(filepath.Join(bs.dataRoot, blobRoot, chunkID[:2], chunkID[2:4], chunkID))
|
||||
if err != nil && os.IsNotExist(err) {
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (bs *BlobStore) ListChunks() ([]string, error) {
|
||||
var chunkIDs []string
|
||||
err := bs.ForEachChunk(func(chunkID string) error {
|
||||
chunkIDs = append(chunkIDs, chunkID)
|
||||
return nil
|
||||
})
|
||||
return chunkIDs, err
|
||||
}
|
||||
|
||||
func (bs *BlobStore) ForEachChunk(fn func(chunkID string) error) error {
|
||||
if fn == nil {
|
||||
return errors.New("chunk callback is required")
|
||||
}
|
||||
return filepath.Walk(filepath.Join(bs.dataRoot, blobRoot), func(path string, info os.FileInfo, err error) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !info.IsDir() {
|
||||
chunkID := info.Name()
|
||||
if isValidChunkID(chunkID) {
|
||||
return fn(chunkID)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
|
||||
})
|
||||
}
|
||||
|
||||
func isValidChunkID(chunkID string) bool {
|
||||
if len(chunkID) != sha256.Size*2 {
|
||||
return false
|
||||
@@ -126,3 +201,12 @@ func isValidChunkID(chunkID string) bool {
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func syncDir(dirPath string) error {
|
||||
dir, err := os.Open(dirPath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer dir.Close()
|
||||
return dir.Sync()
|
||||
}
|
||||
|
||||
@@ -5,31 +5,36 @@ import (
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/joho/godotenv"
|
||||
)
|
||||
|
||||
type Config struct {
|
||||
DataPath string
|
||||
Address string
|
||||
Port int
|
||||
ChunkSize int
|
||||
LogLevel string
|
||||
LogFormat string
|
||||
AuditLog bool
|
||||
DataPath string
|
||||
Address string
|
||||
Port int
|
||||
ChunkSize int
|
||||
LogLevel string
|
||||
LogFormat string
|
||||
AuditLog bool
|
||||
GcInterval time.Duration
|
||||
GcEnabled bool
|
||||
}
|
||||
|
||||
func NewConfig() *Config {
|
||||
_ = godotenv.Load()
|
||||
|
||||
config := &Config{
|
||||
DataPath: sanitizeDataPath(os.Getenv("DATA_PATH")),
|
||||
Address: firstNonEmpty(strings.TrimSpace(os.Getenv("ADDRESS")), "0.0.0.0"),
|
||||
Port: envIntRange("PORT", 3000, 1, 65535),
|
||||
ChunkSize: envIntRange("CHUNK_SIZE", 8192000, 1, 64*1024*1024),
|
||||
LogLevel: strings.ToLower(firstNonEmpty(strings.TrimSpace(os.Getenv("LOG_LEVEL")), "info")),
|
||||
LogFormat: strings.ToLower(firstNonEmpty(strings.TrimSpace(os.Getenv("LOG_FORMAT")), strings.TrimSpace(os.Getenv("LOG_TYPE")), "text")),
|
||||
AuditLog: envBool("AUDIT_LOG", true),
|
||||
DataPath: sanitizeDataPath(os.Getenv("DATA_PATH")),
|
||||
Address: firstNonEmpty(strings.TrimSpace(os.Getenv("ADDRESS")), "0.0.0.0"),
|
||||
Port: envIntRange("PORT", 3000, 1, 65535),
|
||||
ChunkSize: envIntRange("CHUNK_SIZE", 8192000, 1, 64*1024*1024),
|
||||
LogLevel: strings.ToLower(firstNonEmpty(strings.TrimSpace(os.Getenv("LOG_LEVEL")), "info")),
|
||||
LogFormat: strings.ToLower(firstNonEmpty(strings.TrimSpace(os.Getenv("LOG_FORMAT")), strings.TrimSpace(os.Getenv("LOG_TYPE")), "text")),
|
||||
AuditLog: envBool("AUDIT_LOG", true),
|
||||
GcInterval: time.Duration(envIntRange("GC_INTERVAL", 10, -1, 60)) * time.Minute,
|
||||
GcEnabled: envBool("GC_ENABLED", true),
|
||||
}
|
||||
|
||||
if config.LogFormat != "json" && config.LogFormat != "text" {
|
||||
|
||||
Reference in New Issue
Block a user