mirror of
https://github.com/ferdzo/fs.git
synced 2026-04-05 14:06:25 +00:00
Compare commits
27 Commits
master
...
79819ad2d0
| Author | SHA1 | Date | |
|---|---|---|---|
| 79819ad2d0 | |||
| abe1f453fc | |||
| a9fbc06dd0 | |||
| edfb5f5b2a | |||
| c997fe8471 | |||
| fca553028c | |||
| 3630aad584 | |||
| 93296ff74e | |||
| 1b7393a545 | |||
|
|
a3fad34272 | ||
| 06c90be50f | |||
| 5e87247087 | |||
| a4990dae01 | |||
| d9a1bd9001 | |||
| a8204de914 | |||
| d7bdb3177b | |||
| c989037160 | |||
| 5d41ec9e0a | |||
| 111ce5b669 | |||
| 5438a7f4b4 | |||
| 9b5035dfa0 | |||
| 65a7a7eef8 | |||
| eb798be550 | |||
| b19c24d9b7 | |||
| 6fe5a8a629 | |||
| 151c11a636 | |||
| f151f8055a |
4
.dockerignore
Normal file
4
.dockerignore
Normal file
@@ -0,0 +1,4 @@
|
||||
*.md
|
||||
.gocache/
|
||||
blobs/
|
||||
data/
|
||||
9
.env.example
Normal file
9
.env.example
Normal file
@@ -0,0 +1,9 @@
|
||||
LOG_LEVEL=debug
|
||||
LOG_FORMAT=text
|
||||
DATA_PATH=data/
|
||||
PORT=2600
|
||||
AUDIT_LOG=true
|
||||
ADDRESS=0.0.0.0
|
||||
GC_INTERVAL=10
|
||||
GC_ENABLED=true
|
||||
MULTIPART_RETENTION_HOURS=24
|
||||
5
.gitignore
vendored
5
.gitignore
vendored
@@ -1,5 +1,8 @@
|
||||
.env
|
||||
*.db
|
||||
.vscode/
|
||||
blobs/
|
||||
*.db
|
||||
data/
|
||||
.idea/
|
||||
.gocache/
|
||||
.gomodcache/
|
||||
|
||||
18
Dockerfile
Normal file
18
Dockerfile
Normal file
@@ -0,0 +1,18 @@
|
||||
FROM golang:1.25-alpine AS build
|
||||
|
||||
WORKDIR /app
|
||||
|
||||
COPY go.mod go.sum ./
|
||||
RUN go mod download
|
||||
|
||||
COPY . .
|
||||
RUN CGO_ENABLED=0 GOOS=linux go build -o /app/fs .
|
||||
|
||||
FROM alpine:3.23 AS runner
|
||||
|
||||
COPY --from=build /app/fs /app/fs
|
||||
|
||||
WORKDIR /app
|
||||
EXPOSE 2600
|
||||
HEALTHCHECK --interval=30s --timeout=3s --start-period=10s --retries=3 CMD wget -q -O /dev/null "http://127.0.0.1:${PORT:-2600}/healthz" || exit 1
|
||||
CMD ["/app/fs"]
|
||||
8
LICENSE.md
Normal file
8
LICENSE.md
Normal file
@@ -0,0 +1,8 @@
|
||||
Copyright 2025 ferdzo
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the “Software”), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
||||
|
||||
44
README.md
44
README.md
@@ -1,3 +1,45 @@
|
||||
# fs
|
||||
|
||||
An experimental Object Storage written in Go that should be compatible with S3
|
||||
An experimental Object Storage written in Go that should be partially compatible with S3
|
||||
|
||||
## Features
|
||||
|
||||
Bucket operations:
|
||||
- `PUT /{bucket}`
|
||||
- `HEAD /{bucket}`
|
||||
- `DELETE /{bucket}`
|
||||
- `GET /` (list buckets)
|
||||
|
||||
Object operations:
|
||||
- `PUT /{bucket}/{key}`
|
||||
- `GET /{bucket}/{key}`
|
||||
- `HEAD /{bucket}/{key}`
|
||||
- `DELETE /{bucket}/{key}`
|
||||
- `GET /{bucket}?list-type=2&prefix=...` (ListObjectsV2-style)
|
||||
|
||||
Multipart upload:
|
||||
- `POST /{bucket}/{key}?uploads` (initiate)
|
||||
- `PUT /{bucket}/{key}?uploadId=...&partNumber=N` (upload part)
|
||||
- `GET /{bucket}/{key}?uploadId=...` (list parts)
|
||||
- `POST /{bucket}/{key}?uploadId=...` (complete)
|
||||
- `DELETE /{bucket}/{key}?uploadId=...` (abort)
|
||||
|
||||
Multi-object delete:
|
||||
- `POST /{bucket}?delete` with S3-style XML body
|
||||
|
||||
AWS SigV4 streaming payload decoding for uploads (`aws-chunked` request bodies)
|
||||
|
||||
Health:
|
||||
- `GET /healthz`
|
||||
- `HEAD /healthz`
|
||||
|
||||
## Limitations
|
||||
|
||||
- No authentication/authorization yet.
|
||||
- Not full S3 API coverage.
|
||||
- No versioning or lifecycle policies.
|
||||
- Error and edge-case behavior is still being refined for client compatibility.
|
||||
|
||||
## License
|
||||
|
||||
MIT License
|
||||
|
||||
980
api/api.go
980
api/api.go
@@ -1 +1,981 @@
|
||||
package api
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"context"
|
||||
"encoding/base64"
|
||||
"encoding/xml"
|
||||
"errors"
|
||||
"fmt"
|
||||
"fs/logging"
|
||||
"fs/metadata"
|
||||
"fs/models"
|
||||
"fs/service"
|
||||
"io"
|
||||
"log/slog"
|
||||
"net"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/go-chi/chi/v5"
|
||||
"github.com/go-chi/chi/v5/middleware"
|
||||
)
|
||||
|
||||
type Handler struct {
|
||||
router *chi.Mux
|
||||
svc *service.ObjectService
|
||||
logger *slog.Logger
|
||||
logConfig logging.Config
|
||||
}
|
||||
|
||||
const (
|
||||
maxXMLBodyBytes int64 = 1 << 20
|
||||
maxDeleteObjects = 1000
|
||||
maxObjectKeyBytes = 1024
|
||||
serverReadHeaderTimeout = 5 * time.Second
|
||||
serverReadTimeout = 60 * time.Second
|
||||
serverWriteTimeout = 120 * time.Second
|
||||
serverIdleTimeout = 120 * time.Second
|
||||
serverMaxHeaderBytes = 1 << 20
|
||||
serverMaxConnections = 1024
|
||||
)
|
||||
|
||||
func NewHandler(svc *service.ObjectService, logger *slog.Logger, logConfig logging.Config) *Handler {
|
||||
r := chi.NewRouter()
|
||||
r.Use(middleware.RequestID)
|
||||
r.Use(middleware.Recoverer)
|
||||
if logger == nil {
|
||||
logger = slog.Default()
|
||||
}
|
||||
|
||||
h := &Handler{
|
||||
router: r,
|
||||
svc: svc,
|
||||
logger: logger,
|
||||
logConfig: logConfig,
|
||||
}
|
||||
return h
|
||||
}
|
||||
|
||||
func (h *Handler) setupRoutes() {
|
||||
h.router.Use(logging.HTTPMiddleware(h.logger, h.logConfig))
|
||||
|
||||
h.router.Get("/healthz", h.handleHealth)
|
||||
h.router.Head("/healthz", h.handleHealth)
|
||||
h.router.Get("/", h.handleGetBuckets)
|
||||
|
||||
h.router.Get("/{bucket}/", h.handleGetBucket)
|
||||
h.router.Get("/{bucket}", h.handleGetBucket)
|
||||
h.router.Put("/{bucket}", h.handlePutBucket)
|
||||
h.router.Put("/{bucket}/", h.handlePutBucket)
|
||||
h.router.Post("/{bucket}", h.handlePostBucket)
|
||||
h.router.Post("/{bucket}/", h.handlePostBucket)
|
||||
h.router.Delete("/{bucket}", h.handleDeleteBucket)
|
||||
h.router.Delete("/{bucket}/", h.handleDeleteBucket)
|
||||
h.router.Head("/{bucket}", h.handleHeadBucket)
|
||||
h.router.Head("/{bucket}/", h.handleHeadBucket)
|
||||
|
||||
h.router.Get("/{bucket}/*", h.handleGetObject)
|
||||
h.router.Put("/{bucket}/*", h.handlePutObject)
|
||||
h.router.Post("/{bucket}/*", h.handlePostObject)
|
||||
h.router.Head("/{bucket}/*", h.handleHeadObject)
|
||||
h.router.Delete("/{bucket}/*", h.handleDeleteObject)
|
||||
}
|
||||
|
||||
func (h *Handler) handleHealth(w http.ResponseWriter, r *http.Request) {
|
||||
if _, err := h.svc.ListBuckets(); err != nil {
|
||||
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
|
||||
w.WriteHeader(http.StatusServiceUnavailable)
|
||||
if r.Method != http.MethodHead {
|
||||
_, _ = w.Write([]byte("unhealthy"))
|
||||
}
|
||||
return
|
||||
}
|
||||
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
|
||||
w.WriteHeader(http.StatusOK)
|
||||
if r.Method != http.MethodHead {
|
||||
_, _ = w.Write([]byte("ok"))
|
||||
}
|
||||
}
|
||||
|
||||
func validateObjectKey(key string) *s3APIError {
|
||||
if key == "" {
|
||||
err := s3ErrInvalidObjectKey
|
||||
return &err
|
||||
}
|
||||
if len(key) > maxObjectKeyBytes {
|
||||
err := s3ErrKeyTooLong
|
||||
return &err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (h *Handler) handleGetObject(w http.ResponseWriter, r *http.Request) {
|
||||
bucket := chi.URLParam(r, "bucket")
|
||||
key := chi.URLParam(r, "*")
|
||||
|
||||
if apiErr := validateObjectKey(key); apiErr != nil {
|
||||
writeS3Error(w, r, *apiErr, r.URL.Path)
|
||||
return
|
||||
}
|
||||
|
||||
if uploadID := r.URL.Query().Get("uploadId"); uploadID != "" {
|
||||
h.handleListMultipartParts(w, r, bucket, key, uploadID)
|
||||
return
|
||||
}
|
||||
|
||||
stream, manifest, err := h.svc.GetObject(bucket, key)
|
||||
if err != nil {
|
||||
writeMappedS3Error(w, r, err)
|
||||
return
|
||||
}
|
||||
defer stream.Close()
|
||||
|
||||
rangeHeader := strings.TrimSpace(r.Header.Get("Range"))
|
||||
if rangeHeader != "" {
|
||||
start, end, err := parseSingleByteRange(rangeHeader, manifest.Size)
|
||||
if err != nil {
|
||||
w.Header().Set("Content-Range", fmt.Sprintf("bytes */%d", manifest.Size))
|
||||
writeS3Error(w, r, s3ErrInvalidRange, r.URL.Path)
|
||||
return
|
||||
}
|
||||
|
||||
if start > 0 {
|
||||
if _, err := io.CopyN(io.Discard, stream, start); err != nil {
|
||||
writeMappedS3Error(w, r, err)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
length := end - start + 1
|
||||
w.Header().Set("Content-Type", manifest.ContentType)
|
||||
w.Header().Set("Content-Length", strconv.FormatInt(length, 10))
|
||||
w.Header().Set("Content-Range", fmt.Sprintf("bytes %d-%d/%d", start, end, manifest.Size))
|
||||
w.Header().Set("ETag", `"`+manifest.ETag+`"`)
|
||||
w.Header().Set("Last-Modified", time.Unix(manifest.CreatedAt, 0).UTC().Format(http.TimeFormat))
|
||||
w.Header().Set("Accept-Ranges", "bytes")
|
||||
w.WriteHeader(http.StatusPartialContent)
|
||||
_, _ = io.CopyN(w, stream, length)
|
||||
return
|
||||
}
|
||||
|
||||
w.Header().Set("Content-Type", manifest.ContentType)
|
||||
w.Header().Set("Content-Length", strconv.FormatInt(manifest.Size, 10))
|
||||
w.Header().Set("ETag", `"`+manifest.ETag+`"`)
|
||||
w.Header().Set("Last-Modified", time.Unix(manifest.CreatedAt, 0).UTC().Format(http.TimeFormat))
|
||||
w.Header().Set("Accept-Ranges", "bytes")
|
||||
w.WriteHeader(http.StatusOK)
|
||||
_, err = io.Copy(w, stream)
|
||||
|
||||
}
|
||||
|
||||
func (h *Handler) handlePostObject(w http.ResponseWriter, r *http.Request) {
|
||||
bucket := chi.URLParam(r, "bucket")
|
||||
key := chi.URLParam(r, "*")
|
||||
if apiErr := validateObjectKey(key); apiErr != nil {
|
||||
writeS3Error(w, r, *apiErr, r.URL.Path)
|
||||
return
|
||||
}
|
||||
defer r.Body.Close()
|
||||
|
||||
if _, ok := r.URL.Query()["uploads"]; ok {
|
||||
upload, err := h.svc.CreateMultipartUpload(bucket, key)
|
||||
if err != nil {
|
||||
writeMappedS3Error(w, r, err)
|
||||
return
|
||||
}
|
||||
response := models.InitiateMultipartUploadResult{
|
||||
Xmlns: "http://s3.amazonaws.com/doc/2006-03-01/",
|
||||
Bucket: upload.Bucket,
|
||||
Key: upload.Key,
|
||||
UploadID: upload.UploadID,
|
||||
}
|
||||
payload, err := xml.MarshalIndent(response, "", " ")
|
||||
if err != nil {
|
||||
writeMappedS3Error(w, r, err)
|
||||
return
|
||||
}
|
||||
w.Header().Set("Content-Type", "application/xml; charset=utf-8")
|
||||
w.WriteHeader(http.StatusOK)
|
||||
_, _ = w.Write([]byte(xml.Header))
|
||||
_, _ = w.Write(payload)
|
||||
return
|
||||
}
|
||||
|
||||
if uploadID := r.URL.Query().Get("uploadId"); uploadID != "" {
|
||||
r.Body = http.MaxBytesReader(w, r.Body, maxXMLBodyBytes)
|
||||
var req models.CompleteMultipartUploadRequest
|
||||
if err := xml.NewDecoder(r.Body).Decode(&req); err != nil {
|
||||
var maxErr *http.MaxBytesError
|
||||
if errors.As(err, &maxErr) {
|
||||
writeS3Error(w, r, s3ErrEntityTooLarge, r.URL.Path)
|
||||
return
|
||||
}
|
||||
writeS3Error(w, r, s3ErrMalformedXML, r.URL.Path)
|
||||
return
|
||||
}
|
||||
|
||||
manifest, err := h.svc.CompleteMultipartUpload(bucket, key, uploadID, req.Parts)
|
||||
if err != nil {
|
||||
writeMappedS3Error(w, r, err)
|
||||
return
|
||||
}
|
||||
|
||||
response := models.CompleteMultipartUploadResult{
|
||||
Xmlns: "http://s3.amazonaws.com/doc/2006-03-01/",
|
||||
Bucket: bucket,
|
||||
Key: key,
|
||||
ETag: `"` + manifest.ETag + `"`,
|
||||
Location: r.URL.Path,
|
||||
}
|
||||
payload, err := xml.MarshalIndent(response, "", " ")
|
||||
if err != nil {
|
||||
writeMappedS3Error(w, r, err)
|
||||
return
|
||||
}
|
||||
|
||||
w.Header().Set("Content-Type", "application/xml; charset=utf-8")
|
||||
w.WriteHeader(http.StatusOK)
|
||||
_, _ = w.Write([]byte(xml.Header))
|
||||
_, _ = w.Write(payload)
|
||||
return
|
||||
}
|
||||
|
||||
writeS3Error(w, r, s3ErrNotImplemented, r.URL.Path)
|
||||
}
|
||||
|
||||
func (h *Handler) handlePutObject(w http.ResponseWriter, r *http.Request) {
|
||||
bucket := chi.URLParam(r, "bucket")
|
||||
key := chi.URLParam(r, "*")
|
||||
if apiErr := validateObjectKey(key); apiErr != nil {
|
||||
writeS3Error(w, r, *apiErr, r.URL.Path)
|
||||
return
|
||||
}
|
||||
defer r.Body.Close()
|
||||
|
||||
uploadID := r.URL.Query().Get("uploadId")
|
||||
partNumberRaw := r.URL.Query().Get("partNumber")
|
||||
if uploadID != "" || partNumberRaw != "" {
|
||||
if uploadID == "" || partNumberRaw == "" {
|
||||
writeS3Error(w, r, s3ErrInvalidPart, r.URL.Path)
|
||||
return
|
||||
}
|
||||
|
||||
partNumber, err := strconv.Atoi(partNumberRaw)
|
||||
if err != nil {
|
||||
writeS3Error(w, r, s3ErrInvalidPart, r.URL.Path)
|
||||
return
|
||||
}
|
||||
if partNumber < 1 || partNumber > 10000 {
|
||||
writeS3Error(w, r, s3ErrInvalidPart, r.URL.Path)
|
||||
return
|
||||
}
|
||||
|
||||
bodyReader := io.Reader(r.Body)
|
||||
var decodeStream io.ReadCloser
|
||||
if shouldDecodeAWSChunkedPayload(r) {
|
||||
decodeStream = newAWSChunkedDecodingReader(r.Body)
|
||||
defer decodeStream.Close()
|
||||
bodyReader = decodeStream
|
||||
}
|
||||
|
||||
etag, err := h.svc.UploadPart(bucket, key, uploadID, partNumber, bodyReader)
|
||||
if err != nil {
|
||||
writeMappedS3Error(w, r, err)
|
||||
return
|
||||
}
|
||||
w.Header().Set("ETag", `"`+etag+`"`)
|
||||
w.Header().Set("Content-Length", "0")
|
||||
w.WriteHeader(http.StatusOK)
|
||||
return
|
||||
}
|
||||
|
||||
contentType := r.Header.Get("Content-Type")
|
||||
if contentType == "" {
|
||||
contentType = "application/octet-stream"
|
||||
}
|
||||
|
||||
bodyReader := io.Reader(r.Body)
|
||||
var decodeStream io.ReadCloser
|
||||
if shouldDecodeAWSChunkedPayload(r) {
|
||||
decodeStream = newAWSChunkedDecodingReader(r.Body)
|
||||
defer decodeStream.Close()
|
||||
bodyReader = decodeStream
|
||||
}
|
||||
|
||||
manifest, err := h.svc.PutObject(bucket, key, contentType, bodyReader)
|
||||
|
||||
if err != nil {
|
||||
writeMappedS3Error(w, r, err)
|
||||
return
|
||||
}
|
||||
|
||||
w.Header().Set("ETag", `"`+manifest.ETag+`"`)
|
||||
w.Header().Set("Content-Length", "0")
|
||||
|
||||
w.WriteHeader(http.StatusOK)
|
||||
}
|
||||
|
||||
func (h *Handler) handleListMultipartParts(w http.ResponseWriter, r *http.Request, bucket, key, uploadID string) {
|
||||
parts, err := h.svc.ListMultipartParts(bucket, key, uploadID)
|
||||
if err != nil {
|
||||
writeMappedS3Error(w, r, err)
|
||||
return
|
||||
}
|
||||
|
||||
response := models.ListPartsResult{
|
||||
Xmlns: "http://s3.amazonaws.com/doc/2006-03-01/",
|
||||
Bucket: bucket,
|
||||
Key: key,
|
||||
UploadID: uploadID,
|
||||
Parts: make([]models.PartItem, 0, len(parts)),
|
||||
}
|
||||
for _, part := range parts {
|
||||
response.Parts = append(response.Parts, models.PartItem{
|
||||
PartNumber: part.PartNumber,
|
||||
LastModified: time.Unix(part.CreatedAt, 0).UTC().Format("2006-01-02T15:04:05.000Z"),
|
||||
ETag: `"` + part.ETag + `"`,
|
||||
Size: part.Size,
|
||||
})
|
||||
}
|
||||
|
||||
payload, err := xml.MarshalIndent(response, "", " ")
|
||||
if err != nil {
|
||||
writeMappedS3Error(w, r, err)
|
||||
return
|
||||
}
|
||||
|
||||
w.Header().Set("Content-Type", "application/xml; charset=utf-8")
|
||||
w.WriteHeader(http.StatusOK)
|
||||
_, _ = w.Write([]byte(xml.Header))
|
||||
_, _ = w.Write(payload)
|
||||
}
|
||||
|
||||
func shouldDecodeAWSChunkedPayload(r *http.Request) bool {
|
||||
contentEncoding := strings.ToLower(r.Header.Get("Content-Encoding"))
|
||||
if strings.Contains(contentEncoding, "aws-chunked") {
|
||||
return true
|
||||
}
|
||||
signingMode := strings.ToLower(r.Header.Get("x-amz-content-sha256"))
|
||||
return strings.HasPrefix(signingMode, "streaming-aws4-hmac-sha256-payload")
|
||||
}
|
||||
|
||||
func newAWSChunkedDecodingReader(src io.Reader) io.ReadCloser {
|
||||
pr, pw := io.Pipe()
|
||||
go func() {
|
||||
if err := decodeAWSChunkedPayload(src, pw); err != nil {
|
||||
_ = pw.CloseWithError(err)
|
||||
return
|
||||
}
|
||||
_ = pw.Close()
|
||||
}()
|
||||
return pr
|
||||
}
|
||||
|
||||
func decodeAWSChunkedPayload(src io.Reader, dst io.Writer) error {
|
||||
reader := bufio.NewReader(src)
|
||||
for {
|
||||
headerLine, err := reader.ReadString('\n')
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
headerLine = strings.TrimRight(headerLine, "\r\n")
|
||||
chunkSizeToken := headerLine
|
||||
if idx := strings.IndexByte(chunkSizeToken, ';'); idx >= 0 {
|
||||
chunkSizeToken = chunkSizeToken[:idx]
|
||||
}
|
||||
chunkSizeToken = strings.TrimSpace(chunkSizeToken)
|
||||
chunkSize, err := strconv.ParseInt(chunkSizeToken, 16, 64)
|
||||
if err != nil {
|
||||
return fmt.Errorf("invalid aws-chunked header %q: %w", headerLine, err)
|
||||
}
|
||||
if chunkSize < 0 {
|
||||
return fmt.Errorf("invalid aws-chunked size: %d", chunkSize)
|
||||
}
|
||||
if chunkSize > 0 {
|
||||
if _, err := io.CopyN(dst, reader, chunkSize); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
crlf := make([]byte, 2)
|
||||
if _, err := io.ReadFull(reader, crlf); err != nil {
|
||||
return err
|
||||
}
|
||||
if crlf[0] != '\r' || crlf[1] != '\n' {
|
||||
return errors.New("invalid aws-chunked payload terminator")
|
||||
}
|
||||
|
||||
if chunkSize == 0 {
|
||||
for {
|
||||
line, err := reader.ReadString('\n')
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if line == "\r\n" || line == "\n" {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (h *Handler) handlePutBucket(w http.ResponseWriter, r *http.Request) {
|
||||
bucket := chi.URLParam(r, "bucket")
|
||||
if err := h.svc.CreateBucket(bucket); err != nil {
|
||||
writeMappedS3Error(w, r, err)
|
||||
return
|
||||
}
|
||||
w.WriteHeader(http.StatusCreated)
|
||||
}
|
||||
|
||||
func (h *Handler) handleDeleteBucket(w http.ResponseWriter, r *http.Request) {
|
||||
bucket := chi.URLParam(r, "bucket")
|
||||
if err := h.svc.DeleteBucket(bucket); err != nil {
|
||||
writeMappedS3Error(w, r, err)
|
||||
return
|
||||
}
|
||||
w.WriteHeader(http.StatusNoContent)
|
||||
}
|
||||
|
||||
func (h *Handler) handlePostBucket(w http.ResponseWriter, r *http.Request) {
|
||||
bucket := chi.URLParam(r, "bucket")
|
||||
if _, ok := r.URL.Query()["delete"]; !ok {
|
||||
writeS3Error(w, r, s3ErrNotImplemented, r.URL.Path)
|
||||
return
|
||||
}
|
||||
defer r.Body.Close()
|
||||
r.Body = http.MaxBytesReader(w, r.Body, maxXMLBodyBytes)
|
||||
|
||||
bodyReader := io.Reader(r.Body)
|
||||
var decodeStream io.ReadCloser
|
||||
if shouldDecodeAWSChunkedPayload(r) {
|
||||
decodeStream = newAWSChunkedDecodingReader(r.Body)
|
||||
defer decodeStream.Close()
|
||||
bodyReader = decodeStream
|
||||
}
|
||||
|
||||
var req models.DeleteObjectsRequest
|
||||
if err := xml.NewDecoder(bodyReader).Decode(&req); err != nil {
|
||||
var maxErr *http.MaxBytesError
|
||||
if errors.As(err, &maxErr) {
|
||||
writeS3Error(w, r, s3ErrEntityTooLarge, r.URL.Path)
|
||||
return
|
||||
}
|
||||
writeS3Error(w, r, s3ErrMalformedXML, r.URL.Path)
|
||||
return
|
||||
}
|
||||
if len(req.Objects) > maxDeleteObjects {
|
||||
writeS3Error(w, r, s3ErrTooManyDeleteObjects, r.URL.Path)
|
||||
return
|
||||
}
|
||||
|
||||
keys := make([]string, 0, len(req.Objects))
|
||||
response := models.DeleteObjectsResult{
|
||||
Xmlns: "http://s3.amazonaws.com/doc/2006-03-01/",
|
||||
}
|
||||
for _, obj := range req.Objects {
|
||||
if obj.Key == "" {
|
||||
response.Errors = append(response.Errors, models.DeleteError{
|
||||
Key: obj.Key,
|
||||
Code: s3ErrInvalidObjectKey.Code,
|
||||
Message: s3ErrInvalidObjectKey.Message,
|
||||
})
|
||||
continue
|
||||
}
|
||||
if len(obj.Key) > maxObjectKeyBytes {
|
||||
response.Errors = append(response.Errors, models.DeleteError{
|
||||
Key: obj.Key,
|
||||
Code: s3ErrKeyTooLong.Code,
|
||||
Message: s3ErrKeyTooLong.Message,
|
||||
})
|
||||
continue
|
||||
}
|
||||
keys = append(keys, obj.Key)
|
||||
}
|
||||
|
||||
deleted, err := h.svc.DeleteObjects(bucket, keys)
|
||||
if err != nil {
|
||||
writeMappedS3Error(w, r, err)
|
||||
return
|
||||
}
|
||||
|
||||
if !req.Quiet {
|
||||
response.Deleted = make([]models.DeletedEntry, 0, len(deleted))
|
||||
for _, key := range deleted {
|
||||
response.Deleted = append(response.Deleted, models.DeletedEntry{Key: key})
|
||||
}
|
||||
}
|
||||
|
||||
payload, err := xml.MarshalIndent(response, "", " ")
|
||||
if err != nil {
|
||||
writeMappedS3Error(w, r, err)
|
||||
return
|
||||
}
|
||||
|
||||
w.Header().Set("Content-Type", "application/xml; charset=utf-8")
|
||||
w.WriteHeader(http.StatusOK)
|
||||
_, _ = w.Write([]byte(xml.Header))
|
||||
_, _ = w.Write(payload)
|
||||
}
|
||||
|
||||
func (h *Handler) handleDeleteObject(w http.ResponseWriter, r *http.Request) {
|
||||
bucket := chi.URLParam(r, "bucket")
|
||||
key := chi.URLParam(r, "*")
|
||||
if apiErr := validateObjectKey(key); apiErr != nil {
|
||||
writeS3Error(w, r, *apiErr, r.URL.Path)
|
||||
return
|
||||
}
|
||||
if uploadId := r.URL.Query().Get("uploadId"); uploadId != "" {
|
||||
err := h.svc.AbortMultipartUpload(bucket, key, uploadId)
|
||||
if err != nil {
|
||||
writeMappedS3Error(w, r, err)
|
||||
return
|
||||
}
|
||||
w.WriteHeader(http.StatusNoContent)
|
||||
return
|
||||
}
|
||||
err := h.svc.DeleteObject(bucket, key)
|
||||
if err != nil {
|
||||
if errors.Is(err, metadata.ErrObjectNotFound) {
|
||||
w.WriteHeader(http.StatusNoContent)
|
||||
return
|
||||
}
|
||||
writeMappedS3Error(w, r, err)
|
||||
return
|
||||
}
|
||||
w.WriteHeader(http.StatusNoContent)
|
||||
}
|
||||
|
||||
func (h *Handler) handleHeadBucket(w http.ResponseWriter, r *http.Request) {
|
||||
bucket := chi.URLParam(r, "bucket")
|
||||
if err := h.svc.HeadBucket(bucket); err != nil {
|
||||
writeMappedS3Error(w, r, err)
|
||||
return
|
||||
}
|
||||
w.WriteHeader(http.StatusOK)
|
||||
}
|
||||
|
||||
func (h *Handler) handleHeadObject(w http.ResponseWriter, r *http.Request) {
|
||||
bucket := chi.URLParam(r, "bucket")
|
||||
key := chi.URLParam(r, "*")
|
||||
if apiErr := validateObjectKey(key); apiErr != nil {
|
||||
writeS3Error(w, r, *apiErr, r.URL.Path)
|
||||
return
|
||||
}
|
||||
|
||||
manifest, err := h.svc.HeadObject(bucket, key)
|
||||
if err != nil {
|
||||
writeMappedS3Error(w, r, err)
|
||||
return
|
||||
}
|
||||
etag := manifest.ETag
|
||||
size := strconv.FormatInt(manifest.Size, 10)
|
||||
|
||||
w.Header().Set("ETag", `"`+etag+`"`)
|
||||
w.Header().Set("Content-Length", size)
|
||||
w.Header().Set("Last-Modified", time.Unix(manifest.CreatedAt, 0).UTC().Format(http.TimeFormat))
|
||||
w.WriteHeader(http.StatusOK)
|
||||
}
|
||||
|
||||
type limitedListener struct {
|
||||
net.Listener
|
||||
slots chan struct{}
|
||||
}
|
||||
|
||||
func newLimitedListener(inner net.Listener, maxConns int) net.Listener {
|
||||
if maxConns <= 0 {
|
||||
return inner
|
||||
}
|
||||
return &limitedListener{
|
||||
Listener: inner,
|
||||
slots: make(chan struct{}, maxConns),
|
||||
}
|
||||
}
|
||||
|
||||
func (l *limitedListener) Accept() (net.Conn, error) {
|
||||
l.slots <- struct{}{}
|
||||
conn, err := l.Listener.Accept()
|
||||
if err != nil {
|
||||
<-l.slots
|
||||
return nil, err
|
||||
}
|
||||
return &limitedConn{
|
||||
Conn: conn,
|
||||
done: func() { <-l.slots },
|
||||
}, nil
|
||||
}
|
||||
|
||||
type limitedConn struct {
|
||||
net.Conn
|
||||
once sync.Once
|
||||
done func()
|
||||
}
|
||||
|
||||
func (c *limitedConn) Close() error {
|
||||
err := c.Conn.Close()
|
||||
c.once.Do(c.done)
|
||||
return err
|
||||
}
|
||||
|
||||
func (h *Handler) handleGetBuckets(w http.ResponseWriter, r *http.Request) {
|
||||
buckets, err := h.svc.ListBuckets()
|
||||
if err != nil {
|
||||
writeMappedS3Error(w, r, err)
|
||||
return
|
||||
}
|
||||
|
||||
response := models.ListAllMyBucketsResult{
|
||||
Xmlns: "http://s3.amazonaws.com/doc/2006-03-01/",
|
||||
Owner: models.BucketsOwner{
|
||||
ID: "local",
|
||||
DisplayName: "local",
|
||||
},
|
||||
Buckets: models.BucketsElement{
|
||||
Items: make([]models.BucketItem, 0, len(buckets)),
|
||||
},
|
||||
}
|
||||
|
||||
for _, bucket := range buckets {
|
||||
manifest, err := h.svc.GetBucketManifest(bucket)
|
||||
if err != nil {
|
||||
h.logger.Warn("bucket_manifest_read_failed", "bucket", bucket, "error", err)
|
||||
continue
|
||||
}
|
||||
response.Buckets.Items = append(response.Buckets.Items, models.BucketItem{
|
||||
Name: bucket,
|
||||
CreationDate: manifest.CreatedAt.UTC().Format("2006-01-02T15:04:05.000Z"),
|
||||
})
|
||||
}
|
||||
|
||||
payload, err := xml.MarshalIndent(response, "", " ")
|
||||
if err != nil {
|
||||
writeMappedS3Error(w, r, err)
|
||||
return
|
||||
}
|
||||
|
||||
w.Header().Set("Content-Type", "application/xml; charset=utf-8")
|
||||
w.WriteHeader(http.StatusOK)
|
||||
_, _ = w.Write([]byte(xml.Header))
|
||||
_, _ = w.Write(payload)
|
||||
}
|
||||
|
||||
func (h *Handler) handleGetBucket(w http.ResponseWriter, r *http.Request) {
|
||||
bucket := chi.URLParam(r, "bucket")
|
||||
|
||||
if r.URL.Query().Get("list-type") == "2" {
|
||||
h.handleListObjectsV2(w, r, bucket)
|
||||
return
|
||||
}
|
||||
if r.URL.Query().Has("location") {
|
||||
xmlResponse := `<?xml version="1.0" encoding="UTF-8"?>
|
||||
<LocationConstraint xmlns="http://s3.amazonaws.com/doc/2006-03-01/">us-east-1</LocationConstraint>`
|
||||
|
||||
w.Header().Set("Content-Type", "application/xml; charset=utf-8")
|
||||
w.Header().Set("Content-Length", strconv.Itoa(len(xmlResponse)))
|
||||
w.WriteHeader(http.StatusOK)
|
||||
_, err := w.Write([]byte(xmlResponse))
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
return
|
||||
}
|
||||
writeS3Error(w, r, s3ErrNotImplemented, r.URL.Path)
|
||||
|
||||
}
|
||||
|
||||
func (h *Handler) handleListObjectsV2(w http.ResponseWriter, r *http.Request, bucket string) {
|
||||
prefix := r.URL.Query().Get("prefix")
|
||||
delimiter := r.URL.Query().Get("delimiter")
|
||||
startAfter := r.URL.Query().Get("start-after")
|
||||
encodingType := strings.ToLower(strings.TrimSpace(r.URL.Query().Get("encoding-type")))
|
||||
if encodingType != "" && encodingType != "url" {
|
||||
writeS3Error(w, r, s3ErrInvalidArgument, r.URL.Path)
|
||||
return
|
||||
}
|
||||
|
||||
maxKeys := 1000
|
||||
if rawMaxKeys := strings.TrimSpace(r.URL.Query().Get("max-keys")); rawMaxKeys != "" {
|
||||
parsed, err := strconv.Atoi(rawMaxKeys)
|
||||
if err != nil || parsed < 0 {
|
||||
writeS3Error(w, r, s3ErrInvalidArgument, r.URL.Path)
|
||||
return
|
||||
}
|
||||
if parsed > 1000 {
|
||||
parsed = 1000
|
||||
}
|
||||
maxKeys = parsed
|
||||
}
|
||||
|
||||
continuationToken := strings.TrimSpace(r.URL.Query().Get("continuation-token"))
|
||||
continuationMarker := ""
|
||||
continuationType := ""
|
||||
continuationValue := ""
|
||||
if continuationToken != "" {
|
||||
decoded, err := base64.StdEncoding.DecodeString(continuationToken)
|
||||
if err != nil || len(decoded) == 0 {
|
||||
writeS3Error(w, r, s3ErrInvalidArgument, r.URL.Path)
|
||||
return
|
||||
}
|
||||
continuationMarker = string(decoded)
|
||||
continuationType, continuationValue, _ = strings.Cut(continuationMarker, ":")
|
||||
if (continuationType != "K" && continuationType != "C") || continuationValue == "" {
|
||||
writeS3Error(w, r, s3ErrInvalidArgument, r.URL.Path)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
result := models.ListBucketResultV2{
|
||||
Xmlns: "http://s3.amazonaws.com/doc/2006-03-01/",
|
||||
Name: bucket,
|
||||
Prefix: s3EncodeIfNeeded(prefix, encodingType),
|
||||
Delimiter: s3EncodeIfNeeded(delimiter, encodingType),
|
||||
MaxKeys: maxKeys,
|
||||
ContinuationToken: continuationToken,
|
||||
StartAfter: s3EncodeIfNeeded(startAfter, encodingType),
|
||||
EncodingType: encodingType,
|
||||
}
|
||||
|
||||
type pageEntry struct {
|
||||
Marker string
|
||||
Object *models.ObjectManifest
|
||||
CommonPrefix string
|
||||
}
|
||||
|
||||
entries := make([]pageEntry, 0, maxKeys)
|
||||
seenCommonPrefixes := make(map[string]struct{})
|
||||
truncated := false
|
||||
stopErr := errors.New("list_v2_page_complete")
|
||||
|
||||
startKey := prefix
|
||||
if continuationToken != "" {
|
||||
startKey = continuationValue
|
||||
} else if startAfter != "" && startAfter > startKey {
|
||||
startKey = startAfter
|
||||
}
|
||||
|
||||
if maxKeys > 0 {
|
||||
err := h.svc.ForEachObjectFrom(bucket, startKey, func(object *models.ObjectManifest) error {
|
||||
if object == nil {
|
||||
return nil
|
||||
}
|
||||
key := object.Key
|
||||
|
||||
if prefix != "" {
|
||||
if key < prefix {
|
||||
return nil
|
||||
}
|
||||
if !strings.HasPrefix(key, prefix) {
|
||||
return stopErr
|
||||
}
|
||||
}
|
||||
|
||||
if continuationToken != "" {
|
||||
if continuationType == "K" && key <= continuationValue {
|
||||
return nil
|
||||
}
|
||||
if continuationType == "C" && strings.HasPrefix(key, continuationValue) {
|
||||
return nil
|
||||
}
|
||||
} else if startAfter != "" && key <= startAfter {
|
||||
return nil
|
||||
}
|
||||
|
||||
if delimiter != "" {
|
||||
relative := strings.TrimPrefix(key, prefix)
|
||||
if idx := strings.Index(relative, delimiter); idx >= 0 {
|
||||
commonPrefix := prefix + relative[:idx+len(delimiter)]
|
||||
if continuationToken == "" && startAfter != "" && commonPrefix <= startAfter {
|
||||
return nil
|
||||
}
|
||||
if _, exists := seenCommonPrefixes[commonPrefix]; exists {
|
||||
return nil
|
||||
}
|
||||
seenCommonPrefixes[commonPrefix] = struct{}{}
|
||||
if len(entries) >= maxKeys {
|
||||
truncated = true
|
||||
return stopErr
|
||||
}
|
||||
entries = append(entries, pageEntry{
|
||||
Marker: "C:" + commonPrefix,
|
||||
CommonPrefix: commonPrefix,
|
||||
})
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
if len(entries) >= maxKeys {
|
||||
truncated = true
|
||||
return stopErr
|
||||
}
|
||||
entries = append(entries, pageEntry{
|
||||
Marker: "K:" + key,
|
||||
Object: object,
|
||||
})
|
||||
return nil
|
||||
})
|
||||
if err != nil && !errors.Is(err, stopErr) {
|
||||
writeMappedS3Error(w, r, err)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
for _, entry := range entries {
|
||||
if entry.Object != nil {
|
||||
result.Contents = append(result.Contents, models.Contents{
|
||||
Key: s3EncodeIfNeeded(entry.Object.Key, encodingType),
|
||||
LastModified: time.Unix(entry.Object.CreatedAt, 0).UTC().Format("2006-01-02T15:04:05.000Z"),
|
||||
ETag: `"` + entry.Object.ETag + `"`,
|
||||
Size: entry.Object.Size,
|
||||
StorageClass: "STANDARD",
|
||||
})
|
||||
} else {
|
||||
result.CommonPrefixes = append(result.CommonPrefixes, models.CommonPrefixes{
|
||||
Prefix: s3EncodeIfNeeded(entry.CommonPrefix, encodingType),
|
||||
})
|
||||
}
|
||||
result.KeyCount++
|
||||
}
|
||||
|
||||
result.IsTruncated = truncated
|
||||
if result.IsTruncated && result.KeyCount > 0 {
|
||||
result.NextContinuationToken = base64.StdEncoding.EncodeToString([]byte(entries[result.KeyCount-1].Marker))
|
||||
}
|
||||
|
||||
xmlResponse, err := xml.MarshalIndent(result, "", " ")
|
||||
if err != nil {
|
||||
writeMappedS3Error(w, r, err)
|
||||
return
|
||||
}
|
||||
|
||||
w.Header().Set("Content-Type", "application/xml; charset=utf-8")
|
||||
w.Header().Set("Content-Length", strconv.Itoa(len(xml.Header)+len(xmlResponse)))
|
||||
w.WriteHeader(http.StatusOK)
|
||||
_, _ = w.Write([]byte(xml.Header))
|
||||
_, _ = w.Write(xmlResponse)
|
||||
|
||||
}
|
||||
|
||||
func s3EncodeIfNeeded(value, encodingType string) string {
|
||||
if encodingType != "url" || value == "" {
|
||||
return value
|
||||
}
|
||||
encoded := url.QueryEscape(value)
|
||||
return strings.ReplaceAll(encoded, "+", "%20")
|
||||
}
|
||||
|
||||
func parseSingleByteRange(rangeHeader string, size int64) (int64, int64, error) {
|
||||
if size <= 0 || !strings.HasPrefix(rangeHeader, "bytes=") {
|
||||
return 0, 0, errors.New("invalid range")
|
||||
}
|
||||
spec := strings.TrimSpace(strings.TrimPrefix(rangeHeader, "bytes="))
|
||||
if spec == "" || strings.Contains(spec, ",") {
|
||||
return 0, 0, errors.New("invalid range")
|
||||
}
|
||||
|
||||
parts := strings.SplitN(spec, "-", 2)
|
||||
if len(parts) != 2 {
|
||||
return 0, 0, errors.New("invalid range")
|
||||
}
|
||||
|
||||
if parts[0] == "" {
|
||||
suffixLength, err := strconv.ParseInt(parts[1], 10, 64)
|
||||
if err != nil || suffixLength <= 0 {
|
||||
return 0, 0, errors.New("invalid range")
|
||||
}
|
||||
if suffixLength > size {
|
||||
suffixLength = size
|
||||
}
|
||||
start := size - suffixLength
|
||||
end := size - 1
|
||||
return start, end, nil
|
||||
}
|
||||
|
||||
start, err := strconv.ParseInt(parts[0], 10, 64)
|
||||
if err != nil || start < 0 || start >= size {
|
||||
return 0, 0, errors.New("invalid range")
|
||||
}
|
||||
|
||||
var end int64
|
||||
if parts[1] == "" {
|
||||
end = size - 1
|
||||
} else {
|
||||
end, err = strconv.ParseInt(parts[1], 10, 64)
|
||||
if err != nil || end < start {
|
||||
return 0, 0, errors.New("invalid range")
|
||||
}
|
||||
if end >= size {
|
||||
end = size - 1
|
||||
}
|
||||
}
|
||||
|
||||
return start, end, nil
|
||||
}
|
||||
|
||||
func (h *Handler) Start(ctx context.Context, address string) error {
|
||||
if ctx == nil {
|
||||
ctx = context.Background()
|
||||
}
|
||||
|
||||
h.logger.Info("server_starting",
|
||||
"address", address,
|
||||
"log_format", h.logConfig.Format,
|
||||
"log_level", h.logConfig.LevelName,
|
||||
"audit_log", h.logConfig.Audit,
|
||||
)
|
||||
h.setupRoutes()
|
||||
|
||||
server := http.Server{
|
||||
Addr: address,
|
||||
Handler: h.router,
|
||||
ReadHeaderTimeout: serverReadHeaderTimeout,
|
||||
ReadTimeout: serverReadTimeout,
|
||||
WriteTimeout: serverWriteTimeout,
|
||||
IdleTimeout: serverIdleTimeout,
|
||||
MaxHeaderBytes: serverMaxHeaderBytes,
|
||||
}
|
||||
errCh := make(chan error, 1)
|
||||
listener, err := net.Listen("tcp", address)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
limitedListener := newLimitedListener(listener, serverMaxConnections)
|
||||
|
||||
go func() {
|
||||
if err := server.Serve(limitedListener); err != nil {
|
||||
if !errors.Is(err, http.ErrServerClosed) {
|
||||
errCh <- err
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
h.logger.Info("shutdown_context_done", "reason", ctx.Err())
|
||||
case err := <-errCh:
|
||||
h.logger.Error("server_listen_failed", "error", err)
|
||||
if closeErr := h.svc.Close(); closeErr != nil {
|
||||
h.logger.Error("service_close_failed", "error", closeErr)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
defer cancel()
|
||||
|
||||
if err := server.Shutdown(ctx); err != nil {
|
||||
h.logger.Error("server_shutdown_failed", "error", err)
|
||||
return err
|
||||
}
|
||||
if err := h.svc.Close(); err != nil {
|
||||
h.logger.Error("service_close_failed", "error", err)
|
||||
return err
|
||||
}
|
||||
|
||||
h.logger.Info("server_stopped")
|
||||
return nil
|
||||
}
|
||||
|
||||
173
api/s3_errors.go
Normal file
173
api/s3_errors.go
Normal file
@@ -0,0 +1,173 @@
|
||||
package api
|
||||
|
||||
import (
|
||||
"encoding/xml"
|
||||
"errors"
|
||||
"fs/metadata"
|
||||
"fs/models"
|
||||
"fs/service"
|
||||
"net/http"
|
||||
|
||||
"github.com/go-chi/chi/v5/middleware"
|
||||
)
|
||||
|
||||
type s3APIError struct {
|
||||
Status int
|
||||
Code string
|
||||
Message string
|
||||
}
|
||||
|
||||
var (
|
||||
s3ErrInvalidObjectKey = s3APIError{
|
||||
Status: http.StatusBadRequest,
|
||||
Code: "InvalidArgument",
|
||||
Message: "Object key is required.",
|
||||
}
|
||||
s3ErrKeyTooLong = s3APIError{
|
||||
Status: http.StatusBadRequest,
|
||||
Code: "KeyTooLongError",
|
||||
Message: "Your key is too long.",
|
||||
}
|
||||
s3ErrNotImplemented = s3APIError{
|
||||
Status: http.StatusNotImplemented,
|
||||
Code: "NotImplemented",
|
||||
Message: "A header you provided implies functionality that is not implemented.",
|
||||
}
|
||||
s3ErrInvalidPart = s3APIError{
|
||||
Status: http.StatusBadRequest,
|
||||
Code: "InvalidPart",
|
||||
Message: "One or more of the specified parts could not be found.",
|
||||
}
|
||||
s3ErrInvalidPartOrder = s3APIError{
|
||||
Status: http.StatusBadRequest,
|
||||
Code: "InvalidPartOrder",
|
||||
Message: "The list of parts was not in ascending order.",
|
||||
}
|
||||
s3ErrMalformedXML = s3APIError{
|
||||
Status: http.StatusBadRequest,
|
||||
Code: "MalformedXML",
|
||||
Message: "The XML you provided was not well-formed or did not validate against our published schema.",
|
||||
}
|
||||
s3ErrInvalidArgument = s3APIError{
|
||||
Status: http.StatusBadRequest,
|
||||
Code: "InvalidArgument",
|
||||
Message: "Invalid argument.",
|
||||
}
|
||||
s3ErrInvalidRange = s3APIError{
|
||||
Status: http.StatusRequestedRangeNotSatisfiable,
|
||||
Code: "InvalidRange",
|
||||
Message: "The requested range is not satisfiable.",
|
||||
}
|
||||
s3ErrEntityTooSmall = s3APIError{
|
||||
Status: http.StatusBadRequest,
|
||||
Code: "EntityTooSmall",
|
||||
Message: "Your proposed upload is smaller than the minimum allowed object size.",
|
||||
}
|
||||
s3ErrEntityTooLarge = s3APIError{
|
||||
Status: http.StatusRequestEntityTooLarge,
|
||||
Code: "EntityTooLarge",
|
||||
Message: "Your proposed upload exceeds the maximum allowed size.",
|
||||
}
|
||||
s3ErrTooManyDeleteObjects = s3APIError{
|
||||
Status: http.StatusBadRequest,
|
||||
Code: "MalformedXML",
|
||||
Message: "The request must contain no more than 1000 object identifiers.",
|
||||
}
|
||||
s3ErrInternal = s3APIError{
|
||||
Status: http.StatusInternalServerError,
|
||||
Code: "InternalError",
|
||||
Message: "We encountered an internal error. Please try again.",
|
||||
}
|
||||
)
|
||||
|
||||
func mapToS3Error(err error) s3APIError {
|
||||
switch {
|
||||
case errors.Is(err, metadata.ErrInvalidBucketName):
|
||||
return s3APIError{
|
||||
Status: http.StatusBadRequest,
|
||||
Code: "InvalidBucketName",
|
||||
Message: "The specified bucket is not valid.",
|
||||
}
|
||||
case errors.Is(err, metadata.ErrBucketAlreadyExists):
|
||||
return s3APIError{
|
||||
Status: http.StatusConflict,
|
||||
Code: "BucketAlreadyOwnedByYou",
|
||||
Message: "Your previous request to create the named bucket succeeded and you already own it.",
|
||||
}
|
||||
case errors.Is(err, metadata.ErrBucketNotFound):
|
||||
return s3APIError{
|
||||
Status: http.StatusNotFound,
|
||||
Code: "NoSuchBucket",
|
||||
Message: "The specified bucket does not exist.",
|
||||
}
|
||||
case errors.Is(err, metadata.ErrBucketNotEmpty):
|
||||
return s3APIError{
|
||||
Status: http.StatusConflict,
|
||||
Code: "BucketNotEmpty",
|
||||
Message: "The bucket you tried to delete is not empty.",
|
||||
}
|
||||
case errors.Is(err, metadata.ErrObjectNotFound):
|
||||
return s3APIError{
|
||||
Status: http.StatusNotFound,
|
||||
Code: "NoSuchKey",
|
||||
Message: "The specified key does not exist.",
|
||||
}
|
||||
case errors.Is(err, metadata.ErrMultipartNotFound):
|
||||
return s3APIError{
|
||||
Status: http.StatusNotFound,
|
||||
Code: "NoSuchUpload",
|
||||
Message: "The specified multipart upload does not exist.",
|
||||
}
|
||||
case errors.Is(err, metadata.ErrMultipartNotPending):
|
||||
return s3APIError{
|
||||
Status: http.StatusBadRequest,
|
||||
Code: "InvalidRequest",
|
||||
Message: "The multipart upload is not in a valid state for this operation.",
|
||||
}
|
||||
case errors.Is(err, service.ErrInvalidPart):
|
||||
return s3ErrInvalidPart
|
||||
case errors.Is(err, service.ErrInvalidPartOrder):
|
||||
return s3ErrInvalidPartOrder
|
||||
case errors.Is(err, service.ErrInvalidCompleteRequest):
|
||||
return s3ErrMalformedXML
|
||||
case errors.Is(err, service.ErrEntityTooSmall):
|
||||
return s3ErrEntityTooSmall
|
||||
default:
|
||||
return s3ErrInternal
|
||||
}
|
||||
}
|
||||
|
||||
func writeS3Error(w http.ResponseWriter, r *http.Request, apiErr s3APIError, resource string) {
|
||||
requestID := ""
|
||||
if r != nil {
|
||||
requestID = middleware.GetReqID(r.Context())
|
||||
if requestID != "" {
|
||||
w.Header().Set("x-amz-request-id", requestID)
|
||||
}
|
||||
}
|
||||
w.Header().Set("Content-Type", "application/xml; charset=utf-8")
|
||||
w.WriteHeader(apiErr.Status)
|
||||
|
||||
if r != nil && r.Method == http.MethodHead {
|
||||
return
|
||||
}
|
||||
|
||||
payload := models.S3ErrorResponse{
|
||||
Code: apiErr.Code,
|
||||
Message: apiErr.Message,
|
||||
Resource: resource,
|
||||
RequestID: requestID,
|
||||
}
|
||||
|
||||
out, err := xml.MarshalIndent(payload, "", " ")
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
_, _ = w.Write([]byte(xml.Header))
|
||||
_, _ = w.Write(out)
|
||||
}
|
||||
|
||||
func writeMappedS3Error(w http.ResponseWriter, r *http.Request, err error) {
|
||||
writeS3Error(w, r, mapToS3Error(err), r.URL.Path)
|
||||
}
|
||||
10
go.mod
10
go.mod
@@ -3,8 +3,12 @@ module fs
|
||||
go 1.25.7
|
||||
|
||||
require (
|
||||
github.com/klauspost/cpuid/v2 v2.3.0 // indirect
|
||||
github.com/klauspost/reedsolomon v1.13.2 // indirect
|
||||
go.etcd.io/bbolt v1.4.3 // indirect
|
||||
github.com/go-chi/chi/v5 v5.2.5
|
||||
github.com/google/uuid v1.6.0
|
||||
go.etcd.io/bbolt v1.4.3
|
||||
)
|
||||
|
||||
require (
|
||||
github.com/joho/godotenv v1.5.1 // indirect
|
||||
golang.org/x/sys v0.41.0 // indirect
|
||||
)
|
||||
|
||||
22
go.sum
22
go.sum
@@ -1,10 +1,20 @@
|
||||
github.com/klauspost/cpuid/v2 v2.3.0 h1:S4CRMLnYUhGeDFDqkGriYKdfoFlDnMtqTiI/sFzhA9Y=
|
||||
github.com/klauspost/cpuid/v2 v2.3.0/go.mod h1:hqwkgyIinND0mEev00jJYCxPNVRVXFQeu1XKlok6oO0=
|
||||
github.com/klauspost/reedsolomon v1.13.2 h1:9qtQy2tKEVpVB8Pfq87ZljHZb60/LbeTQ1OxV8EGzdE=
|
||||
github.com/klauspost/reedsolomon v1.13.2/go.mod h1:ggJT9lc71Vu+cSOPBlxGvBN6TfAS77qB4fp8vJ05NSA=
|
||||
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
|
||||
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/go-chi/chi/v5 v5.2.5 h1:Eg4myHZBjyvJmAFjFvWgrqDTXFyOzjj7YIm3L3mu6Ug=
|
||||
github.com/go-chi/chi/v5 v5.2.5/go.mod h1:X7Gx4mteadT3eDOMTsXzmI4/rwUpOwBHLpAfupzFJP0=
|
||||
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
|
||||
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
||||
github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0=
|
||||
github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4=
|
||||
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
||||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
|
||||
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
|
||||
go.etcd.io/bbolt v1.4.3 h1:dEadXpI6G79deX5prL3QRNP6JB8UxVkqo4UPnHaNXJo=
|
||||
go.etcd.io/bbolt v1.4.3/go.mod h1:tKQlpPaYCVFctUIgFKFnAlvbmB3tpy1vkTnDWohtc0E=
|
||||
golang.org/x/sys v0.29.0 h1:TPYlXGxvx1MGTn2GiZDhnjPA9wZzZeGKHHmKhHYvgaU=
|
||||
golang.org/x/sys v0.29.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
|
||||
golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ=
|
||||
golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
|
||||
golang.org/x/sys v0.41.0 h1:Ivj+2Cp/ylzLiEU89QhWblYnOE9zerudt9Ftecq2C6k=
|
||||
golang.org/x/sys v0.41.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks=
|
||||
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
|
||||
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||
|
||||
157
logging/logging.go
Normal file
157
logging/logging.go
Normal file
@@ -0,0 +1,157 @@
|
||||
package logging
|
||||
|
||||
import (
|
||||
"log/slog"
|
||||
"net/http"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/go-chi/chi/v5/middleware"
|
||||
)
|
||||
|
||||
type Config struct {
|
||||
Level slog.Level
|
||||
LevelName string
|
||||
Format string
|
||||
Audit bool
|
||||
AddSource bool
|
||||
DebugMode bool
|
||||
}
|
||||
|
||||
func ConfigFromEnv() Config {
|
||||
levelName := strings.ToLower(strings.TrimSpace(os.Getenv("LOG_LEVEL")))
|
||||
format := strings.ToLower(strings.TrimSpace(os.Getenv("LOG_FORMAT")))
|
||||
return ConfigFromValues(levelName, format, envBool("AUDIT_LOG", true))
|
||||
}
|
||||
|
||||
func ConfigFromValues(levelName, format string, audit bool) Config {
|
||||
levelName = strings.ToLower(strings.TrimSpace(levelName))
|
||||
if levelName == "" {
|
||||
levelName = "info"
|
||||
}
|
||||
level := parseLevel(levelName)
|
||||
levelName = strings.ToUpper(level.String())
|
||||
|
||||
format = strings.ToLower(strings.TrimSpace(format))
|
||||
if format == "" {
|
||||
format = "text"
|
||||
}
|
||||
if format != "json" && format != "text" {
|
||||
format = "text"
|
||||
}
|
||||
|
||||
debugMode := level <= slog.LevelDebug
|
||||
return Config{
|
||||
Level: level,
|
||||
LevelName: levelName,
|
||||
Format: format,
|
||||
Audit: audit,
|
||||
AddSource: debugMode,
|
||||
DebugMode: debugMode,
|
||||
}
|
||||
}
|
||||
|
||||
func NewLogger(cfg Config) *slog.Logger {
|
||||
opts := &slog.HandlerOptions{
|
||||
Level: cfg.Level,
|
||||
AddSource: cfg.AddSource,
|
||||
}
|
||||
opts.ReplaceAttr = func(_ []string, attr slog.Attr) slog.Attr {
|
||||
if attr.Key == slog.SourceKey {
|
||||
if src, ok := attr.Value.Any().(*slog.Source); ok && src != nil {
|
||||
attr.Key = "src"
|
||||
attr.Value = slog.StringValue(filepath.Base(src.File) + ":" + strconv.Itoa(src.Line))
|
||||
}
|
||||
}
|
||||
return attr
|
||||
}
|
||||
|
||||
var handler slog.Handler
|
||||
if cfg.Format == "json" {
|
||||
handler = slog.NewJSONHandler(os.Stdout, opts)
|
||||
} else {
|
||||
handler = slog.NewTextHandler(os.Stdout, opts)
|
||||
}
|
||||
|
||||
logger := slog.New(handler)
|
||||
slog.SetDefault(logger)
|
||||
return logger
|
||||
}
|
||||
|
||||
func HTTPMiddleware(logger *slog.Logger, cfg Config) func(http.Handler) http.Handler {
|
||||
return func(next http.Handler) http.Handler {
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
start := time.Now()
|
||||
ww := middleware.NewWrapResponseWriter(w, r.ProtoMajor)
|
||||
requestID := middleware.GetReqID(r.Context())
|
||||
if requestID != "" {
|
||||
ww.Header().Set("x-amz-request-id", requestID)
|
||||
}
|
||||
|
||||
next.ServeHTTP(ww, r)
|
||||
|
||||
if !cfg.Audit && !cfg.DebugMode {
|
||||
return
|
||||
}
|
||||
|
||||
elapsed := time.Since(start)
|
||||
status := ww.Status()
|
||||
if status == 0 {
|
||||
status = http.StatusOK
|
||||
}
|
||||
attrs := []any{
|
||||
"method", r.Method,
|
||||
"path", r.URL.Path,
|
||||
"status", status,
|
||||
"bytes", ww.BytesWritten(),
|
||||
"duration_ms", float64(elapsed.Nanoseconds()) / 1_000_000.0,
|
||||
"remote_addr", r.RemoteAddr,
|
||||
}
|
||||
if requestID != "" {
|
||||
attrs = append(attrs, "request_id", requestID)
|
||||
}
|
||||
|
||||
if cfg.DebugMode {
|
||||
attrs = append(attrs,
|
||||
"query", r.URL.RawQuery,
|
||||
"user_agent", r.UserAgent(),
|
||||
"content_length", r.ContentLength,
|
||||
"content_type", r.Header.Get("Content-Type"),
|
||||
"x_amz_sha256", r.Header.Get("x-amz-content-sha256"),
|
||||
)
|
||||
logger.Debug("http_request", attrs...)
|
||||
return
|
||||
}
|
||||
|
||||
logger.Info("http_request", attrs...)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func envBool(key string, defaultValue bool) bool {
|
||||
raw := os.Getenv(key)
|
||||
if raw == "" {
|
||||
return defaultValue
|
||||
}
|
||||
value, err := strconv.ParseBool(raw)
|
||||
if err != nil {
|
||||
return defaultValue
|
||||
}
|
||||
return value
|
||||
}
|
||||
|
||||
func parseLevel(levelName string) slog.Level {
|
||||
switch levelName {
|
||||
case "debug":
|
||||
return slog.LevelDebug
|
||||
case "warn", "warning":
|
||||
return slog.LevelWarn
|
||||
case "error":
|
||||
return slog.LevelError
|
||||
default:
|
||||
return slog.LevelInfo
|
||||
}
|
||||
}
|
||||
78
main.go
78
main.go
@@ -1,54 +1,64 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"context"
|
||||
"fs/api"
|
||||
"fs/logging"
|
||||
"fs/metadata"
|
||||
"fs/service"
|
||||
"io"
|
||||
"fs/storage"
|
||||
"fs/utils"
|
||||
"os"
|
||||
"os/signal"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
"syscall"
|
||||
"time"
|
||||
)
|
||||
|
||||
func main() {
|
||||
fmt.Println("Hello, World!")
|
||||
imageStream, err := os.Open("fer.jpg")
|
||||
if err != nil {
|
||||
fmt.Printf("Error opening image stream: %v\n", err)
|
||||
return
|
||||
}
|
||||
defer imageStream.Close()
|
||||
config := utils.NewConfig()
|
||||
logConfig := logging.ConfigFromValues(config.LogLevel, config.LogFormat, config.AuditLog)
|
||||
logger := logging.NewLogger(logConfig)
|
||||
logger.Info("boot",
|
||||
"log_level", logConfig.LevelName,
|
||||
"log_format", logConfig.Format,
|
||||
"audit_log", logConfig.Audit,
|
||||
"data_path", config.DataPath,
|
||||
"multipart_retention_hours", int(config.MultipartCleanupRetention/time.Hour),
|
||||
)
|
||||
|
||||
metadataHandler, err := metadata.NewMetadataHandler("metadata.db")
|
||||
if err != nil {
|
||||
fmt.Printf("Error initializing metadata handler: %v\n", err)
|
||||
if err := os.MkdirAll(config.DataPath, 0o755); err != nil {
|
||||
logger.Error("failed_to_prepare_data_path", "path", config.DataPath, "error", err)
|
||||
return
|
||||
}
|
||||
|
||||
objectService := service.NewObjectService(metadataHandler)
|
||||
dbPath := filepath.Join(config.DataPath, "metadata.db")
|
||||
metadataHandler, err := metadata.NewMetadataHandler(dbPath)
|
||||
if err != nil {
|
||||
logger.Error("failed_to_initialize_metadata_handler", "error", err)
|
||||
return
|
||||
}
|
||||
blobHandler, err := storage.NewBlobStore(config.DataPath, config.ChunkSize)
|
||||
if err != nil {
|
||||
_ = metadataHandler.Close()
|
||||
logger.Error("failed_to_initialize_blob_store", "error", err)
|
||||
return
|
||||
}
|
||||
|
||||
manifest, err := objectService.PutObject("test-bucket-ferdzo/fer.jpg", "image/jpeg", imageStream)
|
||||
if err != nil {
|
||||
fmt.Printf("Error ingesting stream: %v\n", err)
|
||||
return
|
||||
}
|
||||
fmt.Printf("Manifest: %+v\n", manifest)
|
||||
objectService := service.NewObjectService(metadataHandler, blobHandler, config.MultipartCleanupRetention)
|
||||
handler := api.NewHandler(objectService, logger, logConfig)
|
||||
addr := config.Address + ":" + strconv.Itoa(config.Port)
|
||||
|
||||
objectData, manifest2, err := objectService.GetObject("test-bucket-ferdzo", "fer.jpg")
|
||||
if err != nil {
|
||||
fmt.Printf("Error retrieving object: %v\n", err)
|
||||
return
|
||||
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
|
||||
defer stop()
|
||||
if config.GcEnabled {
|
||||
go objectService.RunGC(ctx, config.GcInterval)
|
||||
}
|
||||
fmt.Printf("Retrieved manifest: %+v\n", manifest2)
|
||||
recoveredFile, err := os.Create("recovered_" + manifest2.Key)
|
||||
if err != nil {
|
||||
fmt.Printf("Error creating file: %v\n", err)
|
||||
return
|
||||
}
|
||||
defer recoveredFile.Close()
|
||||
|
||||
bytesWritten, err := io.Copy(recoveredFile, objectData)
|
||||
if err != nil {
|
||||
fmt.Printf("Error streaming to recovered file: %v\n", err)
|
||||
if err = handler.Start(ctx, addr); err != nil {
|
||||
logger.Error("server_stopped_with_error", "error", err)
|
||||
return
|
||||
}
|
||||
fmt.Printf("Successfully streamed %d bytes to disk!\n", bytesWritten)
|
||||
|
||||
}
|
||||
|
||||
@@ -2,37 +2,246 @@ package metadata
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"fs/models"
|
||||
"net"
|
||||
"regexp"
|
||||
"sort"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
|
||||
"go.etcd.io/bbolt"
|
||||
)
|
||||
|
||||
const ManifestBucketName = "object_manifests"
|
||||
|
||||
type MetadataHandler struct {
|
||||
db *bbolt.DB
|
||||
}
|
||||
|
||||
var systemIndex = []byte("__SYSTEM_BUCKETS__")
|
||||
var multipartUploadIndex = []byte("__MULTIPART_UPLOADS__")
|
||||
var multipartUploadPartsIndex = []byte("__MULTIPART_UPLOAD_PARTS__")
|
||||
|
||||
var validBucketName = regexp.MustCompile(`^[a-z0-9.-]+$`)
|
||||
|
||||
var (
|
||||
ErrInvalidBucketName = errors.New("invalid bucket name")
|
||||
ErrBucketAlreadyExists = errors.New("bucket already exists")
|
||||
ErrBucketNotFound = errors.New("bucket not found")
|
||||
ErrBucketNotEmpty = errors.New("bucket not empty")
|
||||
ErrObjectNotFound = errors.New("object not found")
|
||||
ErrMultipartNotFound = errors.New("multipart upload not found")
|
||||
ErrMultipartNotPending = errors.New("multipart upload is not pending")
|
||||
)
|
||||
|
||||
func NewMetadataHandler(dbPath string) (*MetadataHandler, error) {
|
||||
db, err := bbolt.Open(dbPath, 0600, nil)
|
||||
db, err := bbolt.Open(dbPath, 0600, &bbolt.Options{Timeout: 2 * time.Second})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &MetadataHandler{db: db}, nil
|
||||
h := &MetadataHandler{db: db}
|
||||
|
||||
err = h.db.Update(func(tx *bbolt.Tx) error {
|
||||
_, err := tx.CreateBucketIfNotExists(systemIndex)
|
||||
return err
|
||||
})
|
||||
if err != nil {
|
||||
_ = db.Close()
|
||||
return nil, err
|
||||
}
|
||||
err = h.db.Update(func(tx *bbolt.Tx) error {
|
||||
_, err := tx.CreateBucketIfNotExists(multipartUploadIndex)
|
||||
return err
|
||||
})
|
||||
if err != nil {
|
||||
_ = db.Close()
|
||||
return nil, err
|
||||
}
|
||||
err = h.db.Update(func(tx *bbolt.Tx) error {
|
||||
_, err := tx.CreateBucketIfNotExists(multipartUploadPartsIndex)
|
||||
return err
|
||||
})
|
||||
if err != nil {
|
||||
_ = db.Close()
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return h, nil
|
||||
}
|
||||
|
||||
func (h *MetadataHandler) PutManifest(manifest *models.ObjectManifest) error {
|
||||
func isValidBucketName(bucketName string) bool {
|
||||
if len(bucketName) < 3 || len(bucketName) > 63 {
|
||||
return false
|
||||
}
|
||||
if !validBucketName.MatchString(bucketName) {
|
||||
return false
|
||||
}
|
||||
if strings.Contains(bucketName, "..") {
|
||||
return false
|
||||
}
|
||||
if bucketName[0] == '.' || bucketName[0] == '-' || bucketName[len(bucketName)-1] == '.' || bucketName[len(bucketName)-1] == '-' {
|
||||
return false
|
||||
}
|
||||
for _, label := range strings.Split(bucketName, ".") {
|
||||
if label == "" || label[0] == '-' || label[len(label)-1] == '-' {
|
||||
return false
|
||||
}
|
||||
}
|
||||
if ip := net.ParseIP(bucketName); ip != nil && ip.To4() != nil {
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func (h *MetadataHandler) Close() error {
|
||||
return h.db.Close()
|
||||
}
|
||||
|
||||
func (h *MetadataHandler) CreateBucket(bucketName string) error {
|
||||
if !isValidBucketName(bucketName) {
|
||||
return fmt.Errorf("%w: %s", ErrInvalidBucketName, bucketName)
|
||||
}
|
||||
|
||||
err := h.db.Update(func(tx *bbolt.Tx) error {
|
||||
metadataBucket, err := tx.CreateBucketIfNotExists([]byte(ManifestBucketName))
|
||||
indexBucket, err := tx.CreateBucketIfNotExists([]byte(systemIndex))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
key := fmt.Sprintf("%s/%s", manifest.Bucket, manifest.Key)
|
||||
if indexBucket.Get([]byte(bucketName)) != nil {
|
||||
return fmt.Errorf("%w: %s", ErrBucketAlreadyExists, bucketName)
|
||||
}
|
||||
|
||||
_, err = tx.CreateBucketIfNotExists([]byte(bucketName))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
manifest := models.BucketManifest{
|
||||
Name: bucketName,
|
||||
CreatedAt: time.Now(),
|
||||
}
|
||||
data, _ := json.Marshal(manifest)
|
||||
|
||||
return indexBucket.Put([]byte(bucketName), data)
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (h *MetadataHandler) DeleteBucket(bucketName string) error {
|
||||
if !isValidBucketName(bucketName) {
|
||||
return fmt.Errorf("%w: %s", ErrInvalidBucketName, bucketName)
|
||||
}
|
||||
|
||||
err := h.db.Update(func(tx *bbolt.Tx) error {
|
||||
indexBucket, err := tx.CreateBucketIfNotExists([]byte(systemIndex))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if indexBucket.Get([]byte(bucketName)) == nil {
|
||||
return fmt.Errorf("%w: %s", ErrBucketNotFound, bucketName)
|
||||
}
|
||||
metadataBucket := tx.Bucket([]byte(bucketName))
|
||||
if metadataBucket == nil {
|
||||
return fmt.Errorf("%w: %s", ErrBucketNotFound, bucketName)
|
||||
}
|
||||
if k, _ := metadataBucket.Cursor().First(); k != nil {
|
||||
return fmt.Errorf("%w: %s", ErrBucketNotEmpty, bucketName)
|
||||
}
|
||||
|
||||
multipartUploadsBucket, err := getMultipartUploadBucket(tx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
cursor := multipartUploadsBucket.Cursor()
|
||||
for _, payload := cursor.First(); payload != nil; _, payload = cursor.Next() {
|
||||
upload := models.MultipartUpload{}
|
||||
if err := json.Unmarshal(payload, &upload); err != nil {
|
||||
return err
|
||||
}
|
||||
if upload.Bucket == bucketName && upload.State == "pending" {
|
||||
return fmt.Errorf("%w: %s", ErrBucketNotEmpty, bucketName)
|
||||
}
|
||||
}
|
||||
|
||||
if err := tx.DeleteBucket([]byte(bucketName)); err != nil && !errors.Is(err, bbolt.ErrBucketNotFound) {
|
||||
return fmt.Errorf("error deleting metadata bucket %s: %w", bucketName, err)
|
||||
}
|
||||
if err := indexBucket.Delete([]byte(bucketName)); err != nil {
|
||||
return fmt.Errorf("error deleting bucket %s from system index: %w", bucketName, err)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (h *MetadataHandler) ListBuckets() ([]string, error) {
|
||||
buckets := []string{}
|
||||
err := h.db.View(func(tx *bbolt.Tx) error {
|
||||
systemIndexBucket := tx.Bucket([]byte(systemIndex))
|
||||
if systemIndexBucket == nil {
|
||||
return errors.New("system index not found")
|
||||
}
|
||||
c := systemIndexBucket.Cursor()
|
||||
for k, _ := c.First(); k != nil; k, _ = c.Next() {
|
||||
buckets = append(buckets, string(k))
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return buckets, nil
|
||||
}
|
||||
|
||||
func (h *MetadataHandler) GetBucketManifest(bucketName string) (*models.BucketManifest, error) {
|
||||
var manifest *models.BucketManifest
|
||||
|
||||
err := h.db.View(func(tx *bbolt.Tx) error {
|
||||
systemIndexBucket := tx.Bucket([]byte(systemIndex))
|
||||
if systemIndexBucket == nil {
|
||||
return errors.New("system index not found")
|
||||
}
|
||||
data := systemIndexBucket.Get([]byte(bucketName))
|
||||
if data == nil {
|
||||
return fmt.Errorf("%w: %s", ErrBucketNotFound, bucketName)
|
||||
}
|
||||
err := json.Unmarshal(data, &manifest)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return manifest, nil
|
||||
}
|
||||
|
||||
func (h *MetadataHandler) PutManifest(manifest *models.ObjectManifest) error {
|
||||
bucket := manifest.Bucket
|
||||
key := manifest.Key
|
||||
|
||||
if _, err := h.GetBucketManifest(bucket); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err := h.db.Update(func(tx *bbolt.Tx) error {
|
||||
data, err := json.Marshal(manifest)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
metadataBucket := tx.Bucket([]byte(bucket))
|
||||
if metadataBucket == nil {
|
||||
return fmt.Errorf("%w: %s", ErrBucketNotFound, bucket)
|
||||
}
|
||||
return metadataBucket.Put([]byte(key), data)
|
||||
})
|
||||
if err != nil {
|
||||
@@ -44,16 +253,15 @@ func (h *MetadataHandler) PutManifest(manifest *models.ObjectManifest) error {
|
||||
func (h *MetadataHandler) GetManifest(bucket, key string) (*models.ObjectManifest, error) {
|
||||
var manifest *models.ObjectManifest
|
||||
|
||||
h.db.View(func(tx *bbolt.Tx) error {
|
||||
metadataBucket := tx.Bucket([]byte(ManifestBucketName))
|
||||
err := h.db.View(func(tx *bbolt.Tx) error {
|
||||
metadataBucket := tx.Bucket([]byte(bucket))
|
||||
if metadataBucket == nil {
|
||||
return fmt.Errorf("bucket %s not found", ManifestBucketName)
|
||||
return fmt.Errorf("%w: %s", ErrBucketNotFound, bucket)
|
||||
}
|
||||
key := fmt.Sprintf("%s/%s", bucket, key)
|
||||
data := metadataBucket.Get([]byte(key))
|
||||
if data == nil {
|
||||
|
||||
return fmt.Errorf("manifest not found for bucket %s and key %s", bucket, key)
|
||||
return fmt.Errorf("%w: %s/%s", ErrObjectNotFound, bucket, key)
|
||||
}
|
||||
err := json.Unmarshal(data, &manifest)
|
||||
if err != nil {
|
||||
@@ -61,6 +269,547 @@ func (h *MetadataHandler) GetManifest(bucket, key string) (*models.ObjectManifes
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return manifest, nil
|
||||
}
|
||||
|
||||
func (h *MetadataHandler) ListObjects(bucket, prefix string) ([]*models.ObjectManifest, error) {
|
||||
|
||||
var objects []*models.ObjectManifest
|
||||
|
||||
err := h.db.View(func(tx *bbolt.Tx) error {
|
||||
systemIndexBucket := tx.Bucket([]byte(systemIndex))
|
||||
if systemIndexBucket == nil {
|
||||
return errors.New("system index not found")
|
||||
}
|
||||
if systemIndexBucket.Get([]byte(bucket)) == nil {
|
||||
return fmt.Errorf("%w: %s", ErrBucketNotFound, bucket)
|
||||
}
|
||||
_bucket := tx.Bucket([]byte(bucket))
|
||||
if _bucket == nil {
|
||||
return fmt.Errorf("%w: %s", ErrBucketNotFound, bucket)
|
||||
}
|
||||
err := _bucket.ForEach(func(k, v []byte) error {
|
||||
if prefix != "" && !strings.HasPrefix(string(k), prefix) {
|
||||
return nil
|
||||
}
|
||||
object := models.ObjectManifest{}
|
||||
err := json.Unmarshal(v, &object)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
objects = append(objects, &object)
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return objects, nil
|
||||
}
|
||||
|
||||
func (h *MetadataHandler) ForEachObjectFrom(bucket, startKey string, fn func(*models.ObjectManifest) error) error {
|
||||
if fn == nil {
|
||||
return errors.New("object callback is required")
|
||||
}
|
||||
|
||||
return h.db.View(func(tx *bbolt.Tx) error {
|
||||
systemIndexBucket := tx.Bucket([]byte(systemIndex))
|
||||
if systemIndexBucket == nil {
|
||||
return errors.New("system index not found")
|
||||
}
|
||||
if systemIndexBucket.Get([]byte(bucket)) == nil {
|
||||
return fmt.Errorf("%w: %s", ErrBucketNotFound, bucket)
|
||||
}
|
||||
|
||||
metadataBucket := tx.Bucket([]byte(bucket))
|
||||
if metadataBucket == nil {
|
||||
return fmt.Errorf("%w: %s", ErrBucketNotFound, bucket)
|
||||
}
|
||||
|
||||
cursor := metadataBucket.Cursor()
|
||||
var k, v []byte
|
||||
if startKey == "" {
|
||||
k, v = cursor.First()
|
||||
} else {
|
||||
k, v = cursor.Seek([]byte(startKey))
|
||||
}
|
||||
|
||||
for ; k != nil; k, v = cursor.Next() {
|
||||
object := models.ObjectManifest{}
|
||||
if err := json.Unmarshal(v, &object); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := fn(&object); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
func (h *MetadataHandler) DeleteManifest(bucket, key string) error {
|
||||
if _, err := h.GetManifest(bucket, key); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err := h.db.Update(func(tx *bbolt.Tx) error {
|
||||
metadataBucket := tx.Bucket([]byte(bucket))
|
||||
if metadataBucket == nil {
|
||||
return fmt.Errorf("%w: %s", ErrBucketNotFound, bucket)
|
||||
}
|
||||
return metadataBucket.Delete([]byte(key))
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
|
||||
}
|
||||
|
||||
func (h *MetadataHandler) DeleteManifests(bucket string, keys []string) ([]string, error) {
|
||||
deleted := make([]string, 0, len(keys))
|
||||
|
||||
err := h.db.Update(func(tx *bbolt.Tx) error {
|
||||
metadataBucket := tx.Bucket([]byte(bucket))
|
||||
if metadataBucket == nil {
|
||||
return fmt.Errorf("%w: %s", ErrBucketNotFound, bucket)
|
||||
}
|
||||
|
||||
for _, key := range keys {
|
||||
if key == "" {
|
||||
continue
|
||||
}
|
||||
if metadataBucket.Get([]byte(key)) != nil {
|
||||
if err := metadataBucket.Delete([]byte(key)); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
deleted = append(deleted, key)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return deleted, nil
|
||||
}
|
||||
|
||||
func (h *MetadataHandler) CreateMultipartUpload(bucket, key string) (*models.MultipartUpload, error) {
|
||||
var upload *models.MultipartUpload
|
||||
|
||||
err := h.db.View(func(tx *bbolt.Tx) error {
|
||||
systemIndexBucket := tx.Bucket([]byte(systemIndex))
|
||||
if systemIndexBucket == nil {
|
||||
return errors.New("system index not found")
|
||||
}
|
||||
if systemIndexBucket.Get([]byte(bucket)) != nil {
|
||||
return nil
|
||||
}
|
||||
return ErrBucketNotFound
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
uploadId := uuid.New().String()
|
||||
createdAt := time.Now().UTC().Format(time.RFC3339)
|
||||
upload = &models.MultipartUpload{
|
||||
Bucket: bucket,
|
||||
Key: key,
|
||||
UploadID: uploadId,
|
||||
CreatedAt: createdAt,
|
||||
State: "pending",
|
||||
}
|
||||
|
||||
err = h.db.Update(func(tx *bbolt.Tx) error {
|
||||
multipartUploadBucket := tx.Bucket([]byte(multipartUploadIndex))
|
||||
if multipartUploadBucket == nil {
|
||||
return errors.New("multipart upload index not found")
|
||||
}
|
||||
payload, err := json.Marshal(upload)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = multipartUploadBucket.Put([]byte(uploadId), payload)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return upload, nil
|
||||
}
|
||||
|
||||
func getMultipartUploadBucket(tx *bbolt.Tx) (*bbolt.Bucket, error) {
|
||||
multipartUploadBucket := tx.Bucket(multipartUploadIndex)
|
||||
if multipartUploadBucket == nil {
|
||||
return nil, errors.New("multipart upload index not found")
|
||||
}
|
||||
return multipartUploadBucket, nil
|
||||
}
|
||||
|
||||
func getMultipartPartsBucket(tx *bbolt.Tx) (*bbolt.Bucket, error) {
|
||||
multipartPartsBucket := tx.Bucket(multipartUploadPartsIndex)
|
||||
if multipartPartsBucket == nil {
|
||||
return nil, errors.New("multipart upload parts index not found")
|
||||
}
|
||||
return multipartPartsBucket, nil
|
||||
}
|
||||
|
||||
func getMultipartUploadFromBucket(multipartUploadBucket *bbolt.Bucket, uploadID string) (*models.MultipartUpload, error) {
|
||||
payload := multipartUploadBucket.Get([]byte(uploadID))
|
||||
if payload == nil {
|
||||
return nil, fmt.Errorf("%w: %s", ErrMultipartNotFound, uploadID)
|
||||
}
|
||||
upload := models.MultipartUpload{}
|
||||
if err := json.Unmarshal(payload, &upload); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &upload, nil
|
||||
}
|
||||
|
||||
func getMultipartUploadFromTx(tx *bbolt.Tx, uploadID string) (*models.MultipartUpload, *bbolt.Bucket, error) {
|
||||
multipartUploadBucket, err := getMultipartUploadBucket(tx)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
upload, err := getMultipartUploadFromBucket(multipartUploadBucket, uploadID)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
return upload, multipartUploadBucket, nil
|
||||
}
|
||||
|
||||
func putMultipartUpload(multipartUploadBucket *bbolt.Bucket, uploadID string, upload *models.MultipartUpload) error {
|
||||
payload, err := json.Marshal(upload)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return multipartUploadBucket.Put([]byte(uploadID), payload)
|
||||
}
|
||||
|
||||
func deleteMultipartPartsByUploadID(tx *bbolt.Tx, uploadID string) error {
|
||||
multipartPartsBucket, err := getMultipartPartsBucket(tx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
prefix := uploadID + ":"
|
||||
cursor := multipartPartsBucket.Cursor()
|
||||
keysToDelete := make([][]byte, 0)
|
||||
for k, _ := cursor.Seek([]byte(prefix)); k != nil && strings.HasPrefix(string(k), prefix); k, _ = cursor.Next() {
|
||||
keyCopy := make([]byte, len(k))
|
||||
copy(keyCopy, k)
|
||||
keysToDelete = append(keysToDelete, keyCopy)
|
||||
}
|
||||
for _, key := range keysToDelete {
|
||||
if err := multipartPartsBucket.Delete(key); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (h *MetadataHandler) GetMultipartUpload(uploadID string) (*models.MultipartUpload, error) {
|
||||
var upload *models.MultipartUpload
|
||||
err := h.db.View(func(tx *bbolt.Tx) error {
|
||||
var err error
|
||||
upload, _, err = getMultipartUploadFromTx(tx, uploadID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return upload, nil
|
||||
}
|
||||
func (h *MetadataHandler) PutMultipartPart(uploadID string, part models.UploadedPart) error {
|
||||
if part.PartNumber < 1 || part.PartNumber > 10000 {
|
||||
return fmt.Errorf("invalid part number: %d", part.PartNumber)
|
||||
}
|
||||
|
||||
err := h.db.Update(func(tx *bbolt.Tx) error {
|
||||
upload, _, err := getMultipartUploadFromTx(tx, uploadID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if upload.State != "pending" {
|
||||
return fmt.Errorf("%w: %s", ErrMultipartNotPending, uploadID)
|
||||
}
|
||||
|
||||
multipartPartsBucket, err := getMultipartPartsBucket(tx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
key := fmt.Sprintf("%s:%05d", uploadID, part.PartNumber)
|
||||
payload, err := json.Marshal(part)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return multipartPartsBucket.Put([]byte(key), payload)
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
func (h *MetadataHandler) ListMultipartParts(uploadID string) ([]models.UploadedPart, error) {
|
||||
parts := make([]models.UploadedPart, 0)
|
||||
|
||||
err := h.db.View(func(tx *bbolt.Tx) error {
|
||||
if _, _, err := getMultipartUploadFromTx(tx, uploadID); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
multipartPartsBucket, err := getMultipartPartsBucket(tx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
prefix := uploadID + ":"
|
||||
cursor := multipartPartsBucket.Cursor()
|
||||
for k, v := cursor.Seek([]byte(prefix)); k != nil && strings.HasPrefix(string(k), prefix); k, v = cursor.Next() {
|
||||
part := models.UploadedPart{}
|
||||
if err := json.Unmarshal(v, &part); err != nil {
|
||||
return err
|
||||
}
|
||||
parts = append(parts, part)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
sort.Slice(parts, func(i, j int) bool {
|
||||
return parts[i].PartNumber < parts[j].PartNumber
|
||||
})
|
||||
return parts, nil
|
||||
}
|
||||
func (h *MetadataHandler) CompleteMultipartUpload(uploadID string, final *models.ObjectManifest) error {
|
||||
if final == nil {
|
||||
return errors.New("final object manifest is required")
|
||||
}
|
||||
|
||||
err := h.db.Update(func(tx *bbolt.Tx) error {
|
||||
upload, multipartUploadBucket, err := getMultipartUploadFromTx(tx, uploadID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if upload.State != "pending" {
|
||||
return fmt.Errorf("%w: %s", ErrMultipartNotPending, uploadID)
|
||||
}
|
||||
|
||||
metadataBucket := tx.Bucket([]byte(upload.Bucket))
|
||||
if metadataBucket == nil {
|
||||
return fmt.Errorf("%w: %s", ErrBucketNotFound, upload.Bucket)
|
||||
}
|
||||
final.Bucket = upload.Bucket
|
||||
final.Key = upload.Key
|
||||
finalPayload, err := json.Marshal(final)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err := metadataBucket.Put([]byte(upload.Key), finalPayload); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
upload.State = "completed"
|
||||
if err := putMultipartUpload(multipartUploadBucket, uploadID, upload); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := deleteMultipartPartsByUploadID(tx, uploadID); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
func (h *MetadataHandler) AbortMultipartUpload(uploadID string) error {
|
||||
err := h.db.Update(func(tx *bbolt.Tx) error {
|
||||
upload, multipartUploadBucket, err := getMultipartUploadFromTx(tx, uploadID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if upload.State == "completed" {
|
||||
return fmt.Errorf("%w: %s", ErrMultipartNotPending, uploadID)
|
||||
}
|
||||
upload.State = "aborted"
|
||||
if err := putMultipartUpload(multipartUploadBucket, uploadID, upload); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := deleteMultipartPartsByUploadID(tx, uploadID); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (h *MetadataHandler) CleanupMultipartUploads(retention time.Duration) (int, error) {
|
||||
if retention <= 0 {
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
cleaned := 0
|
||||
err := h.db.Update(func(tx *bbolt.Tx) error {
|
||||
uploadsBucket, err := getMultipartUploadBucket(tx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
now := time.Now().UTC()
|
||||
keysToDelete := make([]string, 0)
|
||||
if err := uploadsBucket.ForEach(func(k, v []byte) error {
|
||||
upload := models.MultipartUpload{}
|
||||
if err := json.Unmarshal(v, &upload); err != nil {
|
||||
return err
|
||||
}
|
||||
if upload.State == "pending" {
|
||||
return nil
|
||||
}
|
||||
createdAt, err := time.Parse(time.RFC3339, upload.CreatedAt)
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
if now.Sub(createdAt) >= retention {
|
||||
keysToDelete = append(keysToDelete, string(k))
|
||||
}
|
||||
return nil
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, uploadID := range keysToDelete {
|
||||
if err := uploadsBucket.Delete([]byte(uploadID)); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := deleteMultipartPartsByUploadID(tx, uploadID); err != nil {
|
||||
return err
|
||||
}
|
||||
cleaned++
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
return cleaned, nil
|
||||
}
|
||||
|
||||
func (h *MetadataHandler) GetReferencedChunkSet() (map[string]struct{}, error) {
|
||||
chunkSet := make(map[string]struct{})
|
||||
pendingUploadSet := make(map[string]struct{})
|
||||
|
||||
err := h.db.View(func(tx *bbolt.Tx) error {
|
||||
systemIndexBucket := tx.Bucket([]byte(systemIndex))
|
||||
if systemIndexBucket == nil {
|
||||
return errors.New("system index not found")
|
||||
}
|
||||
c := systemIndexBucket.Cursor()
|
||||
for k, _ := c.First(); k != nil; k, _ = c.Next() {
|
||||
metadataBucket := tx.Bucket(k)
|
||||
if metadataBucket == nil {
|
||||
continue
|
||||
}
|
||||
err := metadataBucket.ForEach(func(k, v []byte) error {
|
||||
object := models.ObjectManifest{}
|
||||
err := json.Unmarshal(v, &object)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, chunkID := range object.Chunks {
|
||||
chunkSet[chunkID] = struct{}{}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
uploadsBucket := tx.Bucket(multipartUploadIndex)
|
||||
if uploadsBucket == nil {
|
||||
return errors.New("multipart upload index not found")
|
||||
}
|
||||
if err := uploadsBucket.ForEach(func(k, v []byte) error {
|
||||
upload := models.MultipartUpload{}
|
||||
if err := json.Unmarshal(v, &upload); err != nil {
|
||||
return err
|
||||
}
|
||||
if upload.State == "pending" {
|
||||
pendingUploadSet[string(k)] = struct{}{}
|
||||
}
|
||||
return nil
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
partsBucket := tx.Bucket(multipartUploadPartsIndex)
|
||||
if partsBucket == nil {
|
||||
return errors.New("multipart upload parts index not found")
|
||||
}
|
||||
if err := partsBucket.ForEach(func(k, v []byte) error {
|
||||
uploadID, _, ok := strings.Cut(string(k), ":")
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
if _, pending := pendingUploadSet[uploadID]; !pending {
|
||||
return nil
|
||||
}
|
||||
|
||||
part := models.UploadedPart{}
|
||||
if err := json.Unmarshal(v, &part); err != nil {
|
||||
return err
|
||||
}
|
||||
for _, chunkID := range part.Chunks {
|
||||
chunkSet[chunkID] = struct{}{}
|
||||
}
|
||||
return nil
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return chunkSet, nil
|
||||
}
|
||||
|
||||
func (h *MetadataHandler) GetReferencedChunks() ([]string, error) {
|
||||
chunkSet, err := h.GetReferencedChunkSet()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
chunks := make([]string, 0, len(chunkSet))
|
||||
for chunkID := range chunkSet {
|
||||
chunks = append(chunks, chunkID)
|
||||
}
|
||||
return chunks, nil
|
||||
|
||||
}
|
||||
|
||||
174
models/models.go
174
models/models.go
@@ -1,5 +1,10 @@
|
||||
package models
|
||||
|
||||
import (
|
||||
"encoding/xml"
|
||||
"time"
|
||||
)
|
||||
|
||||
type ObjectManifest struct {
|
||||
Bucket string `json:"bucket"`
|
||||
Key string `json:"key"`
|
||||
@@ -9,3 +14,172 @@ type ObjectManifest struct {
|
||||
Chunks []string `json:"chunks"`
|
||||
CreatedAt int64 `json:"created_at"`
|
||||
}
|
||||
type BucketManifest struct {
|
||||
Name string `json:"name"`
|
||||
CreatedAt time.Time `json:"created_at"`
|
||||
OwnerID string `json:"owner_id"`
|
||||
OwnerDisplayName string `json:"owner_display_name"`
|
||||
Region string `json:"region"`
|
||||
VersioningStatus string `json:"versioning_status"`
|
||||
PublicAccessBlock bool `json:"public_access_block"`
|
||||
}
|
||||
|
||||
type ListAllMyBucketsResult struct {
|
||||
XMLName xml.Name `xml:"ListAllMyBucketsResult"`
|
||||
Xmlns string `xml:"xmlns,attr"`
|
||||
Owner BucketsOwner `xml:"Owner"`
|
||||
Buckets BucketsElement `xml:"Buckets"`
|
||||
}
|
||||
|
||||
type BucketsOwner struct {
|
||||
ID string `xml:"ID"`
|
||||
DisplayName string `xml:"DisplayName,omitempty"`
|
||||
}
|
||||
|
||||
type BucketsElement struct {
|
||||
Items []BucketItem `xml:"Bucket"`
|
||||
}
|
||||
|
||||
type BucketItem struct {
|
||||
Name string `xml:"Name"`
|
||||
CreationDate string `xml:"CreationDate"`
|
||||
}
|
||||
|
||||
type S3ErrorResponse struct {
|
||||
XMLName xml.Name `xml:"Error"`
|
||||
Code string `xml:"Code"`
|
||||
Message string `xml:"Message"`
|
||||
Resource string `xml:"Resource,omitempty"`
|
||||
RequestID string `xml:"RequestId,omitempty"`
|
||||
HostID string `xml:"HostId,omitempty"`
|
||||
}
|
||||
|
||||
type ListBucketResult struct {
|
||||
XMLName xml.Name `xml:"ListBucketResult"`
|
||||
Xmlns string `xml:"xmlns,attr"`
|
||||
|
||||
Name string `xml:"Name"`
|
||||
Prefix string `xml:"Prefix"`
|
||||
KeyCount int `xml:"KeyCount"`
|
||||
MaxKeys int `xml:"MaxKeys"`
|
||||
IsTruncated bool `xml:"IsTruncated"`
|
||||
|
||||
Contents []Contents `xml:"Contents"`
|
||||
CommonPrefixes []CommonPrefixes `xml:"CommonPrefixes,omitempty"`
|
||||
}
|
||||
|
||||
type ListBucketResultV2 struct {
|
||||
XMLName xml.Name `xml:"ListBucketResult"`
|
||||
Xmlns string `xml:"xmlns,attr"`
|
||||
|
||||
Name string `xml:"Name"`
|
||||
Prefix string `xml:"Prefix"`
|
||||
Delimiter string `xml:"Delimiter,omitempty"`
|
||||
MaxKeys int `xml:"MaxKeys"`
|
||||
KeyCount int `xml:"KeyCount"`
|
||||
IsTruncated bool `xml:"IsTruncated"`
|
||||
ContinuationToken string `xml:"ContinuationToken,omitempty"`
|
||||
NextContinuationToken string `xml:"NextContinuationToken,omitempty"`
|
||||
StartAfter string `xml:"StartAfter,omitempty"`
|
||||
EncodingType string `xml:"EncodingType,omitempty"`
|
||||
|
||||
Contents []Contents `xml:"Contents,omitempty"`
|
||||
CommonPrefixes []CommonPrefixes `xml:"CommonPrefixes,omitempty"`
|
||||
}
|
||||
|
||||
type Contents struct {
|
||||
Key string `xml:"Key"`
|
||||
LastModified string `xml:"LastModified"`
|
||||
ETag string `xml:"ETag"`
|
||||
Size int64 `xml:"Size"`
|
||||
StorageClass string `xml:"StorageClass"`
|
||||
}
|
||||
|
||||
type CommonPrefixes struct {
|
||||
Prefix string `xml:"Prefix"`
|
||||
}
|
||||
|
||||
type MultipartUpload struct {
|
||||
UploadID string `json:"upload_id" xml:"UploadId"`
|
||||
Bucket string `json:"bucket" xml:"Bucket"`
|
||||
Key string `json:"key" xml:"Key"`
|
||||
CreatedAt string `json:"created_at" xml:"CreatedAt"`
|
||||
State string `json:"state" xml:"State"`
|
||||
}
|
||||
|
||||
type InitiateMultipartUploadResult struct {
|
||||
XMLName xml.Name `xml:"InitiateMultipartUploadResult"`
|
||||
Xmlns string `xml:"xmlns,attr"`
|
||||
Bucket string `xml:"Bucket"`
|
||||
Key string `xml:"Key"`
|
||||
UploadID string `xml:"UploadId"`
|
||||
}
|
||||
type UploadedPart struct {
|
||||
PartNumber int `json:"part_number" xml:"PartNumber"`
|
||||
ETag string `json:"etag" xml:"ETag"`
|
||||
Size int64 `json:"size" xml:"Size"`
|
||||
Chunks []string `json:"chunks"`
|
||||
CreatedAt int64 `json:"created_at"`
|
||||
}
|
||||
|
||||
type CompletedPart struct {
|
||||
PartNumber int `xml:"PartNumber"`
|
||||
ETag string `xml:"ETag"`
|
||||
}
|
||||
|
||||
type CompleteMultipartUploadRequest struct {
|
||||
XMLName xml.Name `xml:"CompleteMultipartUpload"`
|
||||
Parts []CompletedPart `xml:"Part"`
|
||||
}
|
||||
|
||||
type CompleteMultipartUploadResult struct {
|
||||
XMLName xml.Name `xml:"CompleteMultipartUploadResult"`
|
||||
Xmlns string `xml:"xmlns,attr"`
|
||||
Bucket string `xml:"Bucket"`
|
||||
Key string `xml:"Key"`
|
||||
ETag string `xml:"ETag"`
|
||||
Location string `xml:"Location,omitempty"`
|
||||
}
|
||||
|
||||
type ListPartsResult struct {
|
||||
XMLName xml.Name `xml:"ListPartsResult"`
|
||||
Xmlns string `xml:"xmlns,attr"`
|
||||
Bucket string `xml:"Bucket"`
|
||||
Key string `xml:"Key"`
|
||||
UploadID string `xml:"UploadId"`
|
||||
Parts []PartItem `xml:"Part"`
|
||||
}
|
||||
|
||||
type PartItem struct {
|
||||
PartNumber int `xml:"PartNumber"`
|
||||
LastModified string `xml:"LastModified"`
|
||||
ETag string `xml:"ETag"`
|
||||
Size int64 `xml:"Size"`
|
||||
}
|
||||
|
||||
type DeleteObjectsRequest struct {
|
||||
XMLName xml.Name `xml:"Delete"`
|
||||
Objects []DeleteObjectIdentity `xml:"Object"`
|
||||
Quiet bool `xml:"Quiet"`
|
||||
}
|
||||
|
||||
type DeleteObjectIdentity struct {
|
||||
Key string `xml:"Key"`
|
||||
}
|
||||
|
||||
type DeleteObjectsResult struct {
|
||||
XMLName xml.Name `xml:"DeleteResult"`
|
||||
Xmlns string `xml:"xmlns,attr"`
|
||||
Deleted []DeletedEntry `xml:"Deleted,omitempty"`
|
||||
Errors []DeleteError `xml:"Error,omitempty"`
|
||||
}
|
||||
|
||||
type DeletedEntry struct {
|
||||
Key string `xml:"Key"`
|
||||
}
|
||||
|
||||
type DeleteError struct {
|
||||
Key string `xml:"Key"`
|
||||
Code string `xml:"Code"`
|
||||
Message string `xml:"Message"`
|
||||
}
|
||||
|
||||
@@ -1,29 +1,51 @@
|
||||
package service
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/md5"
|
||||
"encoding/hex"
|
||||
"errors"
|
||||
"fmt"
|
||||
"fs/metadata"
|
||||
"fs/models"
|
||||
"fs/storage"
|
||||
"io"
|
||||
"log/slog"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
type ObjectService struct {
|
||||
metadataHandler *metadata.MetadataHandler
|
||||
metadata *metadata.MetadataHandler
|
||||
blob *storage.BlobStore
|
||||
multipartRetention time.Duration
|
||||
gcMu sync.RWMutex
|
||||
}
|
||||
|
||||
func NewObjectService(metadataHandler *metadata.MetadataHandler) *ObjectService {
|
||||
return &ObjectService{metadataHandler: metadataHandler}
|
||||
var (
|
||||
ErrInvalidPart = errors.New("invalid multipart part")
|
||||
ErrInvalidPartOrder = errors.New("invalid multipart part order")
|
||||
ErrInvalidCompleteRequest = errors.New("invalid complete multipart request")
|
||||
ErrEntityTooSmall = errors.New("multipart entity too small")
|
||||
)
|
||||
|
||||
func NewObjectService(metadataHandler *metadata.MetadataHandler, blobHandler *storage.BlobStore, multipartRetention time.Duration) *ObjectService {
|
||||
if multipartRetention <= 0 {
|
||||
multipartRetention = 24 * time.Hour
|
||||
}
|
||||
return &ObjectService{
|
||||
metadata: metadataHandler,
|
||||
blob: blobHandler,
|
||||
multipartRetention: multipartRetention,
|
||||
}
|
||||
}
|
||||
|
||||
func (s *ObjectService) PutObject(uri string, contentType string, input io.Reader) (*models.ObjectManifest, error) {
|
||||
func (s *ObjectService) PutObject(bucket, key, contentType string, input io.Reader) (*models.ObjectManifest, error) {
|
||||
s.gcMu.RLock()
|
||||
defer s.gcMu.RUnlock()
|
||||
|
||||
bucket := strings.Split(uri, "/")[0]
|
||||
key := strings.Join(strings.Split(uri, "/")[1:], "/")
|
||||
|
||||
chunks, size, etag, err := storage.IngestStream(input)
|
||||
chunks, size, etag, err := s.blob.IngestStream(input)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -38,8 +60,14 @@ func (s *ObjectService) PutObject(uri string, contentType string, input io.Reade
|
||||
Chunks: chunks,
|
||||
CreatedAt: timestamp,
|
||||
}
|
||||
fmt.Println(manifest)
|
||||
if err = s.metadataHandler.PutManifest(manifest); err != nil {
|
||||
slog.Debug("object_written_manifest",
|
||||
"bucket", manifest.Bucket,
|
||||
"key", manifest.Key,
|
||||
"size", manifest.Size,
|
||||
"chunk_count", len(manifest.Chunks),
|
||||
"etag", manifest.ETag,
|
||||
)
|
||||
if err = s.metadata.PutManifest(manifest); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@@ -47,19 +75,317 @@ func (s *ObjectService) PutObject(uri string, contentType string, input io.Reade
|
||||
}
|
||||
|
||||
func (s *ObjectService) GetObject(bucket, key string) (io.ReadCloser, *models.ObjectManifest, error) {
|
||||
manifest, err := s.metadataHandler.GetManifest(bucket, key)
|
||||
s.gcMu.RLock()
|
||||
|
||||
manifest, err := s.metadata.GetManifest(bucket, key)
|
||||
if err != nil {
|
||||
s.gcMu.RUnlock()
|
||||
return nil, nil, err
|
||||
}
|
||||
pr, pw := io.Pipe()
|
||||
|
||||
go func() {
|
||||
defer pw.Close()
|
||||
|
||||
err := storage.AssembleStream(manifest.Chunks, pw)
|
||||
if err != nil {
|
||||
defer s.gcMu.RUnlock()
|
||||
if err := s.blob.AssembleStream(manifest.Chunks, pw); err != nil {
|
||||
_ = pw.CloseWithError(err)
|
||||
return
|
||||
}
|
||||
_ = pw.Close()
|
||||
}()
|
||||
return pr, manifest, nil
|
||||
}
|
||||
|
||||
func (s *ObjectService) HeadObject(bucket, key string) (models.ObjectManifest, error) {
|
||||
s.gcMu.RLock()
|
||||
defer s.gcMu.RUnlock()
|
||||
|
||||
manifest, err := s.metadata.GetManifest(bucket, key)
|
||||
if err != nil {
|
||||
return models.ObjectManifest{}, err
|
||||
}
|
||||
return *manifest, nil
|
||||
}
|
||||
|
||||
func (s *ObjectService) DeleteObject(bucket, key string) error {
|
||||
s.gcMu.RLock()
|
||||
defer s.gcMu.RUnlock()
|
||||
return s.metadata.DeleteManifest(bucket, key)
|
||||
}
|
||||
|
||||
func (s *ObjectService) ListObjects(bucket, prefix string) ([]*models.ObjectManifest, error) {
|
||||
s.gcMu.RLock()
|
||||
defer s.gcMu.RUnlock()
|
||||
|
||||
return s.metadata.ListObjects(bucket, prefix)
|
||||
}
|
||||
|
||||
func (s *ObjectService) ForEachObjectFrom(bucket, startKey string, fn func(*models.ObjectManifest) error) error {
|
||||
s.gcMu.RLock()
|
||||
defer s.gcMu.RUnlock()
|
||||
|
||||
return s.metadata.ForEachObjectFrom(bucket, startKey, fn)
|
||||
}
|
||||
|
||||
func (s *ObjectService) CreateBucket(bucket string) error {
|
||||
s.gcMu.RLock()
|
||||
defer s.gcMu.RUnlock()
|
||||
return s.metadata.CreateBucket(bucket)
|
||||
}
|
||||
|
||||
func (s *ObjectService) HeadBucket(bucket string) error {
|
||||
s.gcMu.RLock()
|
||||
defer s.gcMu.RUnlock()
|
||||
|
||||
_, err := s.metadata.GetBucketManifest(bucket)
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *ObjectService) GetBucketManifest(bucket string) (*models.BucketManifest, error) {
|
||||
s.gcMu.RLock()
|
||||
defer s.gcMu.RUnlock()
|
||||
|
||||
return s.metadata.GetBucketManifest(bucket)
|
||||
}
|
||||
|
||||
func (s *ObjectService) DeleteBucket(bucket string) error {
|
||||
s.gcMu.RLock()
|
||||
defer s.gcMu.RUnlock()
|
||||
return s.metadata.DeleteBucket(bucket)
|
||||
}
|
||||
|
||||
func (s *ObjectService) ListBuckets() ([]string, error) {
|
||||
s.gcMu.RLock()
|
||||
defer s.gcMu.RUnlock()
|
||||
|
||||
return s.metadata.ListBuckets()
|
||||
}
|
||||
|
||||
func (s *ObjectService) DeleteObjects(bucket string, keys []string) ([]string, error) {
|
||||
s.gcMu.RLock()
|
||||
defer s.gcMu.RUnlock()
|
||||
return s.metadata.DeleteManifests(bucket, keys)
|
||||
}
|
||||
|
||||
func (s *ObjectService) CreateMultipartUpload(bucket, key string) (*models.MultipartUpload, error) {
|
||||
s.gcMu.RLock()
|
||||
defer s.gcMu.RUnlock()
|
||||
return s.metadata.CreateMultipartUpload(bucket, key)
|
||||
}
|
||||
|
||||
func (s *ObjectService) UploadPart(bucket, key, uploadId string, partNumber int, input io.Reader) (string, error) {
|
||||
s.gcMu.RLock()
|
||||
defer s.gcMu.RUnlock()
|
||||
|
||||
if partNumber < 1 || partNumber > 10000 {
|
||||
return "", ErrInvalidPart
|
||||
}
|
||||
|
||||
upload, err := s.metadata.GetMultipartUpload(uploadId)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
if upload.Bucket != bucket || upload.Key != key {
|
||||
return "", metadata.ErrMultipartNotFound
|
||||
}
|
||||
|
||||
var uploadedPart models.UploadedPart
|
||||
chunkIds, totalSize, etag, err := s.blob.IngestStream(input)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
uploadedPart = models.UploadedPart{
|
||||
PartNumber: partNumber,
|
||||
ETag: etag,
|
||||
Size: totalSize,
|
||||
Chunks: chunkIds,
|
||||
CreatedAt: time.Now().Unix(),
|
||||
}
|
||||
err = s.metadata.PutMultipartPart(uploadId, uploadedPart)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
return etag, nil
|
||||
}
|
||||
|
||||
func (s *ObjectService) ListMultipartParts(bucket, key, uploadID string) ([]models.UploadedPart, error) {
|
||||
s.gcMu.RLock()
|
||||
defer s.gcMu.RUnlock()
|
||||
|
||||
upload, err := s.metadata.GetMultipartUpload(uploadID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if upload.Bucket != bucket || upload.Key != key {
|
||||
return nil, metadata.ErrMultipartNotFound
|
||||
}
|
||||
return s.metadata.ListMultipartParts(uploadID)
|
||||
}
|
||||
|
||||
func (s *ObjectService) CompleteMultipartUpload(bucket, key, uploadID string, completed []models.CompletedPart) (*models.ObjectManifest, error) {
|
||||
s.gcMu.RLock()
|
||||
defer s.gcMu.RUnlock()
|
||||
|
||||
if len(completed) == 0 {
|
||||
return nil, ErrInvalidCompleteRequest
|
||||
}
|
||||
|
||||
upload, err := s.metadata.GetMultipartUpload(uploadID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if upload.Bucket != bucket || upload.Key != key {
|
||||
return nil, metadata.ErrMultipartNotFound
|
||||
}
|
||||
|
||||
storedParts, err := s.metadata.ListMultipartParts(uploadID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
partsByNumber := make(map[int]models.UploadedPart, len(storedParts))
|
||||
for _, part := range storedParts {
|
||||
partsByNumber[part.PartNumber] = part
|
||||
}
|
||||
|
||||
lastPartNumber := 0
|
||||
orderedParts := make([]models.UploadedPart, 0, len(completed))
|
||||
chunks := make([]string, 0)
|
||||
var totalSize int64
|
||||
|
||||
for i, part := range completed {
|
||||
if part.PartNumber <= lastPartNumber {
|
||||
return nil, ErrInvalidPartOrder
|
||||
}
|
||||
lastPartNumber = part.PartNumber
|
||||
|
||||
storedPart, ok := partsByNumber[part.PartNumber]
|
||||
if !ok {
|
||||
return nil, ErrInvalidPart
|
||||
}
|
||||
if normalizeETag(part.ETag) != normalizeETag(storedPart.ETag) {
|
||||
return nil, ErrInvalidPart
|
||||
}
|
||||
if i < len(completed)-1 && storedPart.Size < 5*1024*1024 {
|
||||
return nil, ErrEntityTooSmall
|
||||
}
|
||||
|
||||
orderedParts = append(orderedParts, storedPart)
|
||||
chunks = append(chunks, storedPart.Chunks...)
|
||||
totalSize += storedPart.Size
|
||||
}
|
||||
|
||||
finalETag := buildMultipartETag(orderedParts)
|
||||
manifest := &models.ObjectManifest{
|
||||
Bucket: bucket,
|
||||
Key: key,
|
||||
Size: totalSize,
|
||||
ContentType: "application/octet-stream",
|
||||
ETag: finalETag,
|
||||
Chunks: chunks,
|
||||
CreatedAt: time.Now().Unix(),
|
||||
}
|
||||
|
||||
if err := s.metadata.CompleteMultipartUpload(uploadID, manifest); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return manifest, nil
|
||||
}
|
||||
|
||||
func (s *ObjectService) AbortMultipartUpload(bucket, key, uploadID string) error {
|
||||
s.gcMu.RLock()
|
||||
defer s.gcMu.RUnlock()
|
||||
|
||||
upload, err := s.metadata.GetMultipartUpload(uploadID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if upload.Bucket != bucket || upload.Key != key {
|
||||
return metadata.ErrMultipartNotFound
|
||||
}
|
||||
return s.metadata.AbortMultipartUpload(uploadID)
|
||||
}
|
||||
|
||||
func normalizeETag(etag string) string {
|
||||
return strings.Trim(etag, "\"")
|
||||
}
|
||||
|
||||
func buildMultipartETag(parts []models.UploadedPart) string {
|
||||
hasher := md5.New()
|
||||
for _, part := range parts {
|
||||
etagBytes, err := hex.DecodeString(normalizeETag(part.ETag))
|
||||
if err == nil {
|
||||
_, _ = hasher.Write(etagBytes)
|
||||
continue
|
||||
}
|
||||
_, _ = hasher.Write([]byte(normalizeETag(part.ETag)))
|
||||
}
|
||||
return fmt.Sprintf("%x-%d", hasher.Sum(nil), len(parts))
|
||||
}
|
||||
|
||||
func (s *ObjectService) Close() error {
|
||||
return s.metadata.Close()
|
||||
}
|
||||
|
||||
func (s *ObjectService) GarbageCollect() error {
|
||||
s.gcMu.Lock()
|
||||
defer s.gcMu.Unlock()
|
||||
|
||||
referencedChunkSet, err := s.metadata.GetReferencedChunkSet()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
totalChunks := 0
|
||||
deletedChunks := 0
|
||||
deleteErrors := 0
|
||||
cleanedUploads := 0
|
||||
|
||||
if err := s.blob.ForEachChunk(func(chunkID string) error {
|
||||
totalChunks++
|
||||
if _, found := referencedChunkSet[chunkID]; found {
|
||||
return nil
|
||||
}
|
||||
if err := s.blob.DeleteBlob(chunkID); err != nil {
|
||||
deleteErrors++
|
||||
slog.Warn("garbage_collect_delete_failed", "chunk_id", chunkID, "error", err)
|
||||
return nil
|
||||
}
|
||||
deletedChunks++
|
||||
return nil
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
cleanedUploads, err = s.metadata.CleanupMultipartUploads(s.multipartRetention)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
slog.Info("garbage_collect_completed",
|
||||
"referenced_chunks", len(referencedChunkSet),
|
||||
"total_chunks", totalChunks,
|
||||
"deleted_chunks", deletedChunks,
|
||||
"delete_errors", deleteErrors,
|
||||
"cleaned_uploads", cleanedUploads,
|
||||
)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *ObjectService) RunGC(ctx context.Context, interval time.Duration) {
|
||||
if interval <= 0 {
|
||||
slog.Warn("garbage_collect_disabled_invalid_interval", "interval", interval.String())
|
||||
return
|
||||
}
|
||||
|
||||
ticker := time.NewTicker(interval)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-ticker.C:
|
||||
_ = s.GarbageCollect()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
161
storage/blob.go
161
storage/blob.go
@@ -4,24 +4,48 @@ import (
|
||||
"crypto/md5"
|
||||
"crypto/sha256"
|
||||
"encoding/hex"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
)
|
||||
|
||||
const chunkSize = 64 * 1024
|
||||
const blobRoot = "blobs/"
|
||||
const blobRoot = "blobs"
|
||||
const maxChunkSize = 64 * 1024 * 1024
|
||||
|
||||
func IngestStream(stream io.Reader) ([]string, int64, string, error) {
|
||||
type BlobStore struct {
|
||||
dataRoot string
|
||||
chunkSize int
|
||||
}
|
||||
|
||||
func NewBlobStore(root string, chunkSize int) (*BlobStore, error) {
|
||||
root = strings.TrimSpace(root)
|
||||
if root == "" {
|
||||
return nil, errors.New("blob root is required")
|
||||
}
|
||||
if chunkSize <= 0 || chunkSize > maxChunkSize {
|
||||
return nil, fmt.Errorf("chunk size must be between 1 and %d bytes", maxChunkSize)
|
||||
}
|
||||
|
||||
cleanRoot := filepath.Clean(root)
|
||||
if err := os.MkdirAll(filepath.Join(cleanRoot, blobRoot), 0o755); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &BlobStore{chunkSize: chunkSize, dataRoot: cleanRoot}, nil
|
||||
}
|
||||
|
||||
func (bs *BlobStore) IngestStream(stream io.Reader) ([]string, int64, string, error) {
|
||||
fullFileHasher := md5.New()
|
||||
|
||||
buffer := make([]byte, chunkSize)
|
||||
buffer := make([]byte, bs.chunkSize)
|
||||
var totalSize int64
|
||||
var chunkIDs []string
|
||||
|
||||
for {
|
||||
bytesRead, err := io.ReadFull(stream, buffer)
|
||||
if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
|
||||
if err != nil && err != io.EOF && !errors.Is(err, io.ErrUnexpectedEOF) {
|
||||
return nil, 0, "", err
|
||||
}
|
||||
|
||||
@@ -34,13 +58,13 @@ func IngestStream(stream io.Reader) ([]string, int64, string, error) {
|
||||
chunkHash := sha256.Sum256(chunkData)
|
||||
chunkID := hex.EncodeToString(chunkHash[:])
|
||||
|
||||
err := saveBlob(chunkID, chunkData)
|
||||
err := bs.saveBlob(chunkID, chunkData)
|
||||
if err != nil {
|
||||
return nil, 0, "", err
|
||||
}
|
||||
chunkIDs = append(chunkIDs, chunkID)
|
||||
}
|
||||
if err == io.EOF || err == io.ErrUnexpectedEOF {
|
||||
if err == io.EOF || errors.Is(err, io.ErrUnexpectedEOF) {
|
||||
break
|
||||
}
|
||||
if err != nil {
|
||||
@@ -48,29 +72,68 @@ func IngestStream(stream io.Reader) ([]string, int64, string, error) {
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
etag := hex.EncodeToString(fullFileHasher.Sum(nil))
|
||||
return chunkIDs, totalSize, etag, nil
|
||||
}
|
||||
|
||||
func saveBlob(chunkID string, data []byte) error {
|
||||
dir := filepath.Join(blobRoot, chunkID[:2], chunkID[2:4])
|
||||
func (bs *BlobStore) saveBlob(chunkID string, data []byte) error {
|
||||
if !isValidChunkID(chunkID) {
|
||||
return fmt.Errorf("invalid chunk id: %q", chunkID)
|
||||
}
|
||||
dir := filepath.Join(bs.dataRoot, blobRoot, chunkID[:2], chunkID[2:4])
|
||||
if err := os.MkdirAll(dir, 0755); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
fullPath := filepath.Join(dir, chunkID)
|
||||
if _, err := os.Stat(fullPath); os.IsNotExist(err) {
|
||||
if err := os.WriteFile(fullPath, data, 0644); err != nil {
|
||||
return err
|
||||
if _, err := os.Stat(fullPath); err == nil {
|
||||
return nil
|
||||
} else if !os.IsNotExist(err) {
|
||||
return err
|
||||
}
|
||||
|
||||
tmpFile, err := os.CreateTemp(dir, chunkID+".tmp-*")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
tmpPath := tmpFile.Name()
|
||||
cleanup := true
|
||||
defer func() {
|
||||
if cleanup {
|
||||
_ = os.Remove(tmpPath)
|
||||
}
|
||||
}()
|
||||
|
||||
if _, err := tmpFile.Write(data); err != nil {
|
||||
_ = tmpFile.Close()
|
||||
return err
|
||||
}
|
||||
if err := tmpFile.Sync(); err != nil {
|
||||
_ = tmpFile.Close()
|
||||
return err
|
||||
}
|
||||
if err := tmpFile.Close(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := os.Rename(tmpPath, fullPath); err != nil {
|
||||
if _, statErr := os.Stat(fullPath); statErr == nil {
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
}
|
||||
cleanup = false
|
||||
|
||||
if err := syncDir(dir); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func AssembleStream(chunkIDs []string, w *io.PipeWriter) error {
|
||||
func (bs *BlobStore) AssembleStream(chunkIDs []string, w *io.PipeWriter) error {
|
||||
for _, chunkID := range chunkIDs {
|
||||
chunkData, err := GetBlob(chunkID)
|
||||
chunkData, err := bs.GetBlob(chunkID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -81,7 +144,69 @@ func AssembleStream(chunkIDs []string, w *io.PipeWriter) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func GetBlob(chunkID string) ([]byte, error) {
|
||||
|
||||
return os.ReadFile(filepath.Join(blobRoot, chunkID[:2], chunkID[2:4], chunkID))
|
||||
func (bs *BlobStore) GetBlob(chunkID string) ([]byte, error) {
|
||||
if !isValidChunkID(chunkID) {
|
||||
return nil, fmt.Errorf("invalid chunk id: %q", chunkID)
|
||||
}
|
||||
return os.ReadFile(filepath.Join(bs.dataRoot, blobRoot, chunkID[:2], chunkID[2:4], chunkID))
|
||||
}
|
||||
|
||||
func (bs *BlobStore) DeleteBlob(chunkID string) error {
|
||||
if !isValidChunkID(chunkID) {
|
||||
return fmt.Errorf("invalid chunk id: %q", chunkID)
|
||||
}
|
||||
err := os.Remove(filepath.Join(bs.dataRoot, blobRoot, chunkID[:2], chunkID[2:4], chunkID))
|
||||
if err != nil && os.IsNotExist(err) {
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (bs *BlobStore) ListChunks() ([]string, error) {
|
||||
var chunkIDs []string
|
||||
err := bs.ForEachChunk(func(chunkID string) error {
|
||||
chunkIDs = append(chunkIDs, chunkID)
|
||||
return nil
|
||||
})
|
||||
return chunkIDs, err
|
||||
}
|
||||
|
||||
func (bs *BlobStore) ForEachChunk(fn func(chunkID string) error) error {
|
||||
if fn == nil {
|
||||
return errors.New("chunk callback is required")
|
||||
}
|
||||
return filepath.Walk(filepath.Join(bs.dataRoot, blobRoot), func(path string, info os.FileInfo, err error) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !info.IsDir() {
|
||||
chunkID := info.Name()
|
||||
if isValidChunkID(chunkID) {
|
||||
return fn(chunkID)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
|
||||
})
|
||||
}
|
||||
|
||||
func isValidChunkID(chunkID string) bool {
|
||||
if len(chunkID) != sha256.Size*2 {
|
||||
return false
|
||||
}
|
||||
for _, ch := range chunkID {
|
||||
if (ch < '0' || ch > '9') && (ch < 'a' || ch > 'f') {
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func syncDir(dirPath string) error {
|
||||
dir, err := os.Open(dirPath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer dir.Close()
|
||||
return dir.Sync()
|
||||
}
|
||||
|
||||
98
utils/config.go
Normal file
98
utils/config.go
Normal file
@@ -0,0 +1,98 @@
|
||||
package utils
|
||||
|
||||
import (
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/joho/godotenv"
|
||||
)
|
||||
|
||||
type Config struct {
|
||||
DataPath string
|
||||
Address string
|
||||
Port int
|
||||
ChunkSize int
|
||||
LogLevel string
|
||||
LogFormat string
|
||||
AuditLog bool
|
||||
GcInterval time.Duration
|
||||
GcEnabled bool
|
||||
MultipartCleanupRetention time.Duration
|
||||
}
|
||||
|
||||
func NewConfig() *Config {
|
||||
_ = godotenv.Load()
|
||||
|
||||
config := &Config{
|
||||
DataPath: sanitizeDataPath(os.Getenv("DATA_PATH")),
|
||||
Address: firstNonEmpty(strings.TrimSpace(os.Getenv("ADDRESS")), "0.0.0.0"),
|
||||
Port: envIntRange("PORT", 3000, 1, 65535),
|
||||
ChunkSize: envIntRange("CHUNK_SIZE", 8192000, 1, 64*1024*1024),
|
||||
LogLevel: strings.ToLower(firstNonEmpty(strings.TrimSpace(os.Getenv("LOG_LEVEL")), "info")),
|
||||
LogFormat: strings.ToLower(firstNonEmpty(strings.TrimSpace(os.Getenv("LOG_FORMAT")), strings.TrimSpace(os.Getenv("LOG_TYPE")), "text")),
|
||||
AuditLog: envBool("AUDIT_LOG", true),
|
||||
GcInterval: time.Duration(envIntRange("GC_INTERVAL", 10, 1, 60)) * time.Minute,
|
||||
GcEnabled: envBool("GC_ENABLED", true),
|
||||
MultipartCleanupRetention: time.Duration(
|
||||
envIntRange("MULTIPART_RETENTION_HOURS", 24, 1, 24*30),
|
||||
) * time.Hour,
|
||||
}
|
||||
|
||||
if config.LogFormat != "json" && config.LogFormat != "text" {
|
||||
config.LogFormat = "text"
|
||||
}
|
||||
|
||||
return config
|
||||
|
||||
}
|
||||
|
||||
func envIntRange(key string, defaultValue, minValue, maxValue int) int {
|
||||
raw := strings.TrimSpace(os.Getenv(key))
|
||||
if raw == "" {
|
||||
return defaultValue
|
||||
}
|
||||
value, err := strconv.Atoi(raw)
|
||||
if err != nil {
|
||||
return defaultValue
|
||||
}
|
||||
if value < minValue || value > maxValue {
|
||||
return defaultValue
|
||||
}
|
||||
return value
|
||||
}
|
||||
|
||||
func envBool(key string, defaultValue bool) bool {
|
||||
raw := strings.TrimSpace(os.Getenv(key))
|
||||
if raw == "" {
|
||||
return defaultValue
|
||||
}
|
||||
value, err := strconv.ParseBool(raw)
|
||||
if err != nil {
|
||||
return defaultValue
|
||||
}
|
||||
return value
|
||||
}
|
||||
|
||||
func firstNonEmpty(values ...string) string {
|
||||
for _, v := range values {
|
||||
if v != "" {
|
||||
return v
|
||||
}
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
func sanitizeDataPath(raw string) string {
|
||||
cleaned := strings.TrimSpace(raw)
|
||||
if cleaned == "" {
|
||||
cleaned = "."
|
||||
}
|
||||
cleaned = filepath.Clean(cleaned)
|
||||
if abs, err := filepath.Abs(cleaned); err == nil {
|
||||
return abs
|
||||
}
|
||||
return cleaned
|
||||
}
|
||||
54
utils/utils.go
Normal file
54
utils/utils.go
Normal file
@@ -0,0 +1,54 @@
|
||||
package utils
|
||||
|
||||
import (
|
||||
"encoding/xml"
|
||||
"fs/models"
|
||||
"sort"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
func ConstructXMLResponseForObjectList(bucket string, objects []*models.ObjectManifest) (string, error) {
|
||||
result := models.ListBucketResult{
|
||||
Xmlns: "http://s3.amazonaws.com/doc/2006-03-01/",
|
||||
Name: bucket,
|
||||
Prefix: "",
|
||||
KeyCount: len(objects),
|
||||
MaxKeys: 1000,
|
||||
IsTruncated: false,
|
||||
}
|
||||
|
||||
prefixSet := make(map[string]struct{})
|
||||
|
||||
for _, object := range objects {
|
||||
result.Contents = append(result.Contents, models.Contents{
|
||||
Key: object.Key,
|
||||
LastModified: time.Unix(object.CreatedAt, 0).UTC().Format("2006-01-02T15:04:05.000Z"),
|
||||
ETag: "\"" + object.ETag + "\"",
|
||||
Size: object.Size,
|
||||
StorageClass: "STANDARD",
|
||||
})
|
||||
|
||||
if strings.Contains(object.Key, "/") {
|
||||
parts := strings.SplitN(object.Key, "/", 2)
|
||||
prefixSet[parts[0]+"/"] = struct{}{}
|
||||
}
|
||||
}
|
||||
|
||||
prefixes := make([]string, 0, len(prefixSet))
|
||||
for prefix := range prefixSet {
|
||||
prefixes = append(prefixes, prefix)
|
||||
}
|
||||
sort.Strings(prefixes)
|
||||
|
||||
for _, prefix := range prefixes {
|
||||
result.CommonPrefixes = append(result.CommonPrefixes, models.CommonPrefixes{Prefix: prefix})
|
||||
}
|
||||
|
||||
output, err := xml.MarshalIndent(result, "", " ")
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
return xml.Header + string(output), nil
|
||||
}
|
||||
Reference in New Issue
Block a user