mirror of
https://github.com/ferdzo/fs.git
synced 2026-04-05 07:46:25 +00:00
Working simple PUT/GET API
This commit is contained in:
95
api/api.go
95
api/api.go
@@ -1 +1,96 @@
|
|||||||
package api
|
package api
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"fs/service"
|
||||||
|
"io"
|
||||||
|
"net/http"
|
||||||
|
"strconv"
|
||||||
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/go-chi/chi/v5"
|
||||||
|
"github.com/go-chi/chi/v5/middleware"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Handler struct {
|
||||||
|
router *chi.Mux
|
||||||
|
svc *service.ObjectService
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewHandler(svc *service.ObjectService) *Handler {
|
||||||
|
r := chi.NewRouter()
|
||||||
|
r.Use(middleware.Recoverer)
|
||||||
|
|
||||||
|
h := &Handler{
|
||||||
|
router: r,
|
||||||
|
svc: svc,
|
||||||
|
}
|
||||||
|
return h
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *Handler) setupRoutes() {
|
||||||
|
h.router.Use(middleware.Logger)
|
||||||
|
h.router.Get("/", h.handleWelcome)
|
||||||
|
h.router.Get("/*", h.handleGetObject)
|
||||||
|
h.router.Put("/*", h.handlePutObject)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *Handler) handleWelcome(w http.ResponseWriter, r *http.Request) {
|
||||||
|
w.WriteHeader(http.StatusOK)
|
||||||
|
_, err := w.Write([]byte("Welcome to the Object Storage API!"))
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *Handler) handleGetObject(w http.ResponseWriter, r *http.Request) {
|
||||||
|
urlParams := chi.URLParam(r, "*")
|
||||||
|
bucket := strings.Split(urlParams, "/")[0]
|
||||||
|
key := strings.Join(strings.Split(urlParams, "/")[1:], "/")
|
||||||
|
|
||||||
|
stream, manifest, err := h.svc.GetObject(bucket, key)
|
||||||
|
if err != nil {
|
||||||
|
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
w.Header().Set("Content-Type", manifest.ContentType)
|
||||||
|
w.Header().Set("Content-Length", strconv.FormatInt(manifest.Size, 10))
|
||||||
|
w.Header().Set("ETag", manifest.ETag)
|
||||||
|
w.Header().Set("Accept-Ranges", "bytes")
|
||||||
|
w.Header().Set("Last-Modified", time.Unix(manifest.CreatedAt, 0).UTC().Format(time.RFC1123))
|
||||||
|
w.WriteHeader(http.StatusOK)
|
||||||
|
_, err = io.Copy(w, stream)
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *Handler) handlePutObject(w http.ResponseWriter, r *http.Request) {
|
||||||
|
urlParams := chi.URLParam(r, "*")
|
||||||
|
bucket := strings.Split(urlParams, "/")[0]
|
||||||
|
key := strings.Join(strings.Split(urlParams, "/")[1:], "/")
|
||||||
|
|
||||||
|
contentType := r.Header.Get("Content-Type")
|
||||||
|
if contentType == "" {
|
||||||
|
contentType = "application/octet-stream"
|
||||||
|
}
|
||||||
|
|
||||||
|
manifest, err := h.svc.PutObject(bucket, key, contentType, r.Body)
|
||||||
|
defer r.Body.Close()
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
w.Header().Set("ETag", manifest.ETag)
|
||||||
|
w.Header().Set("Content-Length", "0")
|
||||||
|
|
||||||
|
w.WriteHeader(http.StatusOK)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *Handler) Start(address string) error {
|
||||||
|
fmt.Printf("Starting API server on %s\n", address)
|
||||||
|
h.setupRoutes()
|
||||||
|
return http.ListenAndServe(address, h.router)
|
||||||
|
}
|
||||||
|
|||||||
1
go.mod
1
go.mod
@@ -3,6 +3,7 @@ module fs
|
|||||||
go 1.25.7
|
go 1.25.7
|
||||||
|
|
||||||
require (
|
require (
|
||||||
|
github.com/go-chi/chi/v5 v5.2.5 // indirect
|
||||||
github.com/klauspost/cpuid/v2 v2.3.0 // indirect
|
github.com/klauspost/cpuid/v2 v2.3.0 // indirect
|
||||||
github.com/klauspost/reedsolomon v1.13.2 // indirect
|
github.com/klauspost/reedsolomon v1.13.2 // indirect
|
||||||
go.etcd.io/bbolt v1.4.3 // indirect
|
go.etcd.io/bbolt v1.4.3 // indirect
|
||||||
|
|||||||
2
go.sum
2
go.sum
@@ -1,3 +1,5 @@
|
|||||||
|
github.com/go-chi/chi/v5 v5.2.5 h1:Eg4myHZBjyvJmAFjFvWgrqDTXFyOzjj7YIm3L3mu6Ug=
|
||||||
|
github.com/go-chi/chi/v5 v5.2.5/go.mod h1:X7Gx4mteadT3eDOMTsXzmI4/rwUpOwBHLpAfupzFJP0=
|
||||||
github.com/klauspost/cpuid/v2 v2.3.0 h1:S4CRMLnYUhGeDFDqkGriYKdfoFlDnMtqTiI/sFzhA9Y=
|
github.com/klauspost/cpuid/v2 v2.3.0 h1:S4CRMLnYUhGeDFDqkGriYKdfoFlDnMtqTiI/sFzhA9Y=
|
||||||
github.com/klauspost/cpuid/v2 v2.3.0/go.mod h1:hqwkgyIinND0mEev00jJYCxPNVRVXFQeu1XKlok6oO0=
|
github.com/klauspost/cpuid/v2 v2.3.0/go.mod h1:hqwkgyIinND0mEev00jJYCxPNVRVXFQeu1XKlok6oO0=
|
||||||
github.com/klauspost/reedsolomon v1.13.2 h1:9qtQy2tKEVpVB8Pfq87ZljHZb60/LbeTQ1OxV8EGzdE=
|
github.com/klauspost/reedsolomon v1.13.2 h1:9qtQy2tKEVpVB8Pfq87ZljHZb60/LbeTQ1OxV8EGzdE=
|
||||||
|
|||||||
36
main.go
36
main.go
@@ -2,20 +2,12 @@ package main
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"fs/api"
|
||||||
"fs/metadata"
|
"fs/metadata"
|
||||||
"fs/service"
|
"fs/service"
|
||||||
"io"
|
|
||||||
"os"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
fmt.Println("Hello, World!")
|
|
||||||
imageStream, err := os.Open("fer.jpg")
|
|
||||||
if err != nil {
|
|
||||||
fmt.Printf("Error opening image stream: %v\n", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
defer imageStream.Close()
|
|
||||||
|
|
||||||
metadataHandler, err := metadata.NewMetadataHandler("metadata.db")
|
metadataHandler, err := metadata.NewMetadataHandler("metadata.db")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -24,31 +16,9 @@ func main() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
objectService := service.NewObjectService(metadataHandler)
|
objectService := service.NewObjectService(metadataHandler)
|
||||||
|
handler := api.NewHandler(objectService)
|
||||||
manifest, err := objectService.PutObject("test-bucket-ferdzo/fer.jpg", "image/jpeg", imageStream)
|
err = handler.Start("localhost:3000")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Printf("Error ingesting stream: %v\n", err)
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
fmt.Printf("Manifest: %+v\n", manifest)
|
|
||||||
|
|
||||||
objectData, manifest2, err := objectService.GetObject("test-bucket-ferdzo", "fer.jpg")
|
|
||||||
if err != nil {
|
|
||||||
fmt.Printf("Error retrieving object: %v\n", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
fmt.Printf("Retrieved manifest: %+v\n", manifest2)
|
|
||||||
recoveredFile, err := os.Create("recovered_" + manifest2.Key)
|
|
||||||
if err != nil {
|
|
||||||
fmt.Printf("Error creating file: %v\n", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
defer recoveredFile.Close()
|
|
||||||
|
|
||||||
bytesWritten, err := io.Copy(recoveredFile, objectData)
|
|
||||||
if err != nil {
|
|
||||||
fmt.Printf("Error streaming to recovered file: %v\n", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
fmt.Printf("Successfully streamed %d bytes to disk!\n", bytesWritten)
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -44,7 +44,7 @@ func (h *MetadataHandler) PutManifest(manifest *models.ObjectManifest) error {
|
|||||||
func (h *MetadataHandler) GetManifest(bucket, key string) (*models.ObjectManifest, error) {
|
func (h *MetadataHandler) GetManifest(bucket, key string) (*models.ObjectManifest, error) {
|
||||||
var manifest *models.ObjectManifest
|
var manifest *models.ObjectManifest
|
||||||
|
|
||||||
h.db.View(func(tx *bbolt.Tx) error {
|
err := h.db.View(func(tx *bbolt.Tx) error {
|
||||||
metadataBucket := tx.Bucket([]byte(ManifestBucketName))
|
metadataBucket := tx.Bucket([]byte(ManifestBucketName))
|
||||||
if metadataBucket == nil {
|
if metadataBucket == nil {
|
||||||
return fmt.Errorf("bucket %s not found", ManifestBucketName)
|
return fmt.Errorf("bucket %s not found", ManifestBucketName)
|
||||||
@@ -61,6 +61,9 @@ func (h *MetadataHandler) GetManifest(bucket, key string) (*models.ObjectManifes
|
|||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
return manifest, nil
|
return manifest, nil
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -6,7 +6,6 @@ import (
|
|||||||
"fs/models"
|
"fs/models"
|
||||||
"fs/storage"
|
"fs/storage"
|
||||||
"io"
|
"io"
|
||||||
"strings"
|
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -18,10 +17,7 @@ func NewObjectService(metadataHandler *metadata.MetadataHandler) *ObjectService
|
|||||||
return &ObjectService{metadataHandler: metadataHandler}
|
return &ObjectService{metadataHandler: metadataHandler}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *ObjectService) PutObject(uri string, contentType string, input io.Reader) (*models.ObjectManifest, error) {
|
func (s *ObjectService) PutObject(bucket, key, contentType string, input io.Reader) (*models.ObjectManifest, error) {
|
||||||
|
|
||||||
bucket := strings.Split(uri, "/")[0]
|
|
||||||
key := strings.Join(strings.Split(uri, "/")[1:], "/")
|
|
||||||
|
|
||||||
chunks, size, etag, err := storage.IngestStream(input)
|
chunks, size, etag, err := storage.IngestStream(input)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -54,7 +50,12 @@ func (s *ObjectService) GetObject(bucket, key string) (io.ReadCloser, *models.Ob
|
|||||||
pr, pw := io.Pipe()
|
pr, pw := io.Pipe()
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
defer pw.Close()
|
defer func(pw *io.PipeWriter) {
|
||||||
|
err := pw.Close()
|
||||||
|
if err != nil {
|
||||||
|
|
||||||
|
}
|
||||||
|
}(pw)
|
||||||
|
|
||||||
err := storage.AssembleStream(manifest.Chunks, pw)
|
err := storage.AssembleStream(manifest.Chunks, pw)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|||||||
@@ -4,6 +4,7 @@ import (
|
|||||||
"crypto/md5"
|
"crypto/md5"
|
||||||
"crypto/sha256"
|
"crypto/sha256"
|
||||||
"encoding/hex"
|
"encoding/hex"
|
||||||
|
"errors"
|
||||||
"io"
|
"io"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
@@ -21,7 +22,7 @@ func IngestStream(stream io.Reader) ([]string, int64, string, error) {
|
|||||||
|
|
||||||
for {
|
for {
|
||||||
bytesRead, err := io.ReadFull(stream, buffer)
|
bytesRead, err := io.ReadFull(stream, buffer)
|
||||||
if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
|
if err != nil && err != io.EOF && !errors.Is(err, io.ErrUnexpectedEOF) {
|
||||||
return nil, 0, "", err
|
return nil, 0, "", err
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -40,7 +41,7 @@ func IngestStream(stream io.Reader) ([]string, int64, string, error) {
|
|||||||
}
|
}
|
||||||
chunkIDs = append(chunkIDs, chunkID)
|
chunkIDs = append(chunkIDs, chunkID)
|
||||||
}
|
}
|
||||||
if err == io.EOF || err == io.ErrUnexpectedEOF {
|
if err == io.EOF || errors.Is(err, io.ErrUnexpectedEOF) {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -48,7 +49,7 @@ func IngestStream(stream io.Reader) ([]string, int64, string, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
etag := hex.EncodeToString(fullFileHasher.Sum(nil))
|
etag := hex.EncodeToString(fullFileHasher.Sum(nil))
|
||||||
return chunkIDs, totalSize, etag, nil
|
return chunkIDs, totalSize, etag, nil
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user