mirror of
https://github.com/ferdzo/fs.git
synced 2026-04-05 18:06:25 +00:00
Compare commits
6 Commits
master
...
65a7a7eef8
| Author | SHA1 | Date | |
|---|---|---|---|
| 65a7a7eef8 | |||
| eb798be550 | |||
| b19c24d9b7 | |||
| 6fe5a8a629 | |||
| 151c11a636 | |||
| f151f8055a |
239
api/api.go
239
api/api.go
@@ -1 +1,240 @@
|
|||||||
package api
|
package api
|
||||||
|
|
||||||
|
import (
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"fs/metadata"
|
||||||
|
"fs/service"
|
||||||
|
"fs/utils"
|
||||||
|
"io"
|
||||||
|
"net/http"
|
||||||
|
"strconv"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/go-chi/chi/v5"
|
||||||
|
"github.com/go-chi/chi/v5/middleware"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Handler struct {
|
||||||
|
router *chi.Mux
|
||||||
|
svc *service.ObjectService
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewHandler(svc *service.ObjectService) *Handler {
|
||||||
|
r := chi.NewRouter()
|
||||||
|
r.Use(middleware.Recoverer)
|
||||||
|
|
||||||
|
h := &Handler{
|
||||||
|
router: r,
|
||||||
|
svc: svc,
|
||||||
|
}
|
||||||
|
return h
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *Handler) setupRoutes() {
|
||||||
|
h.router.Use(middleware.Logger)
|
||||||
|
|
||||||
|
h.router.Get("/", h.handleGetBuckets)
|
||||||
|
|
||||||
|
h.router.Get("/{bucket}/", h.handleGetBucket)
|
||||||
|
h.router.Get("/{bucket}", h.handleGetBucket)
|
||||||
|
h.router.Put("/{bucket}", h.handlePutBucket)
|
||||||
|
h.router.Put("/{bucket}/", h.handlePutBucket)
|
||||||
|
h.router.Delete("/{bucket}", h.handleDeleteBucket)
|
||||||
|
h.router.Delete("/{bucket}/", h.handleDeleteBucket)
|
||||||
|
h.router.Head("/{bucket}", h.handleHeadBucket)
|
||||||
|
h.router.Head("/{bucket}/", h.handleHeadBucket)
|
||||||
|
|
||||||
|
h.router.Get("/{bucket}/*", h.handleGetObject)
|
||||||
|
h.router.Put("/{bucket}/*", h.handlePutObject)
|
||||||
|
h.router.Head("/{bucket}/*", h.handleHeadObject)
|
||||||
|
h.router.Delete("/{bucket}/*", h.handleDeleteObject)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *Handler) handleWelcome(w http.ResponseWriter, r *http.Request) {
|
||||||
|
w.WriteHeader(http.StatusOK)
|
||||||
|
_, err := w.Write([]byte("Welcome to the Object Storage API!"))
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *Handler) handleGetObject(w http.ResponseWriter, r *http.Request) {
|
||||||
|
bucket := chi.URLParam(r, "bucket")
|
||||||
|
key := chi.URLParam(r, "*")
|
||||||
|
|
||||||
|
if key == "" {
|
||||||
|
writeS3Error(w, r, s3ErrInvalidObjectKey, r.URL.Path)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if r.URL.Query().Get("uploadId") != "" {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
stream, manifest, err := h.svc.GetObject(bucket, key)
|
||||||
|
if err != nil {
|
||||||
|
writeMappedS3Error(w, r, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
w.Header().Set("Content-Type", manifest.ContentType)
|
||||||
|
w.Header().Set("Content-Length", strconv.FormatInt(manifest.Size, 10))
|
||||||
|
w.Header().Set("ETag", `"`+manifest.ETag+`"`)
|
||||||
|
w.Header().Set("Last-Modified", time.Unix(manifest.CreatedAt, 0).UTC().Format(http.TimeFormat))
|
||||||
|
w.Header().Set("Accept-Ranges", "bytes")
|
||||||
|
w.WriteHeader(http.StatusOK)
|
||||||
|
_, err = io.Copy(w, stream)
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *Handler) handlePutObject(w http.ResponseWriter, r *http.Request) {
|
||||||
|
bucket := chi.URLParam(r, "bucket")
|
||||||
|
key := chi.URLParam(r, "*")
|
||||||
|
if key == "" {
|
||||||
|
writeS3Error(w, r, s3ErrInvalidObjectKey, r.URL.Path)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
contentType := r.Header.Get("Content-Type")
|
||||||
|
if contentType == "" {
|
||||||
|
contentType = "application/octet-stream"
|
||||||
|
}
|
||||||
|
|
||||||
|
manifest, err := h.svc.PutObject(bucket, key, contentType, r.Body)
|
||||||
|
defer r.Body.Close()
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
writeMappedS3Error(w, r, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
w.Header().Set("ETag", `"`+manifest.ETag+`"`)
|
||||||
|
w.Header().Set("Content-Length", "0")
|
||||||
|
|
||||||
|
w.WriteHeader(http.StatusOK)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *Handler) handleHeadObject(w http.ResponseWriter, r *http.Request) {
|
||||||
|
bucket := chi.URLParam(r, "bucket")
|
||||||
|
key := chi.URLParam(r, "*")
|
||||||
|
if key == "" {
|
||||||
|
writeS3Error(w, r, s3ErrInvalidObjectKey, r.URL.Path)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
manifest, err := h.svc.HeadObject(bucket, key)
|
||||||
|
if err != nil {
|
||||||
|
writeMappedS3Error(w, r, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
w.Header().Set("ETag", `"`+manifest.ETag+`"`)
|
||||||
|
w.Header().Set("Content-Length", "0")
|
||||||
|
w.Header().Set("Last-Modified", time.Unix(manifest.CreatedAt, 0).UTC().Format(http.TimeFormat))
|
||||||
|
w.WriteHeader(http.StatusOK)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *Handler) handlePutBucket(w http.ResponseWriter, r *http.Request) {
|
||||||
|
bucket := chi.URLParam(r, "bucket")
|
||||||
|
if err := h.svc.CreateBucket(bucket); err != nil {
|
||||||
|
writeMappedS3Error(w, r, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
w.WriteHeader(http.StatusCreated)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *Handler) handleDeleteBucket(w http.ResponseWriter, r *http.Request) {
|
||||||
|
bucket := chi.URLParam(r, "bucket")
|
||||||
|
if err := h.svc.DeleteBucket(bucket); err != nil {
|
||||||
|
writeMappedS3Error(w, r, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
w.WriteHeader(http.StatusNoContent)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *Handler) handleDeleteObject(w http.ResponseWriter, r *http.Request) {
|
||||||
|
bucket := chi.URLParam(r, "bucket")
|
||||||
|
key := chi.URLParam(r, "*")
|
||||||
|
if key == "" {
|
||||||
|
writeS3Error(w, r, s3ErrInvalidObjectKey, r.URL.Path)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
err := h.svc.DeleteObject(bucket, key)
|
||||||
|
if err != nil {
|
||||||
|
if errors.Is(err, metadata.ErrObjectNotFound) {
|
||||||
|
w.WriteHeader(http.StatusNoContent)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
writeMappedS3Error(w, r, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
w.WriteHeader(http.StatusNoContent)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *Handler) handleHeadBucket(w http.ResponseWriter, r *http.Request) {
|
||||||
|
bucket := chi.URLParam(r, "bucket")
|
||||||
|
if err := h.svc.HeadBucket(bucket); err != nil {
|
||||||
|
writeMappedS3Error(w, r, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
w.WriteHeader(http.StatusOK)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *Handler) handleGetBuckets(w http.ResponseWriter, r *http.Request) {
|
||||||
|
buckets, err := h.svc.ListBuckets()
|
||||||
|
if err != nil {
|
||||||
|
writeMappedS3Error(w, r, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
w.Header().Set("Content-Type", "application/xml")
|
||||||
|
w.WriteHeader(http.StatusOK)
|
||||||
|
for _, bucket := range buckets {
|
||||||
|
w.Write([]byte(bucket))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *Handler) handleGetBucket(w http.ResponseWriter, r *http.Request) {
|
||||||
|
bucket := chi.URLParam(r, "bucket")
|
||||||
|
|
||||||
|
if r.URL.Query().Get("list-type") == "2" {
|
||||||
|
prefix := r.URL.Query().Get("prefix")
|
||||||
|
if prefix == "" {
|
||||||
|
prefix = ""
|
||||||
|
}
|
||||||
|
h.handleListObjectsV2(w, r, bucket, prefix)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
writeS3Error(w, r, s3ErrNotImplemented, r.URL.Path)
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *Handler) handleListObjectsV2(w http.ResponseWriter, r *http.Request, bucket, prefix string) {
|
||||||
|
objects, err := h.svc.ListObjects(bucket, prefix)
|
||||||
|
if err != nil {
|
||||||
|
writeMappedS3Error(w, r, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
xmlResponse, err := utils.ConstructXMLResponseForObjectList(bucket, objects)
|
||||||
|
if err != nil {
|
||||||
|
writeMappedS3Error(w, r, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
w.Header().Set("Content-Type", "application/xml; charset=utf-8")
|
||||||
|
w.Header().Set("Content-Length", strconv.Itoa(len(xmlResponse)))
|
||||||
|
w.WriteHeader(http.StatusOK)
|
||||||
|
_, err = w.Write([]byte(xmlResponse))
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *Handler) Start(address string) error {
|
||||||
|
fmt.Printf("Starting API server on %s\n", address)
|
||||||
|
h.setupRoutes()
|
||||||
|
return http.ListenAndServe(address, h.router)
|
||||||
|
}
|
||||||
|
|||||||
97
api/s3_errors.go
Normal file
97
api/s3_errors.go
Normal file
@@ -0,0 +1,97 @@
|
|||||||
|
package api
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/xml"
|
||||||
|
"errors"
|
||||||
|
"fs/metadata"
|
||||||
|
"fs/models"
|
||||||
|
"net/http"
|
||||||
|
)
|
||||||
|
|
||||||
|
type s3APIError struct {
|
||||||
|
Status int
|
||||||
|
Code string
|
||||||
|
Message string
|
||||||
|
}
|
||||||
|
|
||||||
|
var (
|
||||||
|
s3ErrInvalidObjectKey = s3APIError{
|
||||||
|
Status: http.StatusBadRequest,
|
||||||
|
Code: "InvalidArgument",
|
||||||
|
Message: "Object key is required.",
|
||||||
|
}
|
||||||
|
s3ErrNotImplemented = s3APIError{
|
||||||
|
Status: http.StatusNotImplemented,
|
||||||
|
Code: "NotImplemented",
|
||||||
|
Message: "A header you provided implies functionality that is not implemented.",
|
||||||
|
}
|
||||||
|
s3ErrInternal = s3APIError{
|
||||||
|
Status: http.StatusInternalServerError,
|
||||||
|
Code: "InternalError",
|
||||||
|
Message: "We encountered an internal error. Please try again.",
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
func mapToS3Error(err error) s3APIError {
|
||||||
|
switch {
|
||||||
|
case errors.Is(err, metadata.ErrInvalidBucketName):
|
||||||
|
return s3APIError{
|
||||||
|
Status: http.StatusBadRequest,
|
||||||
|
Code: "InvalidBucketName",
|
||||||
|
Message: "The specified bucket is not valid.",
|
||||||
|
}
|
||||||
|
case errors.Is(err, metadata.ErrBucketAlreadyExists):
|
||||||
|
return s3APIError{
|
||||||
|
Status: http.StatusConflict,
|
||||||
|
Code: "BucketAlreadyOwnedByYou",
|
||||||
|
Message: "Your previous request to create the named bucket succeeded and you already own it.",
|
||||||
|
}
|
||||||
|
case errors.Is(err, metadata.ErrBucketNotFound):
|
||||||
|
return s3APIError{
|
||||||
|
Status: http.StatusNotFound,
|
||||||
|
Code: "NoSuchBucket",
|
||||||
|
Message: "The specified bucket does not exist.",
|
||||||
|
}
|
||||||
|
case errors.Is(err, metadata.ErrBucketNotEmpty):
|
||||||
|
return s3APIError{
|
||||||
|
Status: http.StatusConflict,
|
||||||
|
Code: "BucketNotEmpty",
|
||||||
|
Message: "The bucket you tried to delete is not empty.",
|
||||||
|
}
|
||||||
|
case errors.Is(err, metadata.ErrObjectNotFound):
|
||||||
|
return s3APIError{
|
||||||
|
Status: http.StatusNotFound,
|
||||||
|
Code: "NoSuchKey",
|
||||||
|
Message: "The specified key does not exist.",
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
return s3ErrInternal
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func writeS3Error(w http.ResponseWriter, r *http.Request, apiErr s3APIError, resource string) {
|
||||||
|
w.Header().Set("Content-Type", "application/xml; charset=utf-8")
|
||||||
|
w.WriteHeader(apiErr.Status)
|
||||||
|
|
||||||
|
if r != nil && r.Method == http.MethodHead {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
payload := models.S3ErrorResponse{
|
||||||
|
Code: apiErr.Code,
|
||||||
|
Message: apiErr.Message,
|
||||||
|
Resource: resource,
|
||||||
|
}
|
||||||
|
|
||||||
|
out, err := xml.MarshalIndent(payload, "", " ")
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
_, _ = w.Write([]byte(xml.Header))
|
||||||
|
_, _ = w.Write(out)
|
||||||
|
}
|
||||||
|
|
||||||
|
func writeMappedS3Error(w http.ResponseWriter, r *http.Request, err error) {
|
||||||
|
writeS3Error(w, r, mapToS3Error(err), r.URL.Path)
|
||||||
|
}
|
||||||
1
go.mod
1
go.mod
@@ -3,6 +3,7 @@ module fs
|
|||||||
go 1.25.7
|
go 1.25.7
|
||||||
|
|
||||||
require (
|
require (
|
||||||
|
github.com/go-chi/chi/v5 v5.2.5 // indirect
|
||||||
github.com/klauspost/cpuid/v2 v2.3.0 // indirect
|
github.com/klauspost/cpuid/v2 v2.3.0 // indirect
|
||||||
github.com/klauspost/reedsolomon v1.13.2 // indirect
|
github.com/klauspost/reedsolomon v1.13.2 // indirect
|
||||||
go.etcd.io/bbolt v1.4.3 // indirect
|
go.etcd.io/bbolt v1.4.3 // indirect
|
||||||
|
|||||||
2
go.sum
2
go.sum
@@ -1,3 +1,5 @@
|
|||||||
|
github.com/go-chi/chi/v5 v5.2.5 h1:Eg4myHZBjyvJmAFjFvWgrqDTXFyOzjj7YIm3L3mu6Ug=
|
||||||
|
github.com/go-chi/chi/v5 v5.2.5/go.mod h1:X7Gx4mteadT3eDOMTsXzmI4/rwUpOwBHLpAfupzFJP0=
|
||||||
github.com/klauspost/cpuid/v2 v2.3.0 h1:S4CRMLnYUhGeDFDqkGriYKdfoFlDnMtqTiI/sFzhA9Y=
|
github.com/klauspost/cpuid/v2 v2.3.0 h1:S4CRMLnYUhGeDFDqkGriYKdfoFlDnMtqTiI/sFzhA9Y=
|
||||||
github.com/klauspost/cpuid/v2 v2.3.0/go.mod h1:hqwkgyIinND0mEev00jJYCxPNVRVXFQeu1XKlok6oO0=
|
github.com/klauspost/cpuid/v2 v2.3.0/go.mod h1:hqwkgyIinND0mEev00jJYCxPNVRVXFQeu1XKlok6oO0=
|
||||||
github.com/klauspost/reedsolomon v1.13.2 h1:9qtQy2tKEVpVB8Pfq87ZljHZb60/LbeTQ1OxV8EGzdE=
|
github.com/klauspost/reedsolomon v1.13.2 h1:9qtQy2tKEVpVB8Pfq87ZljHZb60/LbeTQ1OxV8EGzdE=
|
||||||
|
|||||||
36
main.go
36
main.go
@@ -2,20 +2,12 @@ package main
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"fs/api"
|
||||||
"fs/metadata"
|
"fs/metadata"
|
||||||
"fs/service"
|
"fs/service"
|
||||||
"io"
|
|
||||||
"os"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
fmt.Println("Hello, World!")
|
|
||||||
imageStream, err := os.Open("fer.jpg")
|
|
||||||
if err != nil {
|
|
||||||
fmt.Printf("Error opening image stream: %v\n", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
defer imageStream.Close()
|
|
||||||
|
|
||||||
metadataHandler, err := metadata.NewMetadataHandler("metadata.db")
|
metadataHandler, err := metadata.NewMetadataHandler("metadata.db")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -24,31 +16,9 @@ func main() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
objectService := service.NewObjectService(metadataHandler)
|
objectService := service.NewObjectService(metadataHandler)
|
||||||
|
handler := api.NewHandler(objectService)
|
||||||
manifest, err := objectService.PutObject("test-bucket-ferdzo/fer.jpg", "image/jpeg", imageStream)
|
err = handler.Start("localhost:3000")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Printf("Error ingesting stream: %v\n", err)
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
fmt.Printf("Manifest: %+v\n", manifest)
|
|
||||||
|
|
||||||
objectData, manifest2, err := objectService.GetObject("test-bucket-ferdzo", "fer.jpg")
|
|
||||||
if err != nil {
|
|
||||||
fmt.Printf("Error retrieving object: %v\n", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
fmt.Printf("Retrieved manifest: %+v\n", manifest2)
|
|
||||||
recoveredFile, err := os.Create("recovered_" + manifest2.Key)
|
|
||||||
if err != nil {
|
|
||||||
fmt.Printf("Error creating file: %v\n", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
defer recoveredFile.Close()
|
|
||||||
|
|
||||||
bytesWritten, err := io.Copy(recoveredFile, objectData)
|
|
||||||
if err != nil {
|
|
||||||
fmt.Printf("Error streaming to recovered file: %v\n", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
fmt.Printf("Successfully streamed %d bytes to disk!\n", bytesWritten)
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -2,37 +2,178 @@ package metadata
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"fs/models"
|
"fs/models"
|
||||||
|
"regexp"
|
||||||
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
"go.etcd.io/bbolt"
|
"go.etcd.io/bbolt"
|
||||||
)
|
)
|
||||||
|
|
||||||
const ManifestBucketName = "object_manifests"
|
|
||||||
|
|
||||||
type MetadataHandler struct {
|
type MetadataHandler struct {
|
||||||
db *bbolt.DB
|
db *bbolt.DB
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var systemIndex = []byte("__SYSTEM_BUCKETS__")
|
||||||
|
|
||||||
|
var validBucketName = regexp.MustCompile(`^[a-z0-9.-]{3,63}$`)
|
||||||
|
|
||||||
|
var (
|
||||||
|
ErrInvalidBucketName = errors.New("invalid bucket name")
|
||||||
|
ErrBucketAlreadyExists = errors.New("bucket already exists")
|
||||||
|
ErrBucketNotFound = errors.New("bucket not found")
|
||||||
|
ErrBucketNotEmpty = errors.New("bucket not empty")
|
||||||
|
ErrObjectNotFound = errors.New("object not found")
|
||||||
|
)
|
||||||
|
|
||||||
func NewMetadataHandler(dbPath string) (*MetadataHandler, error) {
|
func NewMetadataHandler(dbPath string) (*MetadataHandler, error) {
|
||||||
db, err := bbolt.Open(dbPath, 0600, nil)
|
db, err := bbolt.Open(dbPath, 0600, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return &MetadataHandler{db: db}, nil
|
h := &MetadataHandler{db: db}
|
||||||
|
|
||||||
|
err = h.db.Update(func(tx *bbolt.Tx) error {
|
||||||
|
_, err := tx.CreateBucketIfNotExists(systemIndex)
|
||||||
|
return err
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
_ = db.Close()
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return h, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *MetadataHandler) CreateBucket(bucketName string) error {
|
||||||
|
if !validBucketName.MatchString(bucketName) {
|
||||||
|
return fmt.Errorf("%w: %s", ErrInvalidBucketName, bucketName)
|
||||||
|
}
|
||||||
|
|
||||||
|
err := h.db.Update(func(tx *bbolt.Tx) error {
|
||||||
|
indexBucket, err := tx.CreateBucketIfNotExists([]byte(systemIndex))
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if indexBucket.Get([]byte(bucketName)) != nil {
|
||||||
|
return fmt.Errorf("%w: %s", ErrBucketAlreadyExists, bucketName)
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = tx.CreateBucketIfNotExists([]byte(bucketName))
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
manifest := models.BucketManifest{
|
||||||
|
Name: bucketName,
|
||||||
|
CreatedAt: time.Now(),
|
||||||
|
}
|
||||||
|
data, _ := json.Marshal(manifest)
|
||||||
|
|
||||||
|
return indexBucket.Put([]byte(bucketName), data)
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *MetadataHandler) DeleteBucket(bucketName string) error {
|
||||||
|
if !validBucketName.MatchString(bucketName) {
|
||||||
|
return fmt.Errorf("%w: %s", ErrInvalidBucketName, bucketName)
|
||||||
|
}
|
||||||
|
|
||||||
|
err := h.db.Update(func(tx *bbolt.Tx) error {
|
||||||
|
indexBucket, err := tx.CreateBucketIfNotExists([]byte(systemIndex))
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if indexBucket.Get([]byte(bucketName)) == nil {
|
||||||
|
return fmt.Errorf("%w: %s", ErrBucketNotFound, bucketName)
|
||||||
|
}
|
||||||
|
metadataBucket := tx.Bucket([]byte(bucketName))
|
||||||
|
if metadataBucket == nil {
|
||||||
|
return fmt.Errorf("%w: %s", ErrBucketNotFound, bucketName)
|
||||||
|
}
|
||||||
|
if k, _ := metadataBucket.Cursor().First(); k != nil {
|
||||||
|
return fmt.Errorf("%w: %s", ErrBucketNotEmpty, bucketName)
|
||||||
|
}
|
||||||
|
if err := tx.DeleteBucket([]byte(bucketName)); err != nil && !errors.Is(err, bbolt.ErrBucketNotFound) {
|
||||||
|
return fmt.Errorf("error deleting metadata bucket %s: %w", bucketName, err)
|
||||||
|
}
|
||||||
|
if err := indexBucket.Delete([]byte(bucketName)); err != nil {
|
||||||
|
return fmt.Errorf("error deleting bucket %s from system index: %w", bucketName, err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *MetadataHandler) ListBuckets() ([]string, error) {
|
||||||
|
buckets := []string{}
|
||||||
|
err := h.db.View(func(tx *bbolt.Tx) error {
|
||||||
|
systemIndexBucket := tx.Bucket([]byte(systemIndex))
|
||||||
|
if systemIndexBucket == nil {
|
||||||
|
return errors.New("system index not found")
|
||||||
|
}
|
||||||
|
c := systemIndexBucket.Cursor()
|
||||||
|
for k, _ := c.First(); k != nil; k, _ = c.Next() {
|
||||||
|
buckets = append(buckets, string(k))
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return buckets, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *MetadataHandler) GetBucketManifest(bucketName string) (*models.BucketManifest, error) {
|
||||||
|
var manifest *models.BucketManifest
|
||||||
|
|
||||||
|
err := h.db.View(func(tx *bbolt.Tx) error {
|
||||||
|
systemIndexBucket := tx.Bucket([]byte(systemIndex))
|
||||||
|
if systemIndexBucket == nil {
|
||||||
|
return errors.New("system index not found")
|
||||||
|
}
|
||||||
|
data := systemIndexBucket.Get([]byte(bucketName))
|
||||||
|
if data == nil {
|
||||||
|
return fmt.Errorf("%w: %s", ErrBucketNotFound, bucketName)
|
||||||
|
}
|
||||||
|
err := json.Unmarshal(data, &manifest)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return manifest, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *MetadataHandler) PutManifest(manifest *models.ObjectManifest) error {
|
func (h *MetadataHandler) PutManifest(manifest *models.ObjectManifest) error {
|
||||||
err := h.db.Update(func(tx *bbolt.Tx) error {
|
bucket := manifest.Bucket
|
||||||
metadataBucket, err := tx.CreateBucketIfNotExists([]byte(ManifestBucketName))
|
key := manifest.Key
|
||||||
if err != nil {
|
|
||||||
|
if _, err := h.GetBucketManifest(bucket); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
key := fmt.Sprintf("%s/%s", manifest.Bucket, manifest.Key)
|
|
||||||
|
err := h.db.Update(func(tx *bbolt.Tx) error {
|
||||||
data, err := json.Marshal(manifest)
|
data, err := json.Marshal(manifest)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
metadataBucket := tx.Bucket([]byte(bucket))
|
||||||
|
if metadataBucket == nil {
|
||||||
|
return fmt.Errorf("%w: %s", ErrBucketNotFound, bucket)
|
||||||
|
}
|
||||||
return metadataBucket.Put([]byte(key), data)
|
return metadataBucket.Put([]byte(key), data)
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -44,16 +185,15 @@ func (h *MetadataHandler) PutManifest(manifest *models.ObjectManifest) error {
|
|||||||
func (h *MetadataHandler) GetManifest(bucket, key string) (*models.ObjectManifest, error) {
|
func (h *MetadataHandler) GetManifest(bucket, key string) (*models.ObjectManifest, error) {
|
||||||
var manifest *models.ObjectManifest
|
var manifest *models.ObjectManifest
|
||||||
|
|
||||||
h.db.View(func(tx *bbolt.Tx) error {
|
err := h.db.View(func(tx *bbolt.Tx) error {
|
||||||
metadataBucket := tx.Bucket([]byte(ManifestBucketName))
|
metadataBucket := tx.Bucket([]byte(bucket))
|
||||||
if metadataBucket == nil {
|
if metadataBucket == nil {
|
||||||
return fmt.Errorf("bucket %s not found", ManifestBucketName)
|
return fmt.Errorf("%w: %s", ErrBucketNotFound, bucket)
|
||||||
}
|
}
|
||||||
key := fmt.Sprintf("%s/%s", bucket, key)
|
|
||||||
data := metadataBucket.Get([]byte(key))
|
data := metadataBucket.Get([]byte(key))
|
||||||
if data == nil {
|
if data == nil {
|
||||||
|
|
||||||
return fmt.Errorf("manifest not found for bucket %s and key %s", bucket, key)
|
return fmt.Errorf("%w: %s/%s", ErrObjectNotFound, bucket, key)
|
||||||
}
|
}
|
||||||
err := json.Unmarshal(data, &manifest)
|
err := json.Unmarshal(data, &manifest)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -61,6 +201,67 @@ func (h *MetadataHandler) GetManifest(bucket, key string) (*models.ObjectManifes
|
|||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
return manifest, nil
|
return manifest, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (h *MetadataHandler) ListObjects(bucket, prefix string) ([]*models.ObjectManifest, error) {
|
||||||
|
|
||||||
|
var objects []*models.ObjectManifest
|
||||||
|
|
||||||
|
err := h.db.View(func(tx *bbolt.Tx) error {
|
||||||
|
systemIndexBucket := tx.Bucket([]byte(systemIndex))
|
||||||
|
if systemIndexBucket == nil {
|
||||||
|
return errors.New("system index not found")
|
||||||
|
}
|
||||||
|
if systemIndexBucket.Get([]byte(bucket)) == nil {
|
||||||
|
return fmt.Errorf("%w: %s", ErrBucketNotFound, bucket)
|
||||||
|
}
|
||||||
|
_bucket := tx.Bucket([]byte(bucket))
|
||||||
|
if _bucket == nil {
|
||||||
|
return fmt.Errorf("%w: %s", ErrBucketNotFound, bucket)
|
||||||
|
}
|
||||||
|
err := _bucket.ForEach(func(k, v []byte) error {
|
||||||
|
if prefix != "" && !strings.HasPrefix(string(k), prefix) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
object := models.ObjectManifest{}
|
||||||
|
err := json.Unmarshal(v, &object)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
objects = append(objects, &object)
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return objects, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *MetadataHandler) DeleteManifest(bucket, key string) error {
|
||||||
|
if _, err := h.GetManifest(bucket, key); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
err := h.db.Update(func(tx *bbolt.Tx) error {
|
||||||
|
metadataBucket := tx.Bucket([]byte(bucket))
|
||||||
|
if metadataBucket == nil {
|
||||||
|
return fmt.Errorf("%w: %s", ErrBucketNotFound, bucket)
|
||||||
|
}
|
||||||
|
return metadataBucket.Delete([]byte(key))
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
|
||||||
|
}
|
||||||
|
|||||||
@@ -1,5 +1,10 @@
|
|||||||
package models
|
package models
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/xml"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
type ObjectManifest struct {
|
type ObjectManifest struct {
|
||||||
Bucket string `json:"bucket"`
|
Bucket string `json:"bucket"`
|
||||||
Key string `json:"key"`
|
Key string `json:"key"`
|
||||||
@@ -9,3 +14,47 @@ type ObjectManifest struct {
|
|||||||
Chunks []string `json:"chunks"`
|
Chunks []string `json:"chunks"`
|
||||||
CreatedAt int64 `json:"created_at"`
|
CreatedAt int64 `json:"created_at"`
|
||||||
}
|
}
|
||||||
|
type BucketManifest struct {
|
||||||
|
Name string `json:"name"`
|
||||||
|
CreatedAt time.Time `json:"created_at"`
|
||||||
|
OwnerID string `json:"owner_id"`
|
||||||
|
OwnerDisplayName string `json:"owner_display_name"`
|
||||||
|
Region string `json:"region"`
|
||||||
|
VersioningStatus string `json:"versioning_status"`
|
||||||
|
PublicAccessBlock bool `json:"public_access_block"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type S3ErrorResponse struct {
|
||||||
|
XMLName xml.Name `xml:"Error"`
|
||||||
|
Code string `xml:"Code"`
|
||||||
|
Message string `xml:"Message"`
|
||||||
|
Resource string `xml:"Resource,omitempty"`
|
||||||
|
RequestID string `xml:"RequestId,omitempty"`
|
||||||
|
HostID string `xml:"HostId,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type ListBucketResult struct {
|
||||||
|
XMLName xml.Name `xml:"ListBucketResult"`
|
||||||
|
Xmlns string `xml:"xmlns,attr"`
|
||||||
|
|
||||||
|
Name string `xml:"Name"`
|
||||||
|
Prefix string `xml:"Prefix"`
|
||||||
|
KeyCount int `xml:"KeyCount"`
|
||||||
|
MaxKeys int `xml:"MaxKeys"`
|
||||||
|
IsTruncated bool `xml:"IsTruncated"`
|
||||||
|
|
||||||
|
Contents []Contents `xml:"Contents"`
|
||||||
|
CommonPrefixes []CommonPrefixes `xml:"CommonPrefixes,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type Contents struct {
|
||||||
|
Key string `xml:"Key"`
|
||||||
|
LastModified string `xml:"LastModified"`
|
||||||
|
ETag string `xml:"ETag"`
|
||||||
|
Size int64 `xml:"Size"`
|
||||||
|
StorageClass string `xml:"StorageClass"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type CommonPrefixes struct {
|
||||||
|
Prefix string `xml:"Prefix"`
|
||||||
|
}
|
||||||
|
|||||||
@@ -6,7 +6,6 @@ import (
|
|||||||
"fs/models"
|
"fs/models"
|
||||||
"fs/storage"
|
"fs/storage"
|
||||||
"io"
|
"io"
|
||||||
"strings"
|
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -18,10 +17,7 @@ func NewObjectService(metadataHandler *metadata.MetadataHandler) *ObjectService
|
|||||||
return &ObjectService{metadataHandler: metadataHandler}
|
return &ObjectService{metadataHandler: metadataHandler}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *ObjectService) PutObject(uri string, contentType string, input io.Reader) (*models.ObjectManifest, error) {
|
func (s *ObjectService) PutObject(bucket, key, contentType string, input io.Reader) (*models.ObjectManifest, error) {
|
||||||
|
|
||||||
bucket := strings.Split(uri, "/")[0]
|
|
||||||
key := strings.Join(strings.Split(uri, "/")[1:], "/")
|
|
||||||
|
|
||||||
chunks, size, etag, err := storage.IngestStream(input)
|
chunks, size, etag, err := storage.IngestStream(input)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -54,7 +50,12 @@ func (s *ObjectService) GetObject(bucket, key string) (io.ReadCloser, *models.Ob
|
|||||||
pr, pw := io.Pipe()
|
pr, pw := io.Pipe()
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
defer pw.Close()
|
defer func(pw *io.PipeWriter) {
|
||||||
|
err := pw.Close()
|
||||||
|
if err != nil {
|
||||||
|
|
||||||
|
}
|
||||||
|
}(pw)
|
||||||
|
|
||||||
err := storage.AssembleStream(manifest.Chunks, pw)
|
err := storage.AssembleStream(manifest.Chunks, pw)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -63,3 +64,36 @@ func (s *ObjectService) GetObject(bucket, key string) (io.ReadCloser, *models.Ob
|
|||||||
}()
|
}()
|
||||||
return pr, manifest, nil
|
return pr, manifest, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *ObjectService) HeadObject(bucket, key string) (models.ObjectManifest, error) {
|
||||||
|
manifest, err := s.metadataHandler.GetManifest(bucket, key)
|
||||||
|
if err != nil {
|
||||||
|
return models.ObjectManifest{}, err
|
||||||
|
}
|
||||||
|
return *manifest, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *ObjectService) DeleteObject(bucket, key string) error {
|
||||||
|
return s.metadataHandler.DeleteManifest(bucket, key)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *ObjectService) ListObjects(bucket, prefix string) ([]*models.ObjectManifest, error) {
|
||||||
|
return s.metadataHandler.ListObjects(bucket, prefix)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *ObjectService) CreateBucket(bucket string) error {
|
||||||
|
return s.metadataHandler.CreateBucket(bucket)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *ObjectService) HeadBucket(bucket string) error {
|
||||||
|
_, err := s.metadataHandler.GetBucketManifest(bucket)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *ObjectService) DeleteBucket(bucket string) error {
|
||||||
|
return s.metadataHandler.DeleteBucket(bucket)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *ObjectService) ListBuckets() ([]string, error) {
|
||||||
|
return s.metadataHandler.ListBuckets()
|
||||||
|
}
|
||||||
|
|||||||
@@ -4,6 +4,7 @@ import (
|
|||||||
"crypto/md5"
|
"crypto/md5"
|
||||||
"crypto/sha256"
|
"crypto/sha256"
|
||||||
"encoding/hex"
|
"encoding/hex"
|
||||||
|
"errors"
|
||||||
"io"
|
"io"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
@@ -21,7 +22,7 @@ func IngestStream(stream io.Reader) ([]string, int64, string, error) {
|
|||||||
|
|
||||||
for {
|
for {
|
||||||
bytesRead, err := io.ReadFull(stream, buffer)
|
bytesRead, err := io.ReadFull(stream, buffer)
|
||||||
if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
|
if err != nil && err != io.EOF && !errors.Is(err, io.ErrUnexpectedEOF) {
|
||||||
return nil, 0, "", err
|
return nil, 0, "", err
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -40,7 +41,7 @@ func IngestStream(stream io.Reader) ([]string, int64, string, error) {
|
|||||||
}
|
}
|
||||||
chunkIDs = append(chunkIDs, chunkID)
|
chunkIDs = append(chunkIDs, chunkID)
|
||||||
}
|
}
|
||||||
if err == io.EOF || err == io.ErrUnexpectedEOF {
|
if err == io.EOF || errors.Is(err, io.ErrUnexpectedEOF) {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|||||||
54
utils/utils.go
Normal file
54
utils/utils.go
Normal file
@@ -0,0 +1,54 @@
|
|||||||
|
package utils
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/xml"
|
||||||
|
"fs/models"
|
||||||
|
"sort"
|
||||||
|
"strings"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
func ConstructXMLResponseForObjectList(bucket string, objects []*models.ObjectManifest) (string, error) {
|
||||||
|
result := models.ListBucketResult{
|
||||||
|
Xmlns: "http://s3.amazonaws.com/doc/2006-03-01/",
|
||||||
|
Name: bucket,
|
||||||
|
Prefix: "",
|
||||||
|
KeyCount: len(objects),
|
||||||
|
MaxKeys: 1000,
|
||||||
|
IsTruncated: false,
|
||||||
|
}
|
||||||
|
|
||||||
|
prefixSet := make(map[string]struct{})
|
||||||
|
|
||||||
|
for _, object := range objects {
|
||||||
|
result.Contents = append(result.Contents, models.Contents{
|
||||||
|
Key: object.Key,
|
||||||
|
LastModified: time.Unix(object.CreatedAt, 0).UTC().Format("2006-01-02T15:04:05.000Z"),
|
||||||
|
ETag: "\"" + object.ETag + "\"",
|
||||||
|
Size: object.Size,
|
||||||
|
StorageClass: "STANDARD",
|
||||||
|
})
|
||||||
|
|
||||||
|
if strings.Contains(object.Key, "/") {
|
||||||
|
parts := strings.SplitN(object.Key, "/", 2)
|
||||||
|
prefixSet[parts[0]+"/"] = struct{}{}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
prefixes := make([]string, 0, len(prefixSet))
|
||||||
|
for prefix := range prefixSet {
|
||||||
|
prefixes = append(prefixes, prefix)
|
||||||
|
}
|
||||||
|
sort.Strings(prefixes)
|
||||||
|
|
||||||
|
for _, prefix := range prefixes {
|
||||||
|
result.CommonPrefixes = append(result.CommonPrefixes, models.CommonPrefixes{Prefix: prefix})
|
||||||
|
}
|
||||||
|
|
||||||
|
output, err := xml.MarshalIndent(result, "", " ")
|
||||||
|
if err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
|
||||||
|
return xml.Header + string(output), nil
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user