mirror of
https://github.com/ferdzo/fs.git
synced 2026-04-05 00:56:25 +00:00
Initial Multipart Upload
This commit is contained in:
46
api/api.go
46
api/api.go
@@ -1,9 +1,11 @@
|
||||
package api
|
||||
|
||||
import (
|
||||
"encoding/xml"
|
||||
"errors"
|
||||
"fmt"
|
||||
"fs/metadata"
|
||||
"fs/models"
|
||||
"fs/service"
|
||||
"fs/utils"
|
||||
"io"
|
||||
@@ -47,6 +49,7 @@ func (h *Handler) setupRoutes() {
|
||||
|
||||
h.router.Get("/{bucket}/*", h.handleGetObject)
|
||||
h.router.Put("/{bucket}/*", h.handlePutObject)
|
||||
h.router.Post("/{bucket}/*", h.handlePostObject)
|
||||
h.router.Head("/{bucket}/*", h.handleHeadObject)
|
||||
h.router.Delete("/{bucket}/*", h.handleDeleteObject)
|
||||
}
|
||||
@@ -68,10 +71,6 @@ func (h *Handler) handleGetObject(w http.ResponseWriter, r *http.Request) {
|
||||
return
|
||||
}
|
||||
|
||||
if r.URL.Query().Get("uploadId") != "" {
|
||||
|
||||
}
|
||||
|
||||
stream, manifest, err := h.svc.GetObject(bucket, key)
|
||||
if err != nil {
|
||||
writeMappedS3Error(w, r, err)
|
||||
@@ -88,6 +87,41 @@ func (h *Handler) handleGetObject(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
}
|
||||
|
||||
func (h *Handler) handlePostObject(w http.ResponseWriter, r *http.Request) {
|
||||
bucket := chi.URLParam(r, "bucket")
|
||||
key := chi.URLParam(r, "*")
|
||||
if key == "" {
|
||||
writeS3Error(w, r, s3ErrInvalidObjectKey, r.URL.Path)
|
||||
return
|
||||
}
|
||||
|
||||
if _, ok := r.URL.Query()["uploads"]; ok {
|
||||
upload, err := h.svc.CreateMultipartUpload(bucket, key)
|
||||
if err != nil {
|
||||
writeMappedS3Error(w, r, err)
|
||||
return
|
||||
}
|
||||
response := models.InitiateMultipartUploadResult{
|
||||
Xmlns: "http://s3.amazonaws.com/doc/2006-03-01/",
|
||||
Bucket: upload.Bucket,
|
||||
Key: upload.Key,
|
||||
UploadID: upload.UploadID,
|
||||
}
|
||||
payload, err := xml.MarshalIndent(response, "", " ")
|
||||
if err != nil {
|
||||
writeMappedS3Error(w, r, err)
|
||||
return
|
||||
}
|
||||
w.Header().Set("Content-Type", "application/xml; charset=utf-8")
|
||||
w.WriteHeader(http.StatusOK)
|
||||
_, _ = w.Write([]byte(xml.Header))
|
||||
_, _ = w.Write(payload)
|
||||
return
|
||||
}
|
||||
|
||||
writeS3Error(w, r, s3ErrNotImplemented, r.URL.Path)
|
||||
}
|
||||
|
||||
func (h *Handler) handlePutObject(w http.ResponseWriter, r *http.Request) {
|
||||
bucket := chi.URLParam(r, "bucket")
|
||||
key := chi.URLParam(r, "*")
|
||||
@@ -95,6 +129,10 @@ func (h *Handler) handlePutObject(w http.ResponseWriter, r *http.Request) {
|
||||
writeS3Error(w, r, s3ErrInvalidObjectKey, r.URL.Path)
|
||||
return
|
||||
}
|
||||
if r.URL.Query().Get("uploads") != "" {
|
||||
if r.URL.Query().Get("partNumber") != "" {
|
||||
}
|
||||
}
|
||||
|
||||
contentType := r.Header.Get("Content-Type")
|
||||
if contentType == "" {
|
||||
|
||||
1
go.mod
1
go.mod
@@ -4,6 +4,7 @@ go 1.25.7
|
||||
|
||||
require (
|
||||
github.com/go-chi/chi/v5 v5.2.5 // indirect
|
||||
github.com/google/uuid v1.6.0 // indirect
|
||||
github.com/klauspost/cpuid/v2 v2.3.0 // indirect
|
||||
github.com/klauspost/reedsolomon v1.13.2 // indirect
|
||||
go.etcd.io/bbolt v1.4.3 // indirect
|
||||
|
||||
2
go.sum
2
go.sum
@@ -1,5 +1,7 @@
|
||||
github.com/go-chi/chi/v5 v5.2.5 h1:Eg4myHZBjyvJmAFjFvWgrqDTXFyOzjj7YIm3L3mu6Ug=
|
||||
github.com/go-chi/chi/v5 v5.2.5/go.mod h1:X7Gx4mteadT3eDOMTsXzmI4/rwUpOwBHLpAfupzFJP0=
|
||||
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
|
||||
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
||||
github.com/klauspost/cpuid/v2 v2.3.0 h1:S4CRMLnYUhGeDFDqkGriYKdfoFlDnMtqTiI/sFzhA9Y=
|
||||
github.com/klauspost/cpuid/v2 v2.3.0/go.mod h1:hqwkgyIinND0mEev00jJYCxPNVRVXFQeu1XKlok6oO0=
|
||||
github.com/klauspost/reedsolomon v1.13.2 h1:9qtQy2tKEVpVB8Pfq87ZljHZb60/LbeTQ1OxV8EGzdE=
|
||||
|
||||
@@ -9,6 +9,8 @@ import (
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
|
||||
"go.etcd.io/bbolt"
|
||||
)
|
||||
|
||||
@@ -17,6 +19,7 @@ type MetadataHandler struct {
|
||||
}
|
||||
|
||||
var systemIndex = []byte("__SYSTEM_BUCKETS__")
|
||||
var multipartUploadIndex = []byte("__MULTIPART_UPLOADS__")
|
||||
|
||||
var validBucketName = regexp.MustCompile(`^[a-z0-9.-]{3,63}$`)
|
||||
|
||||
@@ -43,6 +46,14 @@ func NewMetadataHandler(dbPath string) (*MetadataHandler, error) {
|
||||
_ = db.Close()
|
||||
return nil, err
|
||||
}
|
||||
err = h.db.Update(func(tx *bbolt.Tx) error {
|
||||
_, err := tx.CreateBucketIfNotExists(multipartUploadIndex)
|
||||
return err
|
||||
})
|
||||
if err != nil {
|
||||
_ = db.Close()
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return h, nil
|
||||
}
|
||||
@@ -265,3 +276,52 @@ func (h *MetadataHandler) DeleteManifest(bucket, key string) error {
|
||||
return nil
|
||||
|
||||
}
|
||||
|
||||
func (h *MetadataHandler) CreateMultipartUpload(bucket, key string) (*models.MultipartUpload, error) {
|
||||
var upload *models.MultipartUpload
|
||||
|
||||
err := h.db.View(func(tx *bbolt.Tx) error {
|
||||
systemIndexBucket := tx.Bucket([]byte(systemIndex))
|
||||
if systemIndexBucket == nil {
|
||||
return errors.New("system index not found")
|
||||
}
|
||||
if systemIndexBucket.Get([]byte(bucket)) != nil {
|
||||
return nil
|
||||
}
|
||||
return ErrBucketNotFound
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
uploadId := uuid.New().String()
|
||||
createdAt := time.Now().UTC().Format(time.RFC3339)
|
||||
upload = &models.MultipartUpload{
|
||||
Bucket: bucket,
|
||||
Key: key,
|
||||
UploadID: uploadId,
|
||||
CreatedAt: createdAt,
|
||||
State: "pending",
|
||||
}
|
||||
|
||||
err = h.db.Update(func(tx *bbolt.Tx) error {
|
||||
multipartUploadBucket := tx.Bucket([]byte(multipartUploadIndex))
|
||||
if multipartUploadBucket == nil {
|
||||
return errors.New("multipart upload index not found")
|
||||
}
|
||||
payload, err := json.Marshal(upload)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = multipartUploadBucket.Put([]byte(uploadId), payload)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return upload, nil
|
||||
}
|
||||
|
||||
@@ -58,3 +58,19 @@ type Contents struct {
|
||||
type CommonPrefixes struct {
|
||||
Prefix string `xml:"Prefix"`
|
||||
}
|
||||
|
||||
type MultipartUpload struct {
|
||||
UploadID string `json:"upload_id" xml:"UploadId"`
|
||||
Bucket string `json:"bucket" xml:"Bucket"`
|
||||
Key string `json:"key" xml:"Key"`
|
||||
CreatedAt string `json:"created_at" xml:"CreatedAt"`
|
||||
State string `json:"state" xml:"State"`
|
||||
}
|
||||
|
||||
type InitiateMultipartUploadResult struct {
|
||||
XMLName xml.Name `xml:"InitiateMultipartUploadResult"`
|
||||
Xmlns string `xml:"xmlns,attr"`
|
||||
Bucket string `xml:"Bucket"`
|
||||
Key string `xml:"Key"`
|
||||
UploadID string `xml:"UploadId"`
|
||||
}
|
||||
|
||||
@@ -97,3 +97,11 @@ func (s *ObjectService) DeleteBucket(bucket string) error {
|
||||
func (s *ObjectService) ListBuckets() ([]string, error) {
|
||||
return s.metadataHandler.ListBuckets()
|
||||
}
|
||||
|
||||
func (s *ObjectService) CreateMultipartUpload(bucket, key string) (*models.MultipartUpload, error) {
|
||||
return s.metadataHandler.CreateMultipartUpload(bucket, key)
|
||||
}
|
||||
|
||||
func (s *ObjectService) PutMultipartObject(bucket, key, uploadId string, input io.Reader) (*models.MultipartUpload, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user