mirror of
https://github.com/ferdzo/fs.git
synced 2026-04-04 20:36:25 +00:00
Added Bucket routes and bucket logic
This commit is contained in:
127
api/api.go
127
api/api.go
@@ -3,10 +3,10 @@ package api
|
||||
import (
|
||||
"fmt"
|
||||
"fs/service"
|
||||
"fs/utils"
|
||||
"io"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/go-chi/chi/v5"
|
||||
@@ -31,10 +31,21 @@ func NewHandler(svc *service.ObjectService) *Handler {
|
||||
|
||||
func (h *Handler) setupRoutes() {
|
||||
h.router.Use(middleware.Logger)
|
||||
h.router.Get("/", h.handleWelcome)
|
||||
h.router.Get("/*", h.handleGetObject)
|
||||
h.router.Put("/*", h.handlePutObject)
|
||||
h.router.Head("/*", h.handleHeadObject)
|
||||
|
||||
h.router.Get("/", h.handleGetBuckets)
|
||||
|
||||
h.router.Get("/{bucket}/", h.handleGetBucket)
|
||||
h.router.Get("/{bucket}", h.handleGetBucket)
|
||||
h.router.Put("/{bucket}", h.handlePutBucket)
|
||||
h.router.Put("/{bucket}/", h.handlePutBucket)
|
||||
h.router.Delete("/{bucket}", h.handleDeleteBucket)
|
||||
h.router.Delete("/{bucket}/", h.handleDeleteBucket)
|
||||
h.router.Head("/{bucket}", h.handleHeadBucket)
|
||||
h.router.Head("/{bucket}/", h.handleHeadBucket)
|
||||
|
||||
h.router.Get("/{bucket}/*", h.handleGetObject)
|
||||
h.router.Put("/{bucket}/*", h.handlePutObject)
|
||||
h.router.Head("/{bucket}/*", h.handleHeadObject)
|
||||
}
|
||||
|
||||
func (h *Handler) handleWelcome(w http.ResponseWriter, r *http.Request) {
|
||||
@@ -46,9 +57,12 @@ func (h *Handler) handleWelcome(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
|
||||
func (h *Handler) handleGetObject(w http.ResponseWriter, r *http.Request) {
|
||||
urlParams := chi.URLParam(r, "*")
|
||||
bucket := strings.Split(urlParams, "/")[0]
|
||||
key := strings.Join(strings.Split(urlParams, "/")[1:], "/")
|
||||
bucket := chi.URLParam(r, "bucket")
|
||||
key := chi.URLParam(r, "*")
|
||||
if key == "" {
|
||||
http.Error(w, "object key is required", http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
stream, manifest, err := h.svc.GetObject(bucket, key)
|
||||
if err != nil {
|
||||
@@ -67,9 +81,12 @@ func (h *Handler) handleGetObject(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
|
||||
func (h *Handler) handlePutObject(w http.ResponseWriter, r *http.Request) {
|
||||
urlParams := chi.URLParam(r, "*")
|
||||
bucket := strings.Split(urlParams, "/")[0]
|
||||
key := strings.Join(strings.Split(urlParams, "/")[1:], "/")
|
||||
bucket := chi.URLParam(r, "bucket")
|
||||
key := chi.URLParam(r, "*")
|
||||
if key == "" {
|
||||
http.Error(w, "object key is required", http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
contentType := r.Header.Get("Content-Type")
|
||||
if contentType == "" {
|
||||
@@ -91,9 +108,12 @@ func (h *Handler) handlePutObject(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
|
||||
func (h *Handler) handleHeadObject(w http.ResponseWriter, r *http.Request) {
|
||||
urlParams := chi.URLParam(r, "*")
|
||||
bucket := strings.Split(urlParams, "/")[0]
|
||||
key := strings.Join(strings.Split(urlParams, "/")[1:], "/")
|
||||
bucket := chi.URLParam(r, "bucket")
|
||||
key := chi.URLParam(r, "*")
|
||||
if key == "" {
|
||||
http.Error(w, "object key is required", http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
manifest, err := h.svc.HeadObject(bucket, key)
|
||||
if err != nil {
|
||||
@@ -107,6 +127,85 @@ func (h *Handler) handleHeadObject(w http.ResponseWriter, r *http.Request) {
|
||||
w.WriteHeader(http.StatusOK)
|
||||
}
|
||||
|
||||
func (h *Handler) handlePutBucket(w http.ResponseWriter, r *http.Request) {
|
||||
bucket := chi.URLParam(r, "bucket")
|
||||
if h.svc.CreateBucket(bucket) != nil {
|
||||
http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
w.WriteHeader(http.StatusCreated)
|
||||
}
|
||||
|
||||
func (h *Handler) handleDeleteBucket(w http.ResponseWriter, r *http.Request) {
|
||||
bucket := chi.URLParam(r, "bucket")
|
||||
if h.svc.DeleteBucket(bucket) != nil {
|
||||
http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
w.WriteHeader(http.StatusAccepted)
|
||||
}
|
||||
|
||||
func (h *Handler) handleHeadBucket(w http.ResponseWriter, r *http.Request) {
|
||||
bucket := chi.URLParam(r, "bucket")
|
||||
if h.svc.HeadBucket(bucket) != nil {
|
||||
http.Error(w, http.StatusText(http.StatusNotFound), http.StatusNotFound)
|
||||
return
|
||||
}
|
||||
w.WriteHeader(http.StatusOK)
|
||||
}
|
||||
|
||||
func (h *Handler) handleGetBuckets(w http.ResponseWriter, r *http.Request) {
|
||||
buckets, err := h.svc.ListBuckets()
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
w.Header().Set("Content-Type", "application/x-yaml")
|
||||
w.Header().Set("Content-Length", "0")
|
||||
w.WriteHeader(http.StatusOK)
|
||||
for _, bucket := range buckets {
|
||||
w.Write([]byte(bucket))
|
||||
}
|
||||
}
|
||||
|
||||
func (h *Handler) handleGetBucket(w http.ResponseWriter, r *http.Request) {
|
||||
bucket := chi.URLParam(r, "bucket")
|
||||
|
||||
if r.URL.Query().Get("list-type") == "2" {
|
||||
h.handleListObjectsV2(w, r, bucket)
|
||||
return
|
||||
}
|
||||
|
||||
if r.URL.Query().Has("location") {
|
||||
return
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func (h *Handler) handleListObjectsV2(w http.ResponseWriter, r *http.Request, bucket string) {
|
||||
objects, err := h.svc.ListObjects(bucket, "")
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
xmlResponse, err := utils.ConstructXMLResponseForObjectList(bucket, objects)
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
w.Header().Set("Content-Type", "application/xml; charset=utf-8")
|
||||
w.Header().Set("Content-Length", strconv.Itoa(len(xmlResponse)))
|
||||
w.WriteHeader(http.StatusOK)
|
||||
_, err = w.Write([]byte(xmlResponse))
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func (h *Handler) Start(address string) error {
|
||||
fmt.Printf("Starting API server on %s\n", address)
|
||||
h.setupRoutes()
|
||||
|
||||
@@ -2,37 +2,162 @@ package metadata
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"fs/models"
|
||||
"regexp"
|
||||
"time"
|
||||
|
||||
"go.etcd.io/bbolt"
|
||||
)
|
||||
|
||||
const ManifestBucketName = "object_manifests"
|
||||
|
||||
type MetadataHandler struct {
|
||||
db *bbolt.DB
|
||||
}
|
||||
|
||||
var systemIndex = []byte("__SYSTEM_BUCKETS__")
|
||||
|
||||
var validBucketName = regexp.MustCompile(`^[a-z0-9.-]{3,63}$`)
|
||||
|
||||
func NewMetadataHandler(dbPath string) (*MetadataHandler, error) {
|
||||
db, err := bbolt.Open(dbPath, 0600, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &MetadataHandler{db: db}, nil
|
||||
h := &MetadataHandler{db: db}
|
||||
|
||||
err = h.db.Update(func(tx *bbolt.Tx) error {
|
||||
_, err := tx.CreateBucketIfNotExists(systemIndex)
|
||||
return err
|
||||
})
|
||||
if err != nil {
|
||||
_ = db.Close()
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return h, nil
|
||||
}
|
||||
|
||||
func (h *MetadataHandler) PutManifest(manifest *models.ObjectManifest) error {
|
||||
func (h *MetadataHandler) CreateBucket(bucketName string) error {
|
||||
if !validBucketName.MatchString(bucketName) {
|
||||
return fmt.Errorf("invalid bucket name: %s", bucketName)
|
||||
}
|
||||
|
||||
err := h.db.Update(func(tx *bbolt.Tx) error {
|
||||
metadataBucket, err := tx.CreateBucketIfNotExists([]byte(ManifestBucketName))
|
||||
indexBucket, err := tx.CreateBucketIfNotExists([]byte(systemIndex))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
key := fmt.Sprintf("%s/%s", manifest.Bucket, manifest.Key)
|
||||
if indexBucket.Get([]byte(bucketName)) != nil {
|
||||
return fmt.Errorf("bucket %s already exists", bucketName)
|
||||
}
|
||||
|
||||
_, err = tx.CreateBucketIfNotExists([]byte(bucketName))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
manifest := models.BucketManifest{
|
||||
Name: bucketName,
|
||||
CreatedAt: time.Now(),
|
||||
}
|
||||
data, _ := json.Marshal(manifest)
|
||||
|
||||
return indexBucket.Put([]byte(bucketName), data)
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (h *MetadataHandler) DeleteBucket(bucketName string) error {
|
||||
if !validBucketName.MatchString(bucketName) {
|
||||
return fmt.Errorf("invalid bucket name: %s", bucketName)
|
||||
}
|
||||
|
||||
err := h.db.Update(func(tx *bbolt.Tx) error {
|
||||
indexBucket, err := tx.CreateBucketIfNotExists([]byte(systemIndex))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if indexBucket.Get([]byte(bucketName)) == nil {
|
||||
return fmt.Errorf("bucket %s not found", bucketName)
|
||||
}
|
||||
if err := tx.DeleteBucket([]byte(bucketName)); err != nil && !errors.Is(err, bbolt.ErrBucketNotFound) {
|
||||
return fmt.Errorf("error deleting metadata bucket %s: %w", bucketName, err)
|
||||
}
|
||||
if err := indexBucket.Delete([]byte(bucketName)); err != nil {
|
||||
return fmt.Errorf("error deleting bucket %s from system index: %w", bucketName, err)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (h *MetadataHandler) ListBuckets() ([]string, error) {
|
||||
buckets := []string{}
|
||||
err := h.db.View(func(tx *bbolt.Tx) error {
|
||||
systemIndexBucket := tx.Bucket([]byte(systemIndex))
|
||||
if systemIndexBucket == nil {
|
||||
return errors.New("system index not found")
|
||||
}
|
||||
c := systemIndexBucket.Cursor()
|
||||
for k, _ := c.First(); k != nil; k, _ = c.Next() {
|
||||
buckets = append(buckets, string(k))
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return buckets, nil
|
||||
}
|
||||
|
||||
func (h *MetadataHandler) GetBucketManifest(bucketName string) (*models.BucketManifest, error) {
|
||||
var manifest *models.BucketManifest
|
||||
|
||||
err := h.db.View(func(tx *bbolt.Tx) error {
|
||||
systemIndexBucket := tx.Bucket([]byte(systemIndex))
|
||||
if systemIndexBucket == nil {
|
||||
return errors.New("system index not found")
|
||||
}
|
||||
data := systemIndexBucket.Get([]byte(bucketName))
|
||||
if data == nil {
|
||||
return fmt.Errorf("bucket manifest not found for bucket %s", bucketName)
|
||||
}
|
||||
err := json.Unmarshal(data, &manifest)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return manifest, nil
|
||||
}
|
||||
|
||||
func (h *MetadataHandler) PutManifest(manifest *models.ObjectManifest) error {
|
||||
bucket := manifest.Bucket
|
||||
key := manifest.Key
|
||||
|
||||
if _, err := h.GetBucketManifest(bucket); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err := h.db.Update(func(tx *bbolt.Tx) error {
|
||||
data, err := json.Marshal(manifest)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
metadataBucket := tx.Bucket([]byte(bucket))
|
||||
if metadataBucket == nil {
|
||||
return fmt.Errorf("metadata bucket %s not found; create it first", bucket)
|
||||
}
|
||||
return metadataBucket.Put([]byte(key), data)
|
||||
})
|
||||
if err != nil {
|
||||
@@ -45,11 +170,10 @@ func (h *MetadataHandler) GetManifest(bucket, key string) (*models.ObjectManifes
|
||||
var manifest *models.ObjectManifest
|
||||
|
||||
err := h.db.View(func(tx *bbolt.Tx) error {
|
||||
metadataBucket := tx.Bucket([]byte(ManifestBucketName))
|
||||
metadataBucket := tx.Bucket([]byte(bucket))
|
||||
if metadataBucket == nil {
|
||||
return fmt.Errorf("bucket %s not found", ManifestBucketName)
|
||||
return fmt.Errorf("bucket %s not found", bucket)
|
||||
}
|
||||
key := fmt.Sprintf("%s/%s", bucket, key)
|
||||
data := metadataBucket.Get([]byte(key))
|
||||
if data == nil {
|
||||
|
||||
@@ -67,3 +191,39 @@ func (h *MetadataHandler) GetManifest(bucket, key string) (*models.ObjectManifes
|
||||
|
||||
return manifest, nil
|
||||
}
|
||||
|
||||
func (h *MetadataHandler) ListObjects(bucket, prefix string) ([]*models.ObjectManifest, error) {
|
||||
|
||||
var objects []*models.ObjectManifest
|
||||
|
||||
err := h.db.View(func(tx *bbolt.Tx) error {
|
||||
systemIndexBucket := tx.Bucket([]byte(systemIndex))
|
||||
if systemIndexBucket == nil {
|
||||
return errors.New("system index not found")
|
||||
}
|
||||
if systemIndexBucket.Get([]byte(bucket)) == nil {
|
||||
return fmt.Errorf("bucket %s not found", bucket)
|
||||
}
|
||||
_bucket := tx.Bucket([]byte(bucket))
|
||||
if _bucket == nil {
|
||||
return fmt.Errorf("bucket %s not found", bucket)
|
||||
}
|
||||
err := _bucket.ForEach(func(k, v []byte) error {
|
||||
object := models.ObjectManifest{}
|
||||
err := json.Unmarshal(v, &object)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
objects = append(objects, &object)
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return objects, nil
|
||||
}
|
||||
|
||||
@@ -1,5 +1,10 @@
|
||||
package models
|
||||
|
||||
import (
|
||||
"encoding/xml"
|
||||
"time"
|
||||
)
|
||||
|
||||
type ObjectManifest struct {
|
||||
Bucket string `json:"bucket"`
|
||||
Key string `json:"key"`
|
||||
@@ -9,3 +14,38 @@ type ObjectManifest struct {
|
||||
Chunks []string `json:"chunks"`
|
||||
CreatedAt int64 `json:"created_at"`
|
||||
}
|
||||
type BucketManifest struct {
|
||||
Name string `json:"name"`
|
||||
CreatedAt time.Time `json:"created_at"`
|
||||
OwnerID string `json:"owner_id"`
|
||||
OwnerDisplayName string `json:"owner_display_name"`
|
||||
Region string `json:"region"`
|
||||
VersioningStatus string `json:"versioning_status"`
|
||||
PublicAccessBlock bool `json:"public_access_block"`
|
||||
}
|
||||
|
||||
type ListBucketResult struct {
|
||||
XMLName xml.Name `xml:"ListBucketResult"`
|
||||
Xmlns string `xml:"xmlns,attr"`
|
||||
|
||||
Name string `xml:"Name"`
|
||||
Prefix string `xml:"Prefix"`
|
||||
KeyCount int `xml:"KeyCount"`
|
||||
MaxKeys int `xml:"MaxKeys"`
|
||||
IsTruncated bool `xml:"IsTruncated"`
|
||||
|
||||
Contents []Contents `xml:"Contents"`
|
||||
CommonPrefixes []CommonPrefixes `xml:"CommonPrefixes,omitempty"`
|
||||
}
|
||||
|
||||
type Contents struct {
|
||||
Key string `xml:"Key"`
|
||||
LastModified string `xml:"LastModified"`
|
||||
ETag string `xml:"ETag"`
|
||||
Size int64 `xml:"Size"`
|
||||
StorageClass string `xml:"StorageClass"`
|
||||
}
|
||||
|
||||
type CommonPrefixes struct {
|
||||
Prefix string `xml:"Prefix"`
|
||||
}
|
||||
|
||||
@@ -72,3 +72,28 @@ func (s *ObjectService) HeadObject(bucket, key string) (models.ObjectManifest, e
|
||||
}
|
||||
return *manifest, nil
|
||||
}
|
||||
|
||||
func (s *ObjectService) DeleteObject(bucket, key string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *ObjectService) ListObjects(bucket, prefix string) ([]*models.ObjectManifest, error) {
|
||||
return s.metadataHandler.ListObjects(bucket, prefix)
|
||||
}
|
||||
|
||||
func (s *ObjectService) CreateBucket(bucket string) error {
|
||||
return s.metadataHandler.CreateBucket(bucket)
|
||||
}
|
||||
|
||||
func (s *ObjectService) HeadBucket(bucket string) error {
|
||||
_, err := s.metadataHandler.GetBucketManifest(bucket)
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *ObjectService) DeleteBucket(bucket string) error {
|
||||
return s.metadataHandler.DeleteBucket(bucket)
|
||||
}
|
||||
|
||||
func (s *ObjectService) ListBuckets() ([]string, error) {
|
||||
return s.metadataHandler.ListBuckets()
|
||||
}
|
||||
|
||||
54
utils/utils.go
Normal file
54
utils/utils.go
Normal file
@@ -0,0 +1,54 @@
|
||||
package utils
|
||||
|
||||
import (
|
||||
"encoding/xml"
|
||||
"fs/models"
|
||||
"sort"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
func ConstructXMLResponseForObjectList(bucket string, objects []*models.ObjectManifest) (string, error) {
|
||||
result := models.ListBucketResult{
|
||||
Xmlns: "http://s3.amazonaws.com/doc/2006-03-01/",
|
||||
Name: bucket,
|
||||
Prefix: "",
|
||||
KeyCount: len(objects),
|
||||
MaxKeys: 1000,
|
||||
IsTruncated: false,
|
||||
}
|
||||
|
||||
prefixSet := make(map[string]struct{})
|
||||
|
||||
for _, object := range objects {
|
||||
result.Contents = append(result.Contents, models.Contents{
|
||||
Key: object.Key,
|
||||
LastModified: time.Unix(object.CreatedAt, 0).UTC().Format("2006-01-02T15:04:05.000Z"),
|
||||
ETag: "\"" + object.ETag + "\"",
|
||||
Size: object.Size,
|
||||
StorageClass: "STANDARD",
|
||||
})
|
||||
|
||||
if strings.Contains(object.Key, "/") {
|
||||
parts := strings.SplitN(object.Key, "/", 2)
|
||||
prefixSet[parts[0]+"/"] = struct{}{}
|
||||
}
|
||||
}
|
||||
|
||||
prefixes := make([]string, 0, len(prefixSet))
|
||||
for prefix := range prefixSet {
|
||||
prefixes = append(prefixes, prefix)
|
||||
}
|
||||
sort.Strings(prefixes)
|
||||
|
||||
for _, prefix := range prefixes {
|
||||
result.CommonPrefixes = append(result.CommonPrefixes, models.CommonPrefixes{Prefix: prefix})
|
||||
}
|
||||
|
||||
output, err := xml.MarshalIndent(result, "", " ")
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
return xml.Header + string(output), nil
|
||||
}
|
||||
Reference in New Issue
Block a user