// fs/api/api.go
package api
import (
"bufio"
"context"
"encoding/base64"
"encoding/xml"
"errors"
"fmt"
"fs/auth"
"fs/logging"
"fs/metadata"
"fs/metrics"
"fs/models"
"fs/service"
"io"
"log/slog"
"net"
"net/http"
"net/url"
"strconv"
"strings"
"sync"
"time"
"github.com/go-chi/chi/v5"
"github.com/go-chi/chi/v5/middleware"
)
// Handler wires the object-storage service to an S3-compatible HTTP surface.
// Routes are registered by setupRoutes (invoked from Start), so a Handler is
// not servable until Start has run.
type Handler struct {
	router    *chi.Mux               // chi router carrying middleware + routes
	svc       *service.ObjectService // backing object/bucket storage service
	logger    *slog.Logger           // structured logger; never nil after NewHandler
	logConfig logging.Config         // logging/audit configuration passed to middleware
	authSvc   *auth.Service          // request authentication service
}
const (
	// maxXMLBodyBytes caps XML request bodies (CompleteMultipartUpload,
	// DeleteObjects) at 1 MiB via http.MaxBytesReader.
	maxXMLBodyBytes int64 = 1 << 20
	// maxDeleteObjects mirrors S3's 1000-key limit per DeleteObjects call.
	maxDeleteObjects = 1000
	// maxObjectKeyBytes mirrors S3's 1024-byte object key limit.
	maxObjectKeyBytes = 1024
	// HTTP server hardening knobs applied in Start.
	serverReadHeaderTimeout = 5 * time.Second
	serverReadTimeout       = 60 * time.Second
	serverWriteTimeout      = 120 * time.Second
	serverIdleTimeout       = 120 * time.Second
	serverMaxHeaderBytes    = 1 << 20
	// serverMaxConnections bounds concurrent accepted connections via
	// newLimitedListener; <=0 would disable the limit.
	serverMaxConnections = 1024
)
// NewHandler constructs a Handler with the base middleware stack (request ID
// and panic recovery) installed. A nil logger falls back to slog.Default().
// Route registration is deferred to setupRoutes, which Start invokes.
func NewHandler(svc *service.ObjectService, logger *slog.Logger, logConfig logging.Config, authSvc *auth.Service) *Handler {
	if logger == nil {
		logger = slog.Default()
	}
	router := chi.NewRouter()
	router.Use(middleware.RequestID)
	router.Use(middleware.Recoverer)
	return &Handler{
		router:    router,
		svc:       svc,
		logger:    logger,
		logConfig: logConfig,
		authSvc:   authSvc,
	}
}
// setupRoutes installs the logging and auth middleware, then every route.
// It must run before the server serves traffic (Start calls it) because chi
// middleware only wraps handlers registered after Use.
func (h *Handler) setupRoutes() {
	h.router.Use(logging.HTTPMiddleware(h.logger, h.logConfig))
	h.router.Use(auth.Middleware(h.authSvc, h.logger, h.logConfig.Audit, writeMappedS3Error))
	// Operational endpoints (outside the S3 API surface).
	h.router.Get("/healthz", h.handleHealth)
	h.router.Head("/healthz", h.handleHealth)
	h.router.Get("/metrics", h.handleMetrics)
	h.router.Head("/metrics", h.handleMetrics)
	// Service root: ListBuckets.
	h.router.Get("/", h.handleGetBuckets)
	// Bucket-level operations; each is registered with and without a
	// trailing slash so both URL forms clients produce are accepted.
	h.router.Get("/{bucket}/", h.handleGetBucket)
	h.router.Get("/{bucket}", h.handleGetBucket)
	h.router.Put("/{bucket}", h.handlePutBucket)
	h.router.Put("/{bucket}/", h.handlePutBucket)
	h.router.Post("/{bucket}", h.handlePostBucket)
	h.router.Post("/{bucket}/", h.handlePostBucket)
	h.router.Delete("/{bucket}", h.handleDeleteBucket)
	h.router.Delete("/{bucket}/", h.handleDeleteBucket)
	h.router.Head("/{bucket}", h.handleHeadBucket)
	h.router.Head("/{bucket}/", h.handleHeadBucket)
	// Object-level operations; "*" captures the (possibly slash-containing) key.
	h.router.Get("/{bucket}/*", h.handleGetObject)
	h.router.Put("/{bucket}/*", h.handlePutObject)
	h.router.Post("/{bucket}/*", h.handlePostObject)
	h.router.Head("/{bucket}/*", h.handleHeadObject)
	h.router.Delete("/{bucket}/*", h.handleDeleteObject)
}
// handleHealth serves GET/HEAD /healthz. The service is considered healthy
// when listing buckets succeeds; otherwise it answers 503. HEAD requests
// receive status and headers only, with no body.
func (h *Handler) handleHealth(w http.ResponseWriter, r *http.Request) {
	status, body := http.StatusOK, "ok"
	if _, err := h.svc.ListBuckets(); err != nil {
		status, body = http.StatusServiceUnavailable, "unhealthy"
	}
	w.Header().Set("Content-Type", "text/plain; charset=utf-8")
	w.WriteHeader(status)
	if r.Method != http.MethodHead {
		_, _ = w.Write([]byte(body))
	}
}
// handleMetrics serves the Prometheus text exposition format at /metrics.
// HEAD requests get the content type and a 200 with no body; GET renders the
// full snapshot with an explicit Content-Length.
func (h *Handler) handleMetrics(w http.ResponseWriter, r *http.Request) {
	w.Header().Set("Content-Type", "text/plain; version=0.0.4; charset=utf-8")
	if r.Method == http.MethodHead {
		w.WriteHeader(http.StatusOK)
		return
	}
	body := metrics.Default.RenderPrometheus()
	w.Header().Set("Content-Length", strconv.Itoa(len(body)))
	w.WriteHeader(http.StatusOK)
	_, _ = w.Write([]byte(body))
}
// validateObjectKey rejects empty keys and keys exceeding maxObjectKeyBytes.
// It returns a pointer to a copy of the matching S3 error so callers can
// write it directly, or nil when the key is acceptable.
func validateObjectKey(key string) *s3APIError {
	var apiErr s3APIError
	switch {
	case key == "":
		apiErr = s3ErrInvalidObjectKey
	case len(key) > maxObjectKeyBytes:
		apiErr = s3ErrKeyTooLong
	default:
		return nil
	}
	return &apiErr
}
// handleGetObject serves GET /{bucket}/{key}. With an uploadId query
// parameter it instead lists the parts of an in-progress multipart upload.
// A single Range header ("bytes=...") is honored with a 206 Partial Content
// response; malformed or unsatisfiable ranges yield InvalidRange with the
// RFC 9110 "bytes */size" Content-Range.
func (h *Handler) handleGetObject(w http.ResponseWriter, r *http.Request) {
	bucket := chi.URLParam(r, "bucket")
	key := chi.URLParam(r, "*")
	if apiErr := validateObjectKey(key); apiErr != nil {
		writeS3Error(w, r, *apiErr, r.URL.Path)
		return
	}
	if uploadID := r.URL.Query().Get("uploadId"); uploadID != "" {
		h.handleListMultipartParts(w, r, bucket, key, uploadID)
		return
	}
	stream, manifest, err := h.svc.GetObject(bucket, key)
	if err != nil {
		writeMappedS3Error(w, r, err)
		return
	}
	defer stream.Close()
	rangeHeader := strings.TrimSpace(r.Header.Get("Range"))
	if rangeHeader != "" {
		start, end, rangeErr := parseSingleByteRange(rangeHeader, manifest.Size)
		if rangeErr != nil {
			w.Header().Set("Content-Range", fmt.Sprintf("bytes */%d", manifest.Size))
			writeS3Error(w, r, s3ErrInvalidRange, r.URL.Path)
			return
		}
		// The storage stream has no seek API, so discard up to the range start.
		if start > 0 {
			if _, err := io.CopyN(io.Discard, stream, start); err != nil {
				writeMappedS3Error(w, r, err)
				return
			}
		}
		length := end - start + 1
		w.Header().Set("Content-Type", manifest.ContentType)
		w.Header().Set("Content-Length", strconv.FormatInt(length, 10))
		w.Header().Set("Content-Range", fmt.Sprintf("bytes %d-%d/%d", start, end, manifest.Size))
		w.Header().Set("ETag", `"`+manifest.ETag+`"`)
		w.Header().Set("Last-Modified", time.Unix(manifest.CreatedAt, 0).UTC().Format(http.TimeFormat))
		w.Header().Set("Accept-Ranges", "bytes")
		w.WriteHeader(http.StatusPartialContent)
		// The status line is already written; a copy failure can only surface
		// to the client as a truncated body, so the error is discarded.
		_, _ = io.CopyN(w, stream, length)
		return
	}
	w.Header().Set("Content-Type", manifest.ContentType)
	w.Header().Set("Content-Length", strconv.FormatInt(manifest.Size, 10))
	w.Header().Set("ETag", `"`+manifest.ETag+`"`)
	w.Header().Set("Last-Modified", time.Unix(manifest.CreatedAt, 0).UTC().Format(http.TimeFormat))
	w.Header().Set("Accept-Ranges", "bytes")
	w.WriteHeader(http.StatusOK)
	// Was `_, err = io.Copy(...)`: a dead store (the error was never read and
	// cannot be reported after WriteHeader). Discard it explicitly instead.
	_, _ = io.Copy(w, stream)
}
// handlePostObject serves POST /{bucket}/{key}. Two multipart-upload
// operations are supported: "?uploads" initiates an upload and returns its
// UploadId; "?uploadId=..." completes the upload using the XML part list in
// the request body (capped at maxXMLBodyBytes). Any other POST answers
// NotImplemented.
func (h *Handler) handlePostObject(w http.ResponseWriter, r *http.Request) {
	bucket := chi.URLParam(r, "bucket")
	key := chi.URLParam(r, "*")
	if apiErr := validateObjectKey(key); apiErr != nil {
		writeS3Error(w, r, *apiErr, r.URL.Path)
		return
	}
	defer r.Body.Close()
	// CreateMultipartUpload: POST ?uploads (value-less flag, hence map lookup).
	if _, ok := r.URL.Query()["uploads"]; ok {
		upload, err := h.svc.CreateMultipartUpload(bucket, key)
		if err != nil {
			writeMappedS3Error(w, r, err)
			return
		}
		response := models.InitiateMultipartUploadResult{
			Xmlns:    "http://s3.amazonaws.com/doc/2006-03-01/",
			Bucket:   upload.Bucket,
			Key:      upload.Key,
			UploadID: upload.UploadID,
		}
		payload, err := xml.MarshalIndent(response, "", " ")
		if err != nil {
			writeMappedS3Error(w, r, err)
			return
		}
		w.Header().Set("Content-Type", "application/xml; charset=utf-8")
		w.WriteHeader(http.StatusOK)
		_, _ = w.Write([]byte(xml.Header))
		_, _ = w.Write(payload)
		return
	}
	// CompleteMultipartUpload: POST ?uploadId=... with an XML body listing parts.
	if uploadID := r.URL.Query().Get("uploadId"); uploadID != "" {
		// Bound the XML body; a MaxBytesError from the decoder maps to
		// EntityTooLarge rather than MalformedXML.
		r.Body = http.MaxBytesReader(w, r.Body, maxXMLBodyBytes)
		var req models.CompleteMultipartUploadRequest
		if err := xml.NewDecoder(r.Body).Decode(&req); err != nil {
			var maxErr *http.MaxBytesError
			if errors.As(err, &maxErr) {
				writeS3Error(w, r, s3ErrEntityTooLarge, r.URL.Path)
				return
			}
			writeS3Error(w, r, s3ErrMalformedXML, r.URL.Path)
			return
		}
		metrics.Default.ObserveBatchSize(len(req.Parts))
		manifest, err := h.svc.CompleteMultipartUpload(bucket, key, uploadID, req.Parts)
		if err != nil {
			writeMappedS3Error(w, r, err)
			return
		}
		response := models.CompleteMultipartUploadResult{
			Xmlns:    "http://s3.amazonaws.com/doc/2006-03-01/",
			Bucket:   bucket,
			Key:      key,
			ETag:     `"` + manifest.ETag + `"`,
			Location: r.URL.Path,
		}
		payload, err := xml.MarshalIndent(response, "", " ")
		if err != nil {
			writeMappedS3Error(w, r, err)
			return
		}
		w.Header().Set("Content-Type", "application/xml; charset=utf-8")
		w.WriteHeader(http.StatusOK)
		_, _ = w.Write([]byte(xml.Header))
		_, _ = w.Write(payload)
		return
	}
	writeS3Error(w, r, s3ErrNotImplemented, r.URL.Path)
}
// handlePutObject serves PUT /{bucket}/{key}. When uploadId/partNumber query
// parameters are present it stores a multipart part (both must be given and
// partNumber must be in S3's 1..10000 range); otherwise it performs a simple
// PutObject. aws-chunked bodies are transparently decoded in both paths, and
// If-None-Match preconditions are honored on the simple path.
func (h *Handler) handlePutObject(w http.ResponseWriter, r *http.Request) {
	bucket := chi.URLParam(r, "bucket")
	key := chi.URLParam(r, "*")
	if apiErr := validateObjectKey(key); apiErr != nil {
		writeS3Error(w, r, *apiErr, r.URL.Path)
		return
	}
	defer r.Body.Close()
	uploadID := r.URL.Query().Get("uploadId")
	partNumberRaw := r.URL.Query().Get("partNumber")
	// UploadPart path: either parameter alone is an InvalidPart error.
	if uploadID != "" || partNumberRaw != "" {
		if uploadID == "" || partNumberRaw == "" {
			writeS3Error(w, r, s3ErrInvalidPart, r.URL.Path)
			return
		}
		partNumber, err := strconv.Atoi(partNumberRaw)
		if err != nil {
			writeS3Error(w, r, s3ErrInvalidPart, r.URL.Path)
			return
		}
		if partNumber < 1 || partNumber > 10000 {
			writeS3Error(w, r, s3ErrInvalidPart, r.URL.Path)
			return
		}
		// Strip aws-chunked framing before handing the body to the service.
		bodyReader := io.Reader(r.Body)
		var decodeStream io.ReadCloser
		if shouldDecodeAWSChunkedPayload(r) {
			decodeStream = newAWSChunkedDecodingReader(r.Body)
			defer decodeStream.Close()
			bodyReader = decodeStream
		}
		etag, err := h.svc.UploadPart(bucket, key, uploadID, partNumber, bodyReader)
		if err != nil {
			writeMappedS3Error(w, r, err)
			return
		}
		w.Header().Set("ETag", `"`+etag+`"`)
		w.Header().Set("Content-Length", "0")
		w.WriteHeader(http.StatusOK)
		return
	}
	metrics.Default.ObserveBatchSize(1)
	// If-None-Match: fail with 412 when a matching object already exists.
	// A not-found HeadObject is the success case here, so only other errors
	// abort the request.
	if ifNoneMatch := strings.TrimSpace(r.Header.Get("If-None-Match")); ifNoneMatch != "" {
		manifest, err := h.svc.HeadObject(bucket, key)
		if err != nil {
			if !errors.Is(err, metadata.ErrObjectNotFound) {
				writeMappedS3Error(w, r, err)
				return
			}
		} else if ifNoneMatchPreconditionFailed(ifNoneMatch, manifest.ETag) {
			writeS3Error(w, r, s3ErrPreconditionFailed, r.URL.Path)
			return
		}
	}
	contentType := r.Header.Get("Content-Type")
	if contentType == "" {
		contentType = "application/octet-stream"
	}
	bodyReader := io.Reader(r.Body)
	var decodeStream io.ReadCloser
	if shouldDecodeAWSChunkedPayload(r) {
		decodeStream = newAWSChunkedDecodingReader(r.Body)
		defer decodeStream.Close()
		bodyReader = decodeStream
	}
	manifest, err := h.svc.PutObject(bucket, key, contentType, bodyReader)
	if err != nil {
		writeMappedS3Error(w, r, err)
		return
	}
	w.Header().Set("ETag", `"`+manifest.ETag+`"`)
	w.Header().Set("Content-Length", "0")
	w.WriteHeader(http.StatusOK)
}
// handleListMultipartParts renders the ListParts XML response for an
// in-progress multipart upload. It is dispatched from handleGetObject when a
// GET carries an uploadId query parameter.
func (h *Handler) handleListMultipartParts(w http.ResponseWriter, r *http.Request, bucket, key, uploadID string) {
	parts, err := h.svc.ListMultipartParts(bucket, key, uploadID)
	if err != nil {
		writeMappedS3Error(w, r, err)
		return
	}
	response := models.ListPartsResult{
		Xmlns:    "http://s3.amazonaws.com/doc/2006-03-01/",
		Bucket:   bucket,
		Key:      key,
		UploadID: uploadID,
		Parts:    make([]models.PartItem, 0, len(parts)),
	}
	for _, part := range parts {
		response.Parts = append(response.Parts, models.PartItem{
			// S3-style ISO-8601 timestamp with milliseconds.
			PartNumber:   part.PartNumber,
			LastModified: time.Unix(part.CreatedAt, 0).UTC().Format("2006-01-02T15:04:05.000Z"),
			ETag:         `"` + part.ETag + `"`,
			Size:         part.Size,
		})
	}
	payload, err := xml.MarshalIndent(response, "", " ")
	if err != nil {
		writeMappedS3Error(w, r, err)
		return
	}
	w.Header().Set("Content-Type", "application/xml; charset=utf-8")
	w.WriteHeader(http.StatusOK)
	_, _ = w.Write([]byte(xml.Header))
	_, _ = w.Write(payload)
}
func shouldDecodeAWSChunkedPayload(r *http.Request) bool {
contentEncoding := strings.ToLower(r.Header.Get("Content-Encoding"))
if strings.Contains(contentEncoding, "aws-chunked") {
return true
}
signingMode := strings.ToLower(r.Header.Get("x-amz-content-sha256"))
return strings.HasPrefix(signingMode, "streaming-aws4-hmac-sha256-payload")
}
// newAWSChunkedDecodingReader returns a ReadCloser that yields the decoded
// payload of an aws-chunked body. Decoding runs in a goroutine feeding an
// io.Pipe; a decode failure is propagated to readers via CloseWithError, and
// closing the returned reader unblocks (and thereby ends) the goroutine.
func newAWSChunkedDecodingReader(src io.Reader) io.ReadCloser {
	pr, pw := io.Pipe()
	go func() {
		err := decodeAWSChunkedPayload(src, pw)
		if err != nil {
			_ = pw.CloseWithError(err)
		} else {
			_ = pw.Close()
		}
	}()
	return pr
}
func decodeAWSChunkedPayload(src io.Reader, dst io.Writer) error {
reader := bufio.NewReader(src)
for {
headerLine, err := reader.ReadString('\n')
if err != nil {
return err
}
headerLine = strings.TrimRight(headerLine, "\r\n")
chunkSizeToken := headerLine
if idx := strings.IndexByte(chunkSizeToken, ';'); idx >= 0 {
chunkSizeToken = chunkSizeToken[:idx]
}
chunkSizeToken = strings.TrimSpace(chunkSizeToken)
chunkSize, err := strconv.ParseInt(chunkSizeToken, 16, 64)
if err != nil {
return fmt.Errorf("invalid aws-chunked header %q: %w", headerLine, err)
}
if chunkSize < 0 {
return fmt.Errorf("invalid aws-chunked size: %d", chunkSize)
}
if chunkSize > 0 {
if _, err := io.CopyN(dst, reader, chunkSize); err != nil {
return err
}
}
crlf := make([]byte, 2)
if _, err := io.ReadFull(reader, crlf); err != nil {
return err
}
if crlf[0] != '\r' || crlf[1] != '\n' {
return errors.New("invalid aws-chunked payload terminator")
}
if chunkSize == 0 {
for {
line, err := reader.ReadString('\n')
if err != nil {
return err
}
if line == "\r\n" || line == "\n" {
return nil
}
}
}
}
}
// ifNoneMatchPreconditionFailed reports whether an If-None-Match header
// matches the object's ETag, meaning the PUT must fail with 412. The header
// may hold a comma-separated list; "*" matches any existing object, weak
// validators ("W/") are accepted, and comparison is quote-stripped and
// case-insensitive.
func ifNoneMatchPreconditionFailed(headerValue, etag string) bool {
	for _, candidate := range strings.Split(headerValue, ",") {
		candidate = strings.TrimSpace(candidate)
		switch candidate {
		case "":
			continue
		case "*":
			return true
		}
		bare := strings.Trim(strings.TrimPrefix(candidate, "W/"), `"`)
		if strings.EqualFold(bare, etag) {
			return true
		}
	}
	return false
}
// handlePutBucket serves PUT /{bucket}: create the bucket and answer
// 201 Created, or map the service error to an S3 error response.
func (h *Handler) handlePutBucket(w http.ResponseWriter, r *http.Request) {
	name := chi.URLParam(r, "bucket")
	err := h.svc.CreateBucket(name)
	if err != nil {
		writeMappedS3Error(w, r, err)
		return
	}
	w.WriteHeader(http.StatusCreated)
}
// handleDeleteBucket serves DELETE /{bucket}: remove the bucket and answer
// 204 No Content, or map the service error to an S3 error response.
func (h *Handler) handleDeleteBucket(w http.ResponseWriter, r *http.Request) {
	name := chi.URLParam(r, "bucket")
	err := h.svc.DeleteBucket(name)
	if err != nil {
		writeMappedS3Error(w, r, err)
		return
	}
	w.WriteHeader(http.StatusNoContent)
}
// handlePostBucket serves POST /{bucket}. Only the DeleteObjects batch
// operation ("?delete") is implemented: the XML body (capped at
// maxXMLBodyBytes, aws-chunked decoded if needed) lists up to
// maxDeleteObjects keys; invalid keys are reported per-entry while valid
// ones are deleted in one service call.
func (h *Handler) handlePostBucket(w http.ResponseWriter, r *http.Request) {
	bucket := chi.URLParam(r, "bucket")
	if _, ok := r.URL.Query()["delete"]; !ok {
		writeS3Error(w, r, s3ErrNotImplemented, r.URL.Path)
		return
	}
	defer r.Body.Close()
	// Limit first, then layer the aws-chunked decoder on top of the limited
	// body so the size cap applies to the raw (framed) bytes.
	r.Body = http.MaxBytesReader(w, r.Body, maxXMLBodyBytes)
	bodyReader := io.Reader(r.Body)
	var decodeStream io.ReadCloser
	if shouldDecodeAWSChunkedPayload(r) {
		decodeStream = newAWSChunkedDecodingReader(r.Body)
		defer decodeStream.Close()
		bodyReader = decodeStream
	}
	var req models.DeleteObjectsRequest
	if err := xml.NewDecoder(bodyReader).Decode(&req); err != nil {
		// Distinguish "body too large" from "body not valid XML".
		var maxErr *http.MaxBytesError
		if errors.As(err, &maxErr) {
			writeS3Error(w, r, s3ErrEntityTooLarge, r.URL.Path)
			return
		}
		writeS3Error(w, r, s3ErrMalformedXML, r.URL.Path)
		return
	}
	if len(req.Objects) > maxDeleteObjects {
		writeS3Error(w, r, s3ErrTooManyDeleteObjects, r.URL.Path)
		return
	}
	metrics.Default.ObserveBatchSize(len(req.Objects))
	keys := make([]string, 0, len(req.Objects))
	response := models.DeleteObjectsResult{
		Xmlns: "http://s3.amazonaws.com/doc/2006-03-01/",
	}
	// Per-key validation: bad keys become <Error> entries instead of
	// failing the whole batch.
	for _, obj := range req.Objects {
		if obj.Key == "" {
			response.Errors = append(response.Errors, models.DeleteError{
				Key:     obj.Key,
				Code:    s3ErrInvalidObjectKey.Code,
				Message: s3ErrInvalidObjectKey.Message,
			})
			continue
		}
		if len(obj.Key) > maxObjectKeyBytes {
			response.Errors = append(response.Errors, models.DeleteError{
				Key:     obj.Key,
				Code:    s3ErrKeyTooLong.Code,
				Message: s3ErrKeyTooLong.Message,
			})
			continue
		}
		keys = append(keys, obj.Key)
	}
	deleted, err := h.svc.DeleteObjects(bucket, keys)
	if err != nil {
		writeMappedS3Error(w, r, err)
		return
	}
	// Quiet mode suppresses the per-key <Deleted> confirmation entries.
	if !req.Quiet {
		response.Deleted = make([]models.DeletedEntry, 0, len(deleted))
		for _, key := range deleted {
			response.Deleted = append(response.Deleted, models.DeletedEntry{Key: key})
		}
	}
	payload, err := xml.MarshalIndent(response, "", " ")
	if err != nil {
		writeMappedS3Error(w, r, err)
		return
	}
	w.Header().Set("Content-Type", "application/xml; charset=utf-8")
	w.WriteHeader(http.StatusOK)
	_, _ = w.Write([]byte(xml.Header))
	_, _ = w.Write(payload)
}
// handleDeleteObject serves DELETE /{bucket}/{key}. With an uploadId query
// parameter it aborts that multipart upload instead. Deleting an object that
// does not exist still answers 204, matching S3's idempotent DeleteObject
// semantics.
func (h *Handler) handleDeleteObject(w http.ResponseWriter, r *http.Request) {
	bucket := chi.URLParam(r, "bucket")
	key := chi.URLParam(r, "*")
	if apiErr := validateObjectKey(key); apiErr != nil {
		writeS3Error(w, r, *apiErr, r.URL.Path)
		return
	}
	if uploadID := r.URL.Query().Get("uploadId"); uploadID != "" {
		if err := h.svc.AbortMultipartUpload(bucket, key, uploadID); err != nil {
			writeMappedS3Error(w, r, err)
			return
		}
		w.WriteHeader(http.StatusNoContent)
		return
	}
	if err := h.svc.DeleteObject(bucket, key); err != nil && !errors.Is(err, metadata.ErrObjectNotFound) {
		writeMappedS3Error(w, r, err)
		return
	}
	w.WriteHeader(http.StatusNoContent)
}
// handleHeadBucket serves HEAD /{bucket}: 200 when the bucket exists,
// otherwise the mapped S3 error status (no body either way).
func (h *Handler) handleHeadBucket(w http.ResponseWriter, r *http.Request) {
	name := chi.URLParam(r, "bucket")
	err := h.svc.HeadBucket(name)
	if err != nil {
		writeMappedS3Error(w, r, err)
		return
	}
	w.WriteHeader(http.StatusOK)
}
// handleHeadObject serves HEAD /{bucket}/{key}, returning the object's
// metadata headers without a body. The header set mirrors handleGetObject
// (ETag, Content-Length, Content-Type, Last-Modified, Accept-Ranges) —
// previously Content-Type and Accept-Ranges were missing, so HEAD and GET
// disagreed, which breaks clients that probe before a ranged download.
func (h *Handler) handleHeadObject(w http.ResponseWriter, r *http.Request) {
	bucket := chi.URLParam(r, "bucket")
	key := chi.URLParam(r, "*")
	if apiErr := validateObjectKey(key); apiErr != nil {
		writeS3Error(w, r, *apiErr, r.URL.Path)
		return
	}
	manifest, err := h.svc.HeadObject(bucket, key)
	if err != nil {
		writeMappedS3Error(w, r, err)
		return
	}
	w.Header().Set("ETag", `"`+manifest.ETag+`"`)
	w.Header().Set("Content-Length", strconv.FormatInt(manifest.Size, 10))
	w.Header().Set("Content-Type", manifest.ContentType)
	w.Header().Set("Last-Modified", time.Unix(manifest.CreatedAt, 0).UTC().Format(http.TimeFormat))
	w.Header().Set("Accept-Ranges", "bytes")
	w.WriteHeader(http.StatusOK)
}
// limitedListener bounds concurrent accepted connections. Each Accept takes
// a slot from the buffered channel; the slot is returned when the accepted
// connection is closed (see limitedConn).
type limitedListener struct {
	net.Listener
	slots chan struct{} // capacity == max concurrent connections
}
// newLimitedListener wraps inner so that at most maxConns connections are
// open at once, and records the cap in the metrics registry. A non-positive
// maxConns disables limiting and returns inner unchanged.
func newLimitedListener(inner net.Listener, maxConns int) net.Listener {
	if maxConns <= 0 {
		return inner
	}
	metrics.Default.SetConnectionPoolMax(maxConns)
	return &limitedListener{
		Listener: inner,
		slots:    make(chan struct{}, maxConns),
	}
}
// Accept acquires a connection slot (recording queue-wait metrics when it
// has to block), then accepts from the inner listener. The slot and the
// active-connection gauge are released either immediately on Accept error or
// when the returned connection is closed.
func (l *limitedListener) Accept() (net.Conn, error) {
	select {
	case l.slots <- struct{}{}:
		// Fast path: a slot was free, no queueing metrics needed.
	default:
		// Slow path: record that we waited, and track queue depth while
		// blocking on a slot.
		metrics.Default.IncConnectionPoolWait()
		metrics.Default.IncRequestQueueLength()
		l.slots <- struct{}{}
		metrics.Default.DecRequestQueueLength()
	}
	metrics.Default.IncConnectionPoolActive()
	conn, err := l.Listener.Accept()
	if err != nil {
		// Undo the slot and gauge taken above; nothing was accepted.
		<-l.slots
		metrics.Default.DecConnectionPoolActive()
		return nil, err
	}
	return &limitedConn{
		Conn: conn,
		done: func() {
			<-l.slots
			metrics.Default.DecConnectionPoolActive()
		},
	}, nil
}
type limitedConn struct {
net.Conn
once sync.Once
done func()
}
func (c *limitedConn) Close() error {
err := c.Conn.Close()
c.once.Do(c.done)
return err
}
// handleGetBuckets serves GET / (ListBuckets), rendering the
// ListAllMyBucketsResult XML. Buckets whose manifest cannot be read are
// logged and skipped rather than failing the whole listing.
func (h *Handler) handleGetBuckets(w http.ResponseWriter, r *http.Request) {
	buckets, err := h.svc.ListBuckets()
	if err != nil {
		writeMappedS3Error(w, r, err)
		return
	}
	response := models.ListAllMyBucketsResult{
		Xmlns: "http://s3.amazonaws.com/doc/2006-03-01/",
		// Single-tenant server: a fixed synthetic owner.
		Owner: models.BucketsOwner{
			ID:          "local",
			DisplayName: "local",
		},
		Buckets: models.BucketsElement{
			Items: make([]models.BucketItem, 0, len(buckets)),
		},
	}
	for _, bucket := range buckets {
		manifest, err := h.svc.GetBucketManifest(bucket)
		if err != nil {
			// Best-effort listing: log and omit the unreadable bucket.
			h.logger.Warn("bucket_manifest_read_failed", "bucket", bucket, "error", err)
			continue
		}
		response.Buckets.Items = append(response.Buckets.Items, models.BucketItem{
			Name:         bucket,
			CreationDate: manifest.CreatedAt.UTC().Format("2006-01-02T15:04:05.000Z"),
		})
	}
	payload, err := xml.MarshalIndent(response, "", " ")
	if err != nil {
		writeMappedS3Error(w, r, err)
		return
	}
	w.Header().Set("Content-Type", "application/xml; charset=utf-8")
	w.WriteHeader(http.StatusOK)
	_, _ = w.Write([]byte(xml.Header))
	_, _ = w.Write(payload)
}
// handleGetBucket dispatches GET /{bucket} by query parameter:
// "?location" returns a fixed us-east-1 LocationConstraint, "?list-type=2"
// routes to ListObjectsV2, any other list-type is rejected, and a query
// containing only v1 listing parameters routes to ListObjects (v1).
// Everything else is NotImplemented.
func (h *Handler) handleGetBucket(w http.ResponseWriter, r *http.Request) {
	bucket := chi.URLParam(r, "bucket")
	query := r.URL.Query()
	if query.Has("location") {
		xmlResponse := `<?xml version="1.0" encoding="UTF-8"?>
<LocationConstraint xmlns="http://s3.amazonaws.com/doc/2006-03-01/">us-east-1</LocationConstraint>`
		w.Header().Set("Content-Type", "application/xml; charset=utf-8")
		w.Header().Set("Content-Length", strconv.Itoa(len(xmlResponse)))
		w.WriteHeader(http.StatusOK)
		_, _ = w.Write([]byte(xmlResponse))
		return
	}
	listType := strings.TrimSpace(query.Get("list-type"))
	if listType == "2" {
		h.handleListObjectsV2(w, r, bucket)
		return
	}
	if listType != "" {
		// Only list-type=2 exists in the S3 API.
		writeS3Error(w, r, s3ErrInvalidArgument, r.URL.Path)
		return
	}
	if shouldUseListObjectsV1(query) {
		h.handleListObjectsV1(w, r, bucket)
		return
	}
	writeS3Error(w, r, s3ErrNotImplemented, r.URL.Path)
}
func shouldUseListObjectsV1(query url.Values) bool {
if len(query) == 0 {
return true
}
listingParams := map[string]struct{}{
"delimiter": {},
"encoding-type": {},
"marker": {},
"max-keys": {},
"prefix": {},
}
for key := range query {
if _, ok := listingParams[key]; !ok {
return false
}
}
return true
}
// handleListObjectsV1 implements ListObjects (v1) for GET /{bucket}. It
// streams the bucket's keys in order via ForEachObjectFrom, applying prefix,
// marker, delimiter (common-prefix rollup) and max-keys semantics, then
// renders the ListBucketResult XML. encoding-type=url percent-encodes the
// echoed names.
func (h *Handler) handleListObjectsV1(w http.ResponseWriter, r *http.Request, bucket string) {
	prefix := r.URL.Query().Get("prefix")
	delimiter := r.URL.Query().Get("delimiter")
	marker := r.URL.Query().Get("marker")
	encodingType := strings.ToLower(strings.TrimSpace(r.URL.Query().Get("encoding-type")))
	if encodingType != "" && encodingType != "url" {
		writeS3Error(w, r, s3ErrInvalidArgument, r.URL.Path)
		return
	}
	// S3 default page size; values above 1000 are clamped, negatives rejected.
	maxKeys := 1000
	if rawMaxKeys := strings.TrimSpace(r.URL.Query().Get("max-keys")); rawMaxKeys != "" {
		parsed, err := strconv.Atoi(rawMaxKeys)
		if err != nil || parsed < 0 {
			writeS3Error(w, r, s3ErrInvalidArgument, r.URL.Path)
			return
		}
		if parsed > 1000 {
			parsed = 1000
		}
		maxKeys = parsed
	}
	result := models.ListBucketResultV1{
		Xmlns:        "http://s3.amazonaws.com/doc/2006-03-01/",
		Name:         bucket,
		Prefix:       s3EncodeIfNeeded(prefix, encodingType),
		Marker:       s3EncodeIfNeeded(marker, encodingType),
		Delimiter:    s3EncodeIfNeeded(delimiter, encodingType),
		MaxKeys:      maxKeys,
		EncodingType: encodingType,
	}
	// A page entry is either a concrete object or a rolled-up common prefix.
	type pageEntry struct {
		Object       *models.ObjectManifest
		CommonPrefix string
	}
	entries := make([]pageEntry, 0, maxKeys)
	seenCommonPrefixes := make(map[string]struct{})
	truncated := false
	// Sentinel used to stop the iteration early once the page is full or the
	// prefix range is exhausted; it is filtered out of real errors below.
	stopErr := errors.New("list_v1_page_complete")
	// Start scanning at whichever bound is further along: prefix or marker.
	startKey := prefix
	if marker != "" && marker > startKey {
		startKey = marker
	}
	if maxKeys > 0 {
		err := h.svc.ForEachObjectFrom(bucket, startKey, func(object *models.ObjectManifest) error {
			if object == nil {
				return nil
			}
			key := object.Key
			if prefix != "" {
				if key < prefix {
					return nil
				}
				// Keys are visited in order, so leaving the prefix range
				// means no further matches are possible.
				if !strings.HasPrefix(key, prefix) {
					return stopErr
				}
			}
			if marker != "" && key <= marker {
				return nil
			}
			if delimiter != "" {
				// Roll keys sharing a delimiter segment into one CommonPrefix.
				relative := strings.TrimPrefix(key, prefix)
				if idx := strings.Index(relative, delimiter); idx >= 0 {
					commonPrefix := prefix + relative[:idx+len(delimiter)]
					if marker != "" && commonPrefix <= marker {
						return nil
					}
					if _, exists := seenCommonPrefixes[commonPrefix]; exists {
						return nil
					}
					seenCommonPrefixes[commonPrefix] = struct{}{}
					if len(entries) >= maxKeys {
						truncated = true
						return stopErr
					}
					entries = append(entries, pageEntry{
						CommonPrefix: commonPrefix,
					})
					return nil
				}
			}
			if len(entries) >= maxKeys {
				truncated = true
				return stopErr
			}
			entries = append(entries, pageEntry{Object: object})
			return nil
		})
		if err != nil && !errors.Is(err, stopErr) {
			writeMappedS3Error(w, r, err)
			return
		}
	}
	// Render entries in scan order: objects as Contents, prefixes as
	// CommonPrefixes.
	for _, entry := range entries {
		if entry.Object != nil {
			result.Contents = append(result.Contents, models.Contents{
				Key:          s3EncodeIfNeeded(entry.Object.Key, encodingType),
				LastModified: time.Unix(entry.Object.CreatedAt, 0).UTC().Format("2006-01-02T15:04:05.000Z"),
				ETag:         `"` + entry.Object.ETag + `"`,
				Size:         entry.Object.Size,
				StorageClass: "STANDARD",
			})
		} else {
			result.CommonPrefixes = append(result.CommonPrefixes, models.CommonPrefixes{
				Prefix: s3EncodeIfNeeded(entry.CommonPrefix, encodingType),
			})
		}
	}
	result.IsTruncated = truncated
	// NextMarker is the last entry emitted on a truncated page.
	if result.IsTruncated && result.NextMarker == "" && len(entries) > 0 {
		last := entries[len(entries)-1]
		if last.Object != nil {
			result.NextMarker = s3EncodeIfNeeded(last.Object.Key, encodingType)
		} else {
			result.NextMarker = s3EncodeIfNeeded(last.CommonPrefix, encodingType)
		}
	}
	xmlResponse, err := xml.MarshalIndent(result, "", " ")
	if err != nil {
		writeMappedS3Error(w, r, err)
		return
	}
	w.Header().Set("Content-Type", "application/xml; charset=utf-8")
	w.Header().Set("Content-Length", strconv.Itoa(len(xml.Header)+len(xmlResponse)))
	w.WriteHeader(http.StatusOK)
	_, _ = w.Write([]byte(xml.Header))
	_, _ = w.Write(xmlResponse)
}
// handleListObjectsV2 implements ListObjectsV2 for GET /{bucket}?list-type=2.
// It streams keys in order via ForEachObjectFrom with prefix, start-after,
// delimiter and max-keys semantics. Continuation tokens are base64 of
// "K:<key>" (resume after an object key) or "C:<prefix>" (resume after a
// common prefix), produced from the last entry of a truncated page.
func (h *Handler) handleListObjectsV2(w http.ResponseWriter, r *http.Request, bucket string) {
	prefix := r.URL.Query().Get("prefix")
	delimiter := r.URL.Query().Get("delimiter")
	startAfter := r.URL.Query().Get("start-after")
	encodingType := strings.ToLower(strings.TrimSpace(r.URL.Query().Get("encoding-type")))
	if encodingType != "" && encodingType != "url" {
		writeS3Error(w, r, s3ErrInvalidArgument, r.URL.Path)
		return
	}
	// S3 default page size; values above 1000 are clamped, negatives rejected.
	maxKeys := 1000
	if rawMaxKeys := strings.TrimSpace(r.URL.Query().Get("max-keys")); rawMaxKeys != "" {
		parsed, err := strconv.Atoi(rawMaxKeys)
		if err != nil || parsed < 0 {
			writeS3Error(w, r, s3ErrInvalidArgument, r.URL.Path)
			return
		}
		if parsed > 1000 {
			parsed = 1000
		}
		maxKeys = parsed
	}
	// Decode and validate the opaque continuation token ("K:..." or "C:...").
	continuationToken := strings.TrimSpace(r.URL.Query().Get("continuation-token"))
	continuationMarker := ""
	continuationType := ""
	continuationValue := ""
	if continuationToken != "" {
		decoded, err := base64.StdEncoding.DecodeString(continuationToken)
		if err != nil || len(decoded) == 0 {
			writeS3Error(w, r, s3ErrInvalidArgument, r.URL.Path)
			return
		}
		continuationMarker = string(decoded)
		continuationType, continuationValue, _ = strings.Cut(continuationMarker, ":")
		if (continuationType != "K" && continuationType != "C") || continuationValue == "" {
			writeS3Error(w, r, s3ErrInvalidArgument, r.URL.Path)
			return
		}
	}
	result := models.ListBucketResultV2{
		Xmlns:             "http://s3.amazonaws.com/doc/2006-03-01/",
		Name:              bucket,
		Prefix:            s3EncodeIfNeeded(prefix, encodingType),
		Delimiter:         s3EncodeIfNeeded(delimiter, encodingType),
		MaxKeys:           maxKeys,
		ContinuationToken: continuationToken,
		StartAfter:        s3EncodeIfNeeded(startAfter, encodingType),
		EncodingType:      encodingType,
	}
	// A page entry is an object or a common prefix; Marker records the
	// continuation token payload that would resume after this entry.
	type pageEntry struct {
		Marker       string
		Object       *models.ObjectManifest
		CommonPrefix string
	}
	entries := make([]pageEntry, 0, maxKeys)
	seenCommonPrefixes := make(map[string]struct{})
	truncated := false
	// Sentinel to stop iteration early; filtered out of real errors below.
	stopErr := errors.New("list_v2_page_complete")
	// Resume point: continuation token wins over start-after, which wins
	// over the bare prefix.
	startKey := prefix
	if continuationToken != "" {
		startKey = continuationValue
	} else if startAfter != "" && startAfter > startKey {
		startKey = startAfter
	}
	if maxKeys > 0 {
		err := h.svc.ForEachObjectFrom(bucket, startKey, func(object *models.ObjectManifest) error {
			if object == nil {
				return nil
			}
			key := object.Key
			if prefix != "" {
				if key < prefix {
					return nil
				}
				// Ordered scan: leaving the prefix range ends the listing.
				if !strings.HasPrefix(key, prefix) {
					return stopErr
				}
			}
			if continuationToken != "" {
				// K-token: skip keys at or before the recorded key.
				if continuationType == "K" && key <= continuationValue {
					return nil
				}
				// C-token: skip everything still under the recorded prefix.
				if continuationType == "C" && strings.HasPrefix(key, continuationValue) {
					return nil
				}
			} else if startAfter != "" && key <= startAfter {
				return nil
			}
			if delimiter != "" {
				// Roll keys sharing a delimiter segment into one CommonPrefix.
				relative := strings.TrimPrefix(key, prefix)
				if idx := strings.Index(relative, delimiter); idx >= 0 {
					commonPrefix := prefix + relative[:idx+len(delimiter)]
					if continuationToken == "" && startAfter != "" && commonPrefix <= startAfter {
						return nil
					}
					if _, exists := seenCommonPrefixes[commonPrefix]; exists {
						return nil
					}
					seenCommonPrefixes[commonPrefix] = struct{}{}
					if len(entries) >= maxKeys {
						truncated = true
						return stopErr
					}
					entries = append(entries, pageEntry{
						Marker:       "C:" + commonPrefix,
						CommonPrefix: commonPrefix,
					})
					return nil
				}
			}
			if len(entries) >= maxKeys {
				truncated = true
				return stopErr
			}
			entries = append(entries, pageEntry{
				Marker: "K:" + key,
				Object: object,
			})
			return nil
		})
		if err != nil && !errors.Is(err, stopErr) {
			writeMappedS3Error(w, r, err)
			return
		}
	}
	// Render entries; KeyCount counts objects and common prefixes alike.
	for _, entry := range entries {
		if entry.Object != nil {
			result.Contents = append(result.Contents, models.Contents{
				Key:          s3EncodeIfNeeded(entry.Object.Key, encodingType),
				LastModified: time.Unix(entry.Object.CreatedAt, 0).UTC().Format("2006-01-02T15:04:05.000Z"),
				ETag:         `"` + entry.Object.ETag + `"`,
				Size:         entry.Object.Size,
				StorageClass: "STANDARD",
			})
		} else {
			result.CommonPrefixes = append(result.CommonPrefixes, models.CommonPrefixes{
				Prefix: s3EncodeIfNeeded(entry.CommonPrefix, encodingType),
			})
		}
		result.KeyCount++
	}
	result.IsTruncated = truncated
	// The next token resumes after the final entry of this page.
	if result.IsTruncated && result.KeyCount > 0 {
		result.NextContinuationToken = base64.StdEncoding.EncodeToString([]byte(entries[result.KeyCount-1].Marker))
	}
	xmlResponse, err := xml.MarshalIndent(result, "", " ")
	if err != nil {
		writeMappedS3Error(w, r, err)
		return
	}
	w.Header().Set("Content-Type", "application/xml; charset=utf-8")
	w.Header().Set("Content-Length", strconv.Itoa(len(xml.Header)+len(xmlResponse)))
	w.WriteHeader(http.StatusOK)
	_, _ = w.Write([]byte(xml.Header))
	_, _ = w.Write(xmlResponse)
}
// s3EncodeIfNeeded applies S3 encoding-type semantics: when encodingType is
// "url", the value is percent-encoded with spaces rendered as "%20" (S3
// style) rather than query-escape's "+"; otherwise the value is returned
// untouched.
func s3EncodeIfNeeded(value, encodingType string) string {
	if value == "" || encodingType != "url" {
		return value
	}
	return strings.ReplaceAll(url.QueryEscape(value), "+", "%20")
}
// parseSingleByteRange parses a single-range "bytes=..." header against an
// object of the given size, returning inclusive [start, end] offsets.
// Supported forms: "start-end", open-ended "start-", and suffix "-N" (the
// final N bytes, clamped to size). Multi-range specs, zero/negative suffix
// lengths, and starts at or past the object size are rejected.
func parseSingleByteRange(rangeHeader string, size int64) (int64, int64, error) {
	invalid := errors.New("invalid range")
	if size <= 0 || !strings.HasPrefix(rangeHeader, "bytes=") {
		return 0, 0, invalid
	}
	spec := strings.TrimSpace(strings.TrimPrefix(rangeHeader, "bytes="))
	if spec == "" || strings.Contains(spec, ",") {
		return 0, 0, invalid
	}
	first, last, found := strings.Cut(spec, "-")
	if !found {
		return 0, 0, invalid
	}
	// Suffix form "-N": serve the trailing N bytes.
	if first == "" {
		n, err := strconv.ParseInt(last, 10, 64)
		if err != nil || n <= 0 {
			return 0, 0, invalid
		}
		if n > size {
			n = size
		}
		return size - n, size - 1, nil
	}
	start, err := strconv.ParseInt(first, 10, 64)
	if err != nil || start < 0 || start >= size {
		return 0, 0, invalid
	}
	end := size - 1
	if last != "" {
		end, err = strconv.ParseInt(last, 10, 64)
		if err != nil || end < start {
			return 0, 0, invalid
		}
		// An end past the last byte is clamped, per RFC 9110.
		if end > size-1 {
			end = size - 1
		}
	}
	return start, end, nil
}
// Start registers the routes, listens on address with a connection-limited
// listener, and serves until ctx is cancelled or the server fails. On
// shutdown it drains in-flight requests for up to 10 seconds and closes the
// backing service; the first error encountered is returned.
func (h *Handler) Start(ctx context.Context, address string) error {
	if ctx == nil {
		ctx = context.Background()
	}
	h.logger.Info("server_starting",
		"address", address,
		"log_format", h.logConfig.Format,
		"log_level", h.logConfig.LevelName,
		"audit_log", h.logConfig.Audit,
	)
	h.setupRoutes()
	server := http.Server{
		Addr:              address,
		Handler:           h.router,
		ReadHeaderTimeout: serverReadHeaderTimeout,
		ReadTimeout:       serverReadTimeout,
		WriteTimeout:      serverWriteTimeout,
		IdleTimeout:       serverIdleTimeout,
		MaxHeaderBytes:    serverMaxHeaderBytes,
	}
	// Buffered so the serving goroutine never blocks sending its error.
	errCh := make(chan error, 1)
	listener, err := net.Listen("tcp", address)
	if err != nil {
		return err
	}
	limitedListener := newLimitedListener(listener, serverMaxConnections)
	go func() {
		// ErrServerClosed is the normal result of Shutdown; only real
		// failures are forwarded.
		if err := server.Serve(limitedListener); err != nil {
			if !errors.Is(err, http.ErrServerClosed) {
				errCh <- err
			}
		}
	}()
	select {
	case <-ctx.Done():
		h.logger.Info("shutdown_context_done", "reason", ctx.Err())
	case err := <-errCh:
		// Serve failed before any shutdown was requested; close the service
		// and report the serve error (the close error is only logged).
		h.logger.Error("server_listen_failed", "error", err)
		if closeErr := h.svc.Close(); closeErr != nil {
			h.logger.Error("service_close_failed", "error", closeErr)
		}
		return err
	}
	// Graceful drain window; ctx is rebound to a fresh timeout because the
	// original context is already done.
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	if err := server.Shutdown(ctx); err != nil {
		h.logger.Error("server_shutdown_failed", "error", err)
		return err
	}
	if err := h.svc.Close(); err != nil {
		h.logger.Error("service_close_failed", "error", err)
		return err
	}
	h.logger.Info("server_stopped")
	return nil
}