mirror of
https://github.com/ferdzo/fs.git
synced 2026-06-04 07:36:47 +00:00
Compare commits
6 Commits
develop
...
fix/multi-
| Author | SHA1 | Date | |
|---|---|---|---|
| f61cc3168b | |||
| e928ebca15 | |||
| 654a505c0d | |||
| 0f9b461e8e | |||
| c3c9e3262f | |||
| 2425cd524e |
@@ -1,6 +1,7 @@
|
|||||||
LOG_LEVEL=debug
|
LOG_LEVEL=debug
|
||||||
LOG_FORMAT=text
|
LOG_FORMAT=text
|
||||||
DATA_PATH=data/
|
DATA_PATH=data/
|
||||||
|
FS_MAX_OBJECT_UPLOAD_BYTES=5368709120
|
||||||
PORT=2600
|
PORT=2600
|
||||||
AUDIT_LOG=true
|
AUDIT_LOG=true
|
||||||
ADDRESS=0.0.0.0
|
ADDRESS=0.0.0.0
|
||||||
|
|||||||
@@ -127,6 +127,9 @@ Required when `FS_AUTH_ENABLED=true`:
|
|||||||
- `FS_ROOT_USER` and `FS_ROOT_PASSWORD` define initial credentials
|
- `FS_ROOT_USER` and `FS_ROOT_PASSWORD` define initial credentials
|
||||||
- `ADMIN_API_ENABLED=true` enables `/_admin/v1/*` routes (bootstrap key only)
|
- `ADMIN_API_ENABLED=true` enables `/_admin/v1/*` routes (bootstrap key only)
|
||||||
|
|
||||||
|
Upload limits:
|
||||||
|
- `FS_MAX_OBJECT_UPLOAD_BYTES` limits object PUT payloads, multipart upload parts, and completed multipart object size (default 5 GiB).
|
||||||
|
|
||||||
Reference: `auth/README.md`
|
Reference: `auth/README.md`
|
||||||
|
|
||||||
Additional docs:
|
Additional docs:
|
||||||
|
|||||||
86
api/api.go
86
api/api.go
@@ -41,6 +41,7 @@ const (
|
|||||||
maxXMLBodyBytes int64 = 1 << 20
|
maxXMLBodyBytes int64 = 1 << 20
|
||||||
maxDeleteObjects = 1000
|
maxDeleteObjects = 1000
|
||||||
maxObjectKeyBytes = 1024
|
maxObjectKeyBytes = 1024
|
||||||
|
maxAWSChunkedLineBytes = 8 << 10
|
||||||
serverReadHeaderTimeout = 5 * time.Second
|
serverReadHeaderTimeout = 5 * time.Second
|
||||||
serverReadTimeout = 60 * time.Second
|
serverReadTimeout = 60 * time.Second
|
||||||
serverWriteTimeout = 120 * time.Second
|
serverWriteTimeout = 120 * time.Second
|
||||||
@@ -196,6 +197,10 @@ func parseCopySource(raw string) (string, string, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (h *Handler) authorizeCopySource(r *http.Request, bucket, key string) error {
|
func (h *Handler) authorizeCopySource(r *http.Request, bucket, key string) error {
|
||||||
|
return h.authorizeObjectAction(r, auth.ActionGetObject, bucket, key)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *Handler) authorizeObjectAction(r *http.Request, action auth.Action, bucket, key string) error {
|
||||||
if h.authSvc == nil || !h.authSvc.Config().Enabled {
|
if h.authSvc == nil || !h.authSvc.Config().Enabled {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@@ -206,7 +211,7 @@ func (h *Handler) authorizeCopySource(r *http.Request, bucket, key string) error
|
|||||||
}
|
}
|
||||||
|
|
||||||
return h.authSvc.Authorize(authCtx.AccessKeyID, auth.RequestTarget{
|
return h.authSvc.Authorize(authCtx.AccessKeyID, auth.RequestTarget{
|
||||||
Action: auth.ActionGetObject,
|
Action: action,
|
||||||
Bucket: bucket,
|
Bucket: bucket,
|
||||||
Key: key,
|
Key: key,
|
||||||
})
|
})
|
||||||
@@ -307,6 +312,10 @@ func (h *Handler) handlePostObject(w http.ResponseWriter, r *http.Request) {
|
|||||||
r.Body = http.MaxBytesReader(w, r.Body, maxXMLBodyBytes)
|
r.Body = http.MaxBytesReader(w, r.Body, maxXMLBodyBytes)
|
||||||
var req models.CompleteMultipartUploadRequest
|
var req models.CompleteMultipartUploadRequest
|
||||||
if err := xml.NewDecoder(r.Body).Decode(&req); err != nil {
|
if err := xml.NewDecoder(r.Body).Decode(&req); err != nil {
|
||||||
|
if errors.Is(err, auth.ErrSignatureDoesNotMatch) {
|
||||||
|
writeMappedS3Error(w, r, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
var maxErr *http.MaxBytesError
|
var maxErr *http.MaxBytesError
|
||||||
if errors.As(err, &maxErr) {
|
if errors.As(err, &maxErr) {
|
||||||
writeS3Error(w, r, s3ErrEntityTooLarge, r.URL.Path)
|
writeS3Error(w, r, s3ErrEntityTooLarge, r.URL.Path)
|
||||||
@@ -379,6 +388,10 @@ func (h *Handler) handlePutObject(w http.ResponseWriter, r *http.Request) {
|
|||||||
|
|
||||||
bodyReader := io.Reader(r.Body)
|
bodyReader := io.Reader(r.Body)
|
||||||
var decodeStream io.ReadCloser
|
var decodeStream io.ReadCloser
|
||||||
|
if hasUnsupportedAWSChunkedPayload(r) {
|
||||||
|
writeS3Error(w, r, s3ErrInvalidArgument, r.URL.Path)
|
||||||
|
return
|
||||||
|
}
|
||||||
if shouldDecodeAWSChunkedPayload(r) {
|
if shouldDecodeAWSChunkedPayload(r) {
|
||||||
decodeStream = newAWSChunkedDecodingReader(r.Body)
|
decodeStream = newAWSChunkedDecodingReader(r.Body)
|
||||||
defer decodeStream.Close()
|
defer decodeStream.Close()
|
||||||
@@ -453,6 +466,10 @@ func (h *Handler) handlePutObject(w http.ResponseWriter, r *http.Request) {
|
|||||||
|
|
||||||
bodyReader := io.Reader(r.Body)
|
bodyReader := io.Reader(r.Body)
|
||||||
var decodeStream io.ReadCloser
|
var decodeStream io.ReadCloser
|
||||||
|
if hasUnsupportedAWSChunkedPayload(r) {
|
||||||
|
writeS3Error(w, r, s3ErrInvalidArgument, r.URL.Path)
|
||||||
|
return
|
||||||
|
}
|
||||||
if shouldDecodeAWSChunkedPayload(r) {
|
if shouldDecodeAWSChunkedPayload(r) {
|
||||||
decodeStream = newAWSChunkedDecodingReader(r.Body)
|
decodeStream = newAWSChunkedDecodingReader(r.Body)
|
||||||
defer decodeStream.Close()
|
defer decodeStream.Close()
|
||||||
@@ -508,17 +525,18 @@ func (h *Handler) handleListMultipartParts(w http.ResponseWriter, r *http.Reques
|
|||||||
}
|
}
|
||||||
|
|
||||||
func shouldDecodeAWSChunkedPayload(r *http.Request) bool {
|
func shouldDecodeAWSChunkedPayload(r *http.Request) bool {
|
||||||
contentEncoding := strings.ToLower(r.Header.Get("Content-Encoding"))
|
|
||||||
if strings.Contains(contentEncoding, "aws-chunked") {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
signingMode := strings.ToLower(r.Header.Get("x-amz-content-sha256"))
|
signingMode := strings.ToLower(r.Header.Get("x-amz-content-sha256"))
|
||||||
if strings.HasPrefix(signingMode, "streaming-aws4-hmac-sha256-payload") {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
return strings.HasPrefix(signingMode, "streaming-unsigned-payload")
|
return strings.HasPrefix(signingMode, "streaming-unsigned-payload")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func hasUnsupportedAWSChunkedPayload(r *http.Request) bool {
|
||||||
|
contentEncoding := strings.ToLower(r.Header.Get("Content-Encoding"))
|
||||||
|
if !strings.Contains(contentEncoding, "aws-chunked") {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
return !shouldDecodeAWSChunkedPayload(r)
|
||||||
|
}
|
||||||
|
|
||||||
func newAWSChunkedDecodingReader(src io.Reader) io.ReadCloser {
|
func newAWSChunkedDecodingReader(src io.Reader) io.ReadCloser {
|
||||||
probedReader, isAWSChunked := probeAWSChunkedPayload(src)
|
probedReader, isAWSChunked := probeAWSChunkedPayload(src)
|
||||||
if !isAWSChunked {
|
if !isAWSChunked {
|
||||||
@@ -537,9 +555,12 @@ func newAWSChunkedDecodingReader(src io.Reader) io.ReadCloser {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func probeAWSChunkedPayload(src io.Reader) (io.Reader, bool) {
|
func probeAWSChunkedPayload(src io.Reader) (io.Reader, bool) {
|
||||||
reader := bufio.NewReaderSize(src, 512)
|
reader := bufio.NewReaderSize(src, maxAWSChunkedLineBytes)
|
||||||
headerLine, err := reader.ReadSlice('\n')
|
headerLine, err := reader.ReadSlice('\n')
|
||||||
replay := io.MultiReader(bytes.NewReader(headerLine), reader)
|
replay := io.MultiReader(bytes.NewReader(headerLine), reader)
|
||||||
|
if errors.Is(err, bufio.ErrBufferFull) {
|
||||||
|
return replay, true
|
||||||
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return replay, false
|
return replay, false
|
||||||
}
|
}
|
||||||
@@ -561,9 +582,9 @@ func probeAWSChunkedPayload(src io.Reader) (io.Reader, bool) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func decodeAWSChunkedPayload(src io.Reader, dst io.Writer) error {
|
func decodeAWSChunkedPayload(src io.Reader, dst io.Writer) error {
|
||||||
reader := bufio.NewReader(src)
|
reader := bufio.NewReaderSize(src, maxAWSChunkedLineBytes)
|
||||||
for {
|
for {
|
||||||
headerLine, err := reader.ReadString('\n')
|
headerLine, err := readAWSChunkedLine(reader)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@@ -580,6 +601,17 @@ func decodeAWSChunkedPayload(src io.Reader, dst io.Writer) error {
|
|||||||
if chunkSize < 0 {
|
if chunkSize < 0 {
|
||||||
return fmt.Errorf("invalid aws-chunked size: %d", chunkSize)
|
return fmt.Errorf("invalid aws-chunked size: %d", chunkSize)
|
||||||
}
|
}
|
||||||
|
if chunkSize == 0 {
|
||||||
|
for {
|
||||||
|
line, err := readAWSChunkedLine(reader)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if line == "\r\n" || line == "\n" {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
if chunkSize > 0 {
|
if chunkSize > 0 {
|
||||||
if _, err := io.CopyN(dst, reader, chunkSize); err != nil {
|
if _, err := io.CopyN(dst, reader, chunkSize); err != nil {
|
||||||
return err
|
return err
|
||||||
@@ -593,19 +625,18 @@ func decodeAWSChunkedPayload(src io.Reader, dst io.Writer) error {
|
|||||||
if crlf[0] != '\r' || crlf[1] != '\n' {
|
if crlf[0] != '\r' || crlf[1] != '\n' {
|
||||||
return errors.New("invalid aws-chunked payload terminator")
|
return errors.New("invalid aws-chunked payload terminator")
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if chunkSize == 0 {
|
func readAWSChunkedLine(reader *bufio.Reader) (string, error) {
|
||||||
for {
|
line, err := reader.ReadSlice('\n')
|
||||||
line, err := reader.ReadString('\n')
|
if errors.Is(err, bufio.ErrBufferFull) {
|
||||||
if err != nil {
|
return "", service.ErrEntityTooLarge
|
||||||
return err
|
|
||||||
}
|
|
||||||
if line == "\r\n" || line == "\n" {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
if len(line) > maxAWSChunkedLineBytes {
|
||||||
|
return "", service.ErrEntityTooLarge
|
||||||
}
|
}
|
||||||
|
return string(line), err
|
||||||
}
|
}
|
||||||
|
|
||||||
func ifNoneMatchPreconditionFailed(headerValue, etag string) bool {
|
func ifNoneMatchPreconditionFailed(headerValue, etag string) bool {
|
||||||
@@ -664,6 +695,10 @@ func (h *Handler) handlePostBucket(w http.ResponseWriter, r *http.Request) {
|
|||||||
|
|
||||||
var req models.DeleteObjectsRequest
|
var req models.DeleteObjectsRequest
|
||||||
if err := xml.NewDecoder(bodyReader).Decode(&req); err != nil {
|
if err := xml.NewDecoder(bodyReader).Decode(&req); err != nil {
|
||||||
|
if errors.Is(err, auth.ErrSignatureDoesNotMatch) {
|
||||||
|
writeMappedS3Error(w, r, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
var maxErr *http.MaxBytesError
|
var maxErr *http.MaxBytesError
|
||||||
if errors.As(err, &maxErr) {
|
if errors.As(err, &maxErr) {
|
||||||
writeS3Error(w, r, s3ErrEntityTooLarge, r.URL.Path)
|
writeS3Error(w, r, s3ErrEntityTooLarge, r.URL.Path)
|
||||||
@@ -699,6 +734,15 @@ func (h *Handler) handlePostBucket(w http.ResponseWriter, r *http.Request) {
|
|||||||
})
|
})
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
if err := h.authorizeObjectAction(r, auth.ActionDeleteObject, bucket, obj.Key); err != nil {
|
||||||
|
apiErr := mapToS3Error(err)
|
||||||
|
response.Errors = append(response.Errors, models.DeleteError{
|
||||||
|
Key: obj.Key,
|
||||||
|
Code: apiErr.Code,
|
||||||
|
Message: apiErr.Message,
|
||||||
|
})
|
||||||
|
continue
|
||||||
|
}
|
||||||
keys = append(keys, obj.Key)
|
keys = append(keys, obj.Key)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
115
api/aws_chunked_test.go
Normal file
115
api/aws_chunked_test.go
Normal file
@@ -0,0 +1,115 @@
|
|||||||
|
package api
|
||||||
|
|
||||||
|
import (
|
||||||
|
"errors"
|
||||||
|
"io"
|
||||||
|
"net/http"
|
||||||
|
"net/http/httptest"
|
||||||
|
"strings"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"fs/service"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestShouldDecodeAWSChunkedPayloadUnsignedTrailerMode(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
req, err := http.NewRequest(http.MethodPut, "http://example.com/b/k", nil)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
req.Header.Set("x-amz-content-sha256", "STREAMING-UNSIGNED-PAYLOAD-TRAILER")
|
||||||
|
if !shouldDecodeAWSChunkedPayload(req) {
|
||||||
|
t.Fatalf("expected shouldDecodeAWSChunkedPayload to return true for STREAMING-UNSIGNED-PAYLOAD-TRAILER")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestUnsupportedAWSChunkedContentEncodingWithoutStreamingMode(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
req, err := http.NewRequest(http.MethodPut, "http://example.com/b/k", nil)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
req.Header.Set("Content-Encoding", "aws-chunked")
|
||||||
|
req.Header.Set("x-amz-content-sha256", "UNSIGNED-PAYLOAD")
|
||||||
|
|
||||||
|
if !hasUnsupportedAWSChunkedPayload(req) {
|
||||||
|
t.Fatalf("expected aws-chunked content encoding without streaming mode to be unsupported")
|
||||||
|
}
|
||||||
|
if shouldDecodeAWSChunkedPayload(req) {
|
||||||
|
t.Fatalf("non-streaming aws-chunked content encoding must not trigger decoding")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestPutObjectRejectsUnsignedAWSChunkedContentEncoding(t *testing.T) {
|
||||||
|
handler, svc := newUploadLimitHandler(t, 1024)
|
||||||
|
if err := svc.CreateBucket("test-bucket"); err != nil {
|
||||||
|
t.Fatalf("CreateBucket: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
req := httptest.NewRequest(http.MethodPut, "/test-bucket/object.txt", strings.NewReader("4\r\nWiki\r\n0\r\n\r\n"))
|
||||||
|
req.Header.Set("Content-Encoding", "aws-chunked")
|
||||||
|
req.Header.Set("x-amz-content-sha256", "UNSIGNED-PAYLOAD")
|
||||||
|
rec := httptest.NewRecorder()
|
||||||
|
|
||||||
|
handler.router.ServeHTTP(rec, req)
|
||||||
|
|
||||||
|
if rec.Code != http.StatusBadRequest {
|
||||||
|
t.Fatalf("status = %d, want %d body=%s", rec.Code, http.StatusBadRequest, rec.Body.String())
|
||||||
|
}
|
||||||
|
if !strings.Contains(rec.Body.String(), "InvalidArgument") {
|
||||||
|
t.Fatalf("expected InvalidArgument response, body=%s", rec.Body.String())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestAWSChunkedReaderPassThroughForPlainPayload(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
plain := "PAR1\x00\x01\x02\x03binary-without-aws-chunked-header"
|
||||||
|
reader := newAWSChunkedDecodingReader(strings.NewReader(plain))
|
||||||
|
defer reader.Close()
|
||||||
|
|
||||||
|
out, err := io.ReadAll(reader)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("read failed: %v", err)
|
||||||
|
}
|
||||||
|
if string(out) != plain {
|
||||||
|
t.Fatalf("unexpected passthrough result: got %q want %q", string(out), plain)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestAWSChunkedReaderDecodesChunkedPayload(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
encoded := "" +
|
||||||
|
"4\r\nWiki\r\n" +
|
||||||
|
"5\r\npedia\r\n" +
|
||||||
|
"0\r\n" +
|
||||||
|
"x-amz-checksum-crc32:xxxx\r\n" +
|
||||||
|
"\r\n"
|
||||||
|
|
||||||
|
reader := newAWSChunkedDecodingReader(strings.NewReader(encoded))
|
||||||
|
defer reader.Close()
|
||||||
|
|
||||||
|
out, err := io.ReadAll(reader)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("read failed: %v", err)
|
||||||
|
}
|
||||||
|
if string(out) != "Wikipedia" {
|
||||||
|
t.Fatalf("decoded payload mismatch: got %q want %q", string(out), "Wikipedia")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestAWSChunkedReaderRejectsOversizedChunkHeader(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
encoded := strings.Repeat("f", maxAWSChunkedLineBytes+1) + "\n"
|
||||||
|
reader := newAWSChunkedDecodingReader(strings.NewReader(encoded))
|
||||||
|
defer reader.Close()
|
||||||
|
|
||||||
|
_, err := io.ReadAll(reader)
|
||||||
|
if !errors.Is(err, service.ErrEntityTooLarge) {
|
||||||
|
t.Fatalf("read error = %v, want ErrEntityTooLarge", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
283
api/multi_delete_auth_test.go
Normal file
283
api/multi_delete_auth_test.go
Normal file
@@ -0,0 +1,283 @@
|
|||||||
|
package api
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"context"
|
||||||
|
"crypto/hmac"
|
||||||
|
"crypto/sha256"
|
||||||
|
"encoding/base64"
|
||||||
|
"encoding/hex"
|
||||||
|
"errors"
|
||||||
|
"io"
|
||||||
|
"log/slog"
|
||||||
|
"net/http"
|
||||||
|
"net/http/httptest"
|
||||||
|
"net/url"
|
||||||
|
"path/filepath"
|
||||||
|
"sort"
|
||||||
|
"strings"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"fs/auth"
|
||||||
|
"fs/logging"
|
||||||
|
"fs/metadata"
|
||||||
|
"fs/models"
|
||||||
|
"fs/service"
|
||||||
|
"fs/storage"
|
||||||
|
|
||||||
|
"github.com/go-chi/chi/v5"
|
||||||
|
)
|
||||||
|
|
||||||
|
func newAuthorizedDeleteHandler(t *testing.T) (*Handler, *service.ObjectService, *auth.Service) {
|
||||||
|
t.Helper()
|
||||||
|
|
||||||
|
root := t.TempDir()
|
||||||
|
md, err := metadata.NewMetadataHandler(filepath.Join(root, "metadata.db"))
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("new metadata handler: %v", err)
|
||||||
|
}
|
||||||
|
blob, err := storage.NewBlobStore(root, 1024)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("new blob store: %v", err)
|
||||||
|
}
|
||||||
|
svc := service.NewObjectService(md, blob, time.Hour)
|
||||||
|
t.Cleanup(func() {
|
||||||
|
_ = svc.Close()
|
||||||
|
})
|
||||||
|
|
||||||
|
masterKey := base64.StdEncoding.EncodeToString(make([]byte, 32))
|
||||||
|
authSvc, err := auth.NewService(auth.ConfigFromValues(
|
||||||
|
true,
|
||||||
|
"us-east-1",
|
||||||
|
0,
|
||||||
|
0,
|
||||||
|
masterKey,
|
||||||
|
"",
|
||||||
|
"",
|
||||||
|
"",
|
||||||
|
), md)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("new auth service: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
logger := slog.New(slog.NewTextHandler(io.Discard, nil))
|
||||||
|
handler := NewHandler(svc, logger, logging.Config{}, authSvc, false)
|
||||||
|
return handler, svc, authSvc
|
||||||
|
}
|
||||||
|
|
||||||
|
func newBucketPostRequest(bucket, body string) *http.Request {
|
||||||
|
req := httptest.NewRequest(http.MethodPost, "/"+bucket+"?delete", strings.NewReader(body))
|
||||||
|
rctx := chi.NewRouteContext()
|
||||||
|
rctx.URLParams.Add("bucket", bucket)
|
||||||
|
return req.WithContext(context.WithValue(req.Context(), chi.RouteCtxKey, rctx))
|
||||||
|
}
|
||||||
|
|
||||||
|
func withAuthContext(req *http.Request, accessKeyID string) *http.Request {
|
||||||
|
authCtx := auth.RequestContext{
|
||||||
|
Authenticated: true,
|
||||||
|
AccessKeyID: accessKeyID,
|
||||||
|
AuthType: "test",
|
||||||
|
}
|
||||||
|
return req.WithContext(auth.WithRequestContext(req.Context(), authCtx))
|
||||||
|
}
|
||||||
|
|
||||||
|
func createDeleteUser(t *testing.T, authSvc *auth.Service, prefix string) {
|
||||||
|
t.Helper()
|
||||||
|
createDeleteUserWithStatements(t, authSvc, []models.AuthPolicyStatement{
|
||||||
|
{
|
||||||
|
Effect: "allow",
|
||||||
|
Actions: []string{"s3:DeleteObject"},
|
||||||
|
Bucket: "test-bucket",
|
||||||
|
Prefix: prefix,
|
||||||
|
},
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func createDeleteUserWithStatements(t *testing.T, authSvc *auth.Service, statements []models.AuthPolicyStatement) {
|
||||||
|
t.Helper()
|
||||||
|
_, err := authSvc.CreateUser(auth.CreateUserInput{
|
||||||
|
AccessKeyID: "delete-user",
|
||||||
|
SecretKey: "delete-secret-1",
|
||||||
|
Policy: models.AuthPolicy{
|
||||||
|
Statements: statements,
|
||||||
|
},
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("create delete user: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func putTestObject(t *testing.T, svc *service.ObjectService, key string) {
|
||||||
|
t.Helper()
|
||||||
|
_, err := svc.PutObject("test-bucket", key, "text/plain", bytes.NewReader([]byte("data")))
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("put object %q: %v", key, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestMultiDeleteAuthorizesEveryKey(t *testing.T) {
|
||||||
|
handler, svc, authSvc := newAuthorizedDeleteHandler(t)
|
||||||
|
if err := svc.CreateBucket("test-bucket"); err != nil {
|
||||||
|
t.Fatalf("create bucket: %v", err)
|
||||||
|
}
|
||||||
|
createDeleteUser(t, authSvc, "allowed/")
|
||||||
|
putTestObject(t, svc, "allowed/file.txt")
|
||||||
|
putTestObject(t, svc, "private/file.txt")
|
||||||
|
|
||||||
|
body := `<Delete><Object><Key>allowed/file.txt</Key></Object><Object><Key>private/file.txt</Key></Object></Delete>`
|
||||||
|
req := withAuthContext(newBucketPostRequest("test-bucket", body), "delete-user")
|
||||||
|
rec := httptest.NewRecorder()
|
||||||
|
|
||||||
|
handler.handlePostBucket(rec, req)
|
||||||
|
|
||||||
|
if rec.Code != http.StatusOK {
|
||||||
|
t.Fatalf("unexpected status: got %d body=%s", rec.Code, rec.Body.String())
|
||||||
|
}
|
||||||
|
responseBody := rec.Body.String()
|
||||||
|
if !strings.Contains(responseBody, "<Deleted>") || !strings.Contains(responseBody, "allowed/file.txt") {
|
||||||
|
t.Fatalf("expected allowed key to be deleted, body=%s", responseBody)
|
||||||
|
}
|
||||||
|
if !strings.Contains(responseBody, "<Error>") || !strings.Contains(responseBody, "private/file.txt") || !strings.Contains(responseBody, "AccessDenied") {
|
||||||
|
t.Fatalf("expected denied key error, body=%s", responseBody)
|
||||||
|
}
|
||||||
|
if _, err := svc.HeadObject("test-bucket", "allowed/file.txt"); !errors.Is(err, metadata.ErrObjectNotFound) {
|
||||||
|
t.Fatalf("allowed object should be deleted, got err=%v", err)
|
||||||
|
}
|
||||||
|
if _, err := svc.HeadObject("test-bucket", "private/file.txt"); err != nil {
|
||||||
|
t.Fatalf("private object should remain: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestMultiDeleteAllowsScopedKeys(t *testing.T) {
|
||||||
|
handler, svc, authSvc := newAuthorizedDeleteHandler(t)
|
||||||
|
if err := svc.CreateBucket("test-bucket"); err != nil {
|
||||||
|
t.Fatalf("create bucket: %v", err)
|
||||||
|
}
|
||||||
|
createDeleteUser(t, authSvc, "allowed/")
|
||||||
|
putTestObject(t, svc, "allowed/file.txt")
|
||||||
|
|
||||||
|
body := `<Delete><Object><Key>allowed/file.txt</Key></Object></Delete>`
|
||||||
|
req := withAuthContext(newBucketPostRequest("test-bucket", body), "delete-user")
|
||||||
|
rec := httptest.NewRecorder()
|
||||||
|
|
||||||
|
handler.handlePostBucket(rec, req)
|
||||||
|
|
||||||
|
if rec.Code != http.StatusOK {
|
||||||
|
t.Fatalf("unexpected status: got %d body=%s", rec.Code, rec.Body.String())
|
||||||
|
}
|
||||||
|
if strings.Contains(rec.Body.String(), "<Error>") {
|
||||||
|
t.Fatalf("unexpected delete error body=%s", rec.Body.String())
|
||||||
|
}
|
||||||
|
if _, err := svc.HeadObject("test-bucket", "allowed/file.txt"); !errors.Is(err, metadata.ErrObjectNotFound) {
|
||||||
|
t.Fatalf("allowed object should be deleted, got err=%v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestMultiDeleteRouteAuthorizesKeysAfterMiddleware(t *testing.T) {
|
||||||
|
handler, svc, authSvc := newAuthorizedDeleteHandler(t)
|
||||||
|
handler.setupRoutes()
|
||||||
|
if err := svc.CreateBucket("test-bucket"); err != nil {
|
||||||
|
t.Fatalf("create bucket: %v", err)
|
||||||
|
}
|
||||||
|
createDeleteUserWithStatements(t, authSvc, []models.AuthPolicyStatement{
|
||||||
|
{Effect: "allow", Actions: []string{"s3:DeleteObject"}, Bucket: "test-bucket", Prefix: "allowed/"},
|
||||||
|
{Effect: "deny", Actions: []string{"s3:DeleteObject"}, Bucket: "test-bucket", Prefix: "private/"},
|
||||||
|
})
|
||||||
|
putTestObject(t, svc, "allowed/file.txt")
|
||||||
|
putTestObject(t, svc, "private/file.txt")
|
||||||
|
|
||||||
|
body := `<Delete><Object><Key>allowed/file.txt</Key></Object><Object><Key>private/file.txt</Key></Object></Delete>`
|
||||||
|
req := httptest.NewRequest(http.MethodPost, "/test-bucket?delete", strings.NewReader(body))
|
||||||
|
signTestSigV4Request(t, req, "delete-user", "delete-secret-1")
|
||||||
|
rec := httptest.NewRecorder()
|
||||||
|
|
||||||
|
handler.router.ServeHTTP(rec, req)
|
||||||
|
|
||||||
|
if rec.Code != http.StatusOK {
|
||||||
|
t.Fatalf("unexpected status: got %d body=%s", rec.Code, rec.Body.String())
|
||||||
|
}
|
||||||
|
responseBody := rec.Body.String()
|
||||||
|
if !strings.Contains(responseBody, "allowed/file.txt") || !strings.Contains(responseBody, "<Deleted>") {
|
||||||
|
t.Fatalf("expected allowed key deletion, body=%s", responseBody)
|
||||||
|
}
|
||||||
|
if !strings.Contains(responseBody, "private/file.txt") || !strings.Contains(responseBody, "AccessDenied") {
|
||||||
|
t.Fatalf("expected per-key AccessDenied, body=%s", responseBody)
|
||||||
|
}
|
||||||
|
if _, err := svc.HeadObject("test-bucket", "allowed/file.txt"); !errors.Is(err, metadata.ErrObjectNotFound) {
|
||||||
|
t.Fatalf("allowed object should be deleted, got err=%v", err)
|
||||||
|
}
|
||||||
|
if _, err := svc.HeadObject("test-bucket", "private/file.txt"); err != nil {
|
||||||
|
t.Fatalf("private object should remain: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func signTestSigV4Request(t *testing.T, req *http.Request, accessKeyID, secretKey string) {
|
||||||
|
t.Helper()
|
||||||
|
|
||||||
|
amzDate := time.Now().UTC().Format("20060102T150405Z")
|
||||||
|
date := amzDate[:8]
|
||||||
|
region := "us-east-1"
|
||||||
|
serviceName := "s3"
|
||||||
|
scope := strings.Join([]string{date, region, serviceName, "aws4_request"}, "/")
|
||||||
|
signedHeaders := []string{"host", "x-amz-content-sha256", "x-amz-date"}
|
||||||
|
signedHeadersRaw := strings.Join(signedHeaders, ";")
|
||||||
|
payloadHash := "UNSIGNED-PAYLOAD"
|
||||||
|
|
||||||
|
req.Header.Set("x-amz-date", amzDate)
|
||||||
|
req.Header.Set("x-amz-content-sha256", payloadHash)
|
||||||
|
canonicalRequest := strings.Join([]string{
|
||||||
|
req.Method,
|
||||||
|
req.URL.EscapedPath(),
|
||||||
|
canonicalTestQuery(req.URL.RawQuery),
|
||||||
|
"host:" + strings.TrimSpace(req.Host) + "\n" +
|
||||||
|
"x-amz-content-sha256:" + payloadHash + "\n" +
|
||||||
|
"x-amz-date:" + amzDate + "\n",
|
||||||
|
signedHeadersRaw,
|
||||||
|
payloadHash,
|
||||||
|
}, "\n")
|
||||||
|
canonicalHash := sha256.Sum256([]byte(canonicalRequest))
|
||||||
|
stringToSign := strings.Join([]string{
|
||||||
|
"AWS4-HMAC-SHA256",
|
||||||
|
amzDate,
|
||||||
|
scope,
|
||||||
|
hex.EncodeToString(canonicalHash[:]),
|
||||||
|
}, "\n")
|
||||||
|
signingKey := testHMAC(testHMAC(testHMAC(testHMAC([]byte("AWS4"+secretKey), date), region), serviceName), "aws4_request")
|
||||||
|
signature := hex.EncodeToString(testHMAC(signingKey, stringToSign))
|
||||||
|
|
||||||
|
req.Header.Set("Authorization", "AWS4-HMAC-SHA256 "+
|
||||||
|
"Credential="+accessKeyID+"/"+scope+", "+
|
||||||
|
"SignedHeaders="+signedHeadersRaw+", "+
|
||||||
|
"Signature="+signature)
|
||||||
|
}
|
||||||
|
|
||||||
|
func canonicalTestQuery(rawQuery string) string {
|
||||||
|
values, _ := url.ParseQuery(rawQuery)
|
||||||
|
pairs := make([]string, 0)
|
||||||
|
for key, valueList := range values {
|
||||||
|
if len(valueList) == 0 {
|
||||||
|
pairs = append(pairs, awsTestQueryEscape(key)+"=")
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
for _, value := range valueList {
|
||||||
|
pairs = append(pairs, awsTestQueryEscape(key)+"="+awsTestQueryEscape(value))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
sort.Strings(pairs)
|
||||||
|
return strings.Join(pairs, "&")
|
||||||
|
}
|
||||||
|
|
||||||
|
func awsTestQueryEscape(value string) string {
|
||||||
|
encoded := url.QueryEscape(value)
|
||||||
|
encoded = strings.ReplaceAll(encoded, "+", "%20")
|
||||||
|
encoded = strings.ReplaceAll(encoded, "*", "%2A")
|
||||||
|
encoded = strings.ReplaceAll(encoded, "%7E", "~")
|
||||||
|
return encoded
|
||||||
|
}
|
||||||
|
|
||||||
|
func testHMAC(key []byte, value string) []byte {
|
||||||
|
mac := hmac.New(sha256.New, key)
|
||||||
|
_, _ = mac.Write([]byte(value))
|
||||||
|
return mac.Sum(nil)
|
||||||
|
}
|
||||||
107
api/object_copy_test.go
Normal file
107
api/object_copy_test.go
Normal file
@@ -0,0 +1,107 @@
|
|||||||
|
package api
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"io"
|
||||||
|
"log/slog"
|
||||||
|
"net/http"
|
||||||
|
"net/http/httptest"
|
||||||
|
"path/filepath"
|
||||||
|
"strings"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"fs/logging"
|
||||||
|
"fs/metadata"
|
||||||
|
"fs/service"
|
||||||
|
"fs/storage"
|
||||||
|
)
|
||||||
|
|
||||||
|
func newTestObjectHandler(t *testing.T) (*Handler, *service.ObjectService) {
|
||||||
|
t.Helper()
|
||||||
|
|
||||||
|
root := t.TempDir()
|
||||||
|
md, err := metadata.NewMetadataHandler(filepath.Join(root, "metadata.db"))
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("new metadata handler: %v", err)
|
||||||
|
}
|
||||||
|
blob, err := storage.NewBlobStore(root, 1024)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("new blob store: %v", err)
|
||||||
|
}
|
||||||
|
svc := service.NewObjectService(md, blob, time.Hour)
|
||||||
|
t.Cleanup(func() {
|
||||||
|
_ = svc.Close()
|
||||||
|
})
|
||||||
|
|
||||||
|
logger := slog.New(slog.NewTextHandler(io.Discard, nil))
|
||||||
|
handler := NewHandler(svc, logger, logging.Config{}, nil, false)
|
||||||
|
handler.setupRoutes()
|
||||||
|
return handler, svc
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestPutObjectStoresDecodedKey(t *testing.T) {
|
||||||
|
handler, svc := newTestObjectHandler(t)
|
||||||
|
if err := svc.CreateBucket("test-bucket"); err != nil {
|
||||||
|
t.Fatalf("create bucket: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
req := httptest.NewRequest(http.MethodPut, "/test-bucket/jsp-data-raw/vehicle_positions/year%3D2026/month%3D03/day%3D12/file.parquet", bytes.NewReader([]byte("PAR1data")))
|
||||||
|
rec := httptest.NewRecorder()
|
||||||
|
handler.router.ServeHTTP(rec, req)
|
||||||
|
|
||||||
|
if rec.Code != http.StatusOK {
|
||||||
|
t.Fatalf("unexpected status: got %d body=%s", rec.Code, rec.Body.String())
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err := svc.HeadObject("test-bucket", "jsp-data-raw/vehicle_positions/year=2026/month=03/day=12/file.parquet")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("head decoded key: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
getReq := httptest.NewRequest(http.MethodGet, "/test-bucket/jsp-data-raw/vehicle_positions/year=2026/month=03/day=12/file.parquet", nil)
|
||||||
|
getRec := httptest.NewRecorder()
|
||||||
|
handler.router.ServeHTTP(getRec, getReq)
|
||||||
|
if getRec.Code != http.StatusOK {
|
||||||
|
t.Fatalf("unexpected get status: got %d body=%s", getRec.Code, getRec.Body.String())
|
||||||
|
}
|
||||||
|
if got := getRec.Body.String(); got != "PAR1data" {
|
||||||
|
t.Fatalf("unexpected get body: got %q", got)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestCopyObjectCopiesCanonicalObject(t *testing.T) {
|
||||||
|
handler, svc := newTestObjectHandler(t)
|
||||||
|
if err := svc.CreateBucket("test-bucket"); err != nil {
|
||||||
|
t.Fatalf("create bucket: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
putReq := httptest.NewRequest(http.MethodPut, "/test-bucket/source/year%3D2026/file.parquet", bytes.NewReader([]byte("PAR1copy")))
|
||||||
|
putRec := httptest.NewRecorder()
|
||||||
|
handler.router.ServeHTTP(putRec, putReq)
|
||||||
|
if putRec.Code != http.StatusOK {
|
||||||
|
t.Fatalf("unexpected put status: got %d body=%s", putRec.Code, putRec.Body.String())
|
||||||
|
}
|
||||||
|
|
||||||
|
copyReq := httptest.NewRequest(http.MethodPut, "/test-bucket/copied/year=2026/file.parquet", http.NoBody)
|
||||||
|
copyReq.Header.Set("x-amz-copy-source", "/test-bucket/source/year%3D2026/file.parquet")
|
||||||
|
copyRec := httptest.NewRecorder()
|
||||||
|
handler.router.ServeHTTP(copyRec, copyReq)
|
||||||
|
|
||||||
|
if copyRec.Code != http.StatusOK {
|
||||||
|
t.Fatalf("unexpected copy status: got %d body=%s", copyRec.Code, copyRec.Body.String())
|
||||||
|
}
|
||||||
|
if !strings.Contains(copyRec.Body.String(), "<CopyObjectResult") {
|
||||||
|
t.Fatalf("unexpected copy response body: %s", copyRec.Body.String())
|
||||||
|
}
|
||||||
|
|
||||||
|
getReq := httptest.NewRequest(http.MethodGet, "/test-bucket/copied/year=2026/file.parquet", nil)
|
||||||
|
getRec := httptest.NewRecorder()
|
||||||
|
handler.router.ServeHTTP(getRec, getReq)
|
||||||
|
if getRec.Code != http.StatusOK {
|
||||||
|
t.Fatalf("unexpected get status after copy: got %d body=%s", getRec.Code, getRec.Body.String())
|
||||||
|
}
|
||||||
|
if got := getRec.Body.String(); got != "PAR1copy" {
|
||||||
|
t.Fatalf("unexpected copied body: got %q", got)
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -174,6 +174,8 @@ func mapToS3Error(err error) s3APIError {
|
|||||||
return s3ErrMalformedXML
|
return s3ErrMalformedXML
|
||||||
case errors.Is(err, service.ErrEntityTooSmall):
|
case errors.Is(err, service.ErrEntityTooSmall):
|
||||||
return s3ErrEntityTooSmall
|
return s3ErrEntityTooSmall
|
||||||
|
case errors.Is(err, service.ErrEntityTooLarge):
|
||||||
|
return s3ErrEntityTooLarge
|
||||||
case errors.Is(err, auth.ErrAccessDenied):
|
case errors.Is(err, auth.ErrAccessDenied):
|
||||||
return s3ErrAccessDenied
|
return s3ErrAccessDenied
|
||||||
case errors.Is(err, auth.ErrInvalidAccessKeyID):
|
case errors.Is(err, auth.ErrInvalidAccessKeyID):
|
||||||
|
|||||||
79
api/upload_limit_test.go
Normal file
79
api/upload_limit_test.go
Normal file
@@ -0,0 +1,79 @@
|
|||||||
|
package api
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"io"
|
||||||
|
"log/slog"
|
||||||
|
"net/http"
|
||||||
|
"net/http/httptest"
|
||||||
|
"path/filepath"
|
||||||
|
"strings"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"fs/logging"
|
||||||
|
"fs/metadata"
|
||||||
|
"fs/service"
|
||||||
|
"fs/storage"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestPutObjectReturnsEntityTooLarge(t *testing.T) {
|
||||||
|
handler, svc := newUploadLimitHandler(t, 4)
|
||||||
|
if err := svc.CreateBucket("test-bucket"); err != nil {
|
||||||
|
t.Fatalf("CreateBucket: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
req := httptest.NewRequest(http.MethodPut, "/test-bucket/too-large.txt", strings.NewReader("12345"))
|
||||||
|
rec := httptest.NewRecorder()
|
||||||
|
handler.router.ServeHTTP(rec, req)
|
||||||
|
|
||||||
|
if rec.Code != http.StatusRequestEntityTooLarge {
|
||||||
|
t.Fatalf("status = %d, want %d body=%s", rec.Code, http.StatusRequestEntityTooLarge, rec.Body.String())
|
||||||
|
}
|
||||||
|
if !strings.Contains(rec.Body.String(), "EntityTooLarge") {
|
||||||
|
t.Fatalf("expected EntityTooLarge response, body=%s", rec.Body.String())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestUploadPartReturnsEntityTooLarge(t *testing.T) {
|
||||||
|
handler, svc := newUploadLimitHandler(t, 4)
|
||||||
|
if err := svc.CreateBucket("test-bucket"); err != nil {
|
||||||
|
t.Fatalf("CreateBucket: %v", err)
|
||||||
|
}
|
||||||
|
upload, err := svc.CreateMultipartUpload("test-bucket", "object.txt")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("CreateMultipartUpload: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
req := httptest.NewRequest(http.MethodPut, "/test-bucket/object.txt?partNumber=1&uploadId="+upload.UploadID, bytes.NewReader([]byte("12345")))
|
||||||
|
rec := httptest.NewRecorder()
|
||||||
|
handler.router.ServeHTTP(rec, req)
|
||||||
|
|
||||||
|
if rec.Code != http.StatusRequestEntityTooLarge {
|
||||||
|
t.Fatalf("status = %d, want %d body=%s", rec.Code, http.StatusRequestEntityTooLarge, rec.Body.String())
|
||||||
|
}
|
||||||
|
if !strings.Contains(rec.Body.String(), "EntityTooLarge") {
|
||||||
|
t.Fatalf("expected EntityTooLarge response, body=%s", rec.Body.String())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func newUploadLimitHandler(t *testing.T, maxUploadSize int64) (*Handler, *service.ObjectService) {
|
||||||
|
t.Helper()
|
||||||
|
root := t.TempDir()
|
||||||
|
md, err := metadata.NewMetadataHandler(filepath.Join(root, "metadata.db"))
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("new metadata handler: %v", err)
|
||||||
|
}
|
||||||
|
blob, err := storage.NewBlobStore(root, 4)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("new blob store: %v", err)
|
||||||
|
}
|
||||||
|
svc := service.NewObjectService(md, blob, time.Hour, maxUploadSize)
|
||||||
|
t.Cleanup(func() {
|
||||||
|
_ = svc.Close()
|
||||||
|
})
|
||||||
|
logger := slog.New(slog.NewTextHandler(io.Discard, nil))
|
||||||
|
handler := NewHandler(svc, logger, logging.Config{}, nil, false)
|
||||||
|
handler.setupRoutes()
|
||||||
|
return handler, svc
|
||||||
|
}
|
||||||
@@ -39,6 +39,7 @@ func RunServer(ctx context.Context) error {
|
|||||||
"audit_log", logConfig.Audit,
|
"audit_log", logConfig.Audit,
|
||||||
"data_path", config.DataPath,
|
"data_path", config.DataPath,
|
||||||
"multipart_retention_hours", int(config.MultipartCleanupRetention/time.Hour),
|
"multipart_retention_hours", int(config.MultipartCleanupRetention/time.Hour),
|
||||||
|
"max_object_upload_bytes", config.MaxObjectUploadBytes,
|
||||||
"auth_enabled", authConfig.Enabled,
|
"auth_enabled", authConfig.Enabled,
|
||||||
"auth_region", authConfig.Region,
|
"auth_region", authConfig.Region,
|
||||||
"admin_api_enabled", config.AdminAPIEnabled,
|
"admin_api_enabled", config.AdminAPIEnabled,
|
||||||
@@ -63,7 +64,7 @@ func RunServer(ctx context.Context) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
objectService := service.NewObjectService(metadataHandler, blobHandler, config.MultipartCleanupRetention)
|
objectService := service.NewObjectService(metadataHandler, blobHandler, config.MultipartCleanupRetention, config.MaxObjectUploadBytes)
|
||||||
authService, err := auth.NewService(authConfig, metadataHandler)
|
authService, err := auth.NewService(authConfig, metadataHandler)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
_ = metadataHandler.Close()
|
_ = metadataHandler.Close()
|
||||||
|
|||||||
@@ -94,9 +94,11 @@ For each non-health request:
|
|||||||
6. Decrypt stored secret using master key.
|
6. Decrypt stored secret using master key.
|
||||||
7. Recompute canonical request and expected signature.
|
7. Recompute canonical request and expected signature.
|
||||||
8. Compare signatures.
|
8. Compare signatures.
|
||||||
9. Resolve target action from request.
|
9. Reject signed streaming payload modes that require per-chunk signature verification.
|
||||||
10. Evaluate policy; deny overrides allow.
|
10. Wrap fixed-size signed payloads so the actual body must match `x-amz-content-sha256`.
|
||||||
11. Store auth result in request context and continue.
|
11. Resolve target action from request.
|
||||||
|
12. Evaluate policy; deny overrides allow.
|
||||||
|
13. Store auth result in request context and continue.
|
||||||
|
|
||||||
## Authorization Semantics
|
## Authorization Semantics
|
||||||
Policy evaluator rules:
|
Policy evaluator rules:
|
||||||
@@ -106,6 +108,9 @@ Policy evaluator rules:
|
|||||||
- action: `*` or `s3:*`
|
- action: `*` or `s3:*`
|
||||||
- bucket: `*`
|
- bucket: `*`
|
||||||
- prefix: `*`
|
- prefix: `*`
|
||||||
|
- Object actions apply `prefix` to the object key.
|
||||||
|
- `ListBucket` applies `prefix` to the requested list `prefix` query value; a scoped list policy such as `prefix=backups/` does not allow an empty-prefix or sibling-prefix bucket listing.
|
||||||
|
- Multi-object delete is authorized per object key after the XML body is parsed; denied keys are returned as per-key `AccessDenied` errors and are not deleted.
|
||||||
|
|
||||||
Action resolution includes:
|
Action resolution includes:
|
||||||
- bucket APIs (`CreateBucket`, `ListBucket`, `HeadBucket`, `DeleteBucket`)
|
- bucket APIs (`CreateBucket`, `ListBucket`, `HeadBucket`, `DeleteBucket`)
|
||||||
@@ -137,6 +142,7 @@ Each audit entry includes method, path, remote IP, and request ID (if present).
|
|||||||
|
|
||||||
## Current Scope / Limitations
|
## Current Scope / Limitations
|
||||||
- No STS/session-token auth yet.
|
- No STS/session-token auth yet.
|
||||||
|
- Signed aws-chunked streaming payloads are not accepted until per-chunk signature verification is implemented. Unsigned streaming payload modes can still be decoded by the API layer.
|
||||||
- Policy language is intentionally minimal, not full IAM.
|
- Policy language is intentionally minimal, not full IAM.
|
||||||
- No automatic key rotation workflows.
|
- No automatic key rotation workflows.
|
||||||
- No key rotation endpoint for existing users yet.
|
- No key rotation endpoint for existing users yet.
|
||||||
|
|||||||
@@ -27,6 +27,18 @@ type RequestTarget struct {
|
|||||||
Action Action
|
Action Action
|
||||||
Bucket string
|
Bucket string
|
||||||
Key string
|
Key string
|
||||||
|
Prefix string
|
||||||
|
}
|
||||||
|
|
||||||
|
func RequiresHandlerAuthorization(r *http.Request) bool {
|
||||||
|
if r == nil || r.URL == nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
if r.Method == http.MethodPost {
|
||||||
|
_, isDelete := r.URL.Query()["delete"]
|
||||||
|
return isDelete
|
||||||
|
}
|
||||||
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
func resolveTarget(r *http.Request) RequestTarget {
|
func resolveTarget(r *http.Request) RequestTarget {
|
||||||
@@ -51,7 +63,7 @@ func resolveTarget(r *http.Request) RequestTarget {
|
|||||||
case http.MethodDelete:
|
case http.MethodDelete:
|
||||||
return RequestTarget{Action: ActionDeleteBucket, Bucket: bucket}
|
return RequestTarget{Action: ActionDeleteBucket, Bucket: bucket}
|
||||||
case http.MethodGet:
|
case http.MethodGet:
|
||||||
return RequestTarget{Action: ActionListBucket, Bucket: bucket}
|
return RequestTarget{Action: ActionListBucket, Bucket: bucket, Prefix: r.URL.Query().Get("prefix")}
|
||||||
case http.MethodPost:
|
case http.MethodPost:
|
||||||
if _, ok := r.URL.Query()["delete"]; ok {
|
if _, ok := r.URL.Query()["delete"]; ok {
|
||||||
return RequestTarget{Action: ActionDeleteObject, Bucket: bucket}
|
return RequestTarget{Action: ActionDeleteObject, Bucket: bucket}
|
||||||
|
|||||||
39
auth/action_test.go
Normal file
39
auth/action_test.go
Normal file
@@ -0,0 +1,39 @@
|
|||||||
|
package auth
|
||||||
|
|
||||||
|
import (
|
||||||
|
"net/http"
|
||||||
|
"net/http/httptest"
|
||||||
|
"testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestResolveTargetIncludesListBucketPrefix(t *testing.T) {
|
||||||
|
req := httptest.NewRequest(http.MethodGet, "http://example.com/test-bucket?list-type=2&prefix=allowed/", nil)
|
||||||
|
|
||||||
|
target := resolveTarget(req)
|
||||||
|
|
||||||
|
if target.Action != ActionListBucket {
|
||||||
|
t.Fatalf("action = %q, want %q", target.Action, ActionListBucket)
|
||||||
|
}
|
||||||
|
if target.Bucket != "test-bucket" {
|
||||||
|
t.Fatalf("bucket = %q, want test-bucket", target.Bucket)
|
||||||
|
}
|
||||||
|
if target.Prefix != "allowed/" {
|
||||||
|
t.Fatalf("prefix = %q, want allowed/", target.Prefix)
|
||||||
|
}
|
||||||
|
if target.Key != "" {
|
||||||
|
t.Fatalf("key = %q, want empty", target.Key)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestResolveTargetListBucketWithoutPrefix(t *testing.T) {
|
||||||
|
req := httptest.NewRequest(http.MethodGet, "http://example.com/test-bucket", nil)
|
||||||
|
|
||||||
|
target := resolveTarget(req)
|
||||||
|
|
||||||
|
if target.Action != ActionListBucket {
|
||||||
|
t.Fatalf("action = %q, want %q", target.Action, ActionListBucket)
|
||||||
|
}
|
||||||
|
if target.Prefix != "" {
|
||||||
|
t.Fatalf("prefix = %q, want empty", target.Prefix)
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -1,11 +1,16 @@
|
|||||||
package auth
|
package auth
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"crypto/sha256"
|
||||||
|
"encoding/hex"
|
||||||
"errors"
|
"errors"
|
||||||
"fs/metrics"
|
"fs/metrics"
|
||||||
|
"hash"
|
||||||
|
"io"
|
||||||
"log/slog"
|
"log/slog"
|
||||||
"net"
|
"net"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
"strings"
|
||||||
|
|
||||||
"github.com/go-chi/chi/v5/middleware"
|
"github.com/go-chi/chi/v5/middleware"
|
||||||
)
|
)
|
||||||
@@ -55,6 +60,16 @@ func Middleware(
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if err := wrapPayloadHashVerifier(r); err != nil {
|
||||||
|
metrics.Default.ObserveAuth("error", "sigv4", authErrorClass(err))
|
||||||
|
if onError != nil {
|
||||||
|
onError(w, r, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
http.Error(w, http.StatusText(http.StatusForbidden), http.StatusForbidden)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
metrics.Default.ObserveAuth("ok", resolvedCtx.AuthType, "none")
|
metrics.Default.ObserveAuth("ok", resolvedCtx.AuthType, "none")
|
||||||
if auditEnabled && logger != nil {
|
if auditEnabled && logger != nil {
|
||||||
requestID := middleware.GetReqID(r.Context())
|
requestID := middleware.GetReqID(r.Context())
|
||||||
@@ -75,6 +90,65 @@ func Middleware(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func wrapPayloadHashVerifier(r *http.Request) error {
|
||||||
|
if r == nil || r.Body == nil || r.Body == http.NoBody {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
payloadHash := resolvePayloadHash(r, false)
|
||||||
|
if !payloadHashRequiresVerification(payloadHash) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
if !isHexSHA256(payloadHash) {
|
||||||
|
return ErrAuthorizationHeaderMalformed
|
||||||
|
}
|
||||||
|
expected, err := hex.DecodeString(strings.ToLower(payloadHash))
|
||||||
|
if err != nil {
|
||||||
|
return ErrAuthorizationHeaderMalformed
|
||||||
|
}
|
||||||
|
r.Body = &payloadHashVerifyingReadCloser{
|
||||||
|
inner: r.Body,
|
||||||
|
hasher: sha256.New(),
|
||||||
|
expected: expected,
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type payloadHashVerifyingReadCloser struct {
|
||||||
|
inner io.ReadCloser
|
||||||
|
hasher hash.Hash
|
||||||
|
expected []byte
|
||||||
|
done bool
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *payloadHashVerifyingReadCloser) Read(p []byte) (int, error) {
|
||||||
|
n, err := r.inner.Read(p)
|
||||||
|
if n > 0 {
|
||||||
|
_, _ = r.hasher.Write(p[:n])
|
||||||
|
}
|
||||||
|
if err == io.EOF && !r.done {
|
||||||
|
r.done = true
|
||||||
|
if !equalBytes(r.hasher.Sum(nil), r.expected) {
|
||||||
|
return n, ErrSignatureDoesNotMatch
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return n, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *payloadHashVerifyingReadCloser) Close() error {
|
||||||
|
return r.inner.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
func equalBytes(left, right []byte) bool {
|
||||||
|
if len(left) != len(right) {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
var diff byte
|
||||||
|
for i := range left {
|
||||||
|
diff |= left[i] ^ right[i]
|
||||||
|
}
|
||||||
|
return diff == 0
|
||||||
|
}
|
||||||
|
|
||||||
func authErrorClass(err error) string {
|
func authErrorClass(err error) string {
|
||||||
switch {
|
switch {
|
||||||
case errors.Is(err, ErrInvalidAccessKeyID):
|
case errors.Is(err, ErrInvalidAccessKeyID):
|
||||||
|
|||||||
75
auth/payload_hash_test.go
Normal file
75
auth/payload_hash_test.go
Normal file
@@ -0,0 +1,75 @@
|
|||||||
|
package auth
|
||||||
|
|
||||||
|
import (
|
||||||
|
"crypto/sha256"
|
||||||
|
"encoding/hex"
|
||||||
|
"errors"
|
||||||
|
"io"
|
||||||
|
"net/http"
|
||||||
|
"strings"
|
||||||
|
"testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestPayloadHashVerifierAllowsMatchingBody(t *testing.T) {
|
||||||
|
body := "payload"
|
||||||
|
req := newPayloadHashRequest(t, body, body)
|
||||||
|
|
||||||
|
if err := wrapPayloadHashVerifier(req); err != nil {
|
||||||
|
t.Fatalf("wrapPayloadHashVerifier returned error: %v", err)
|
||||||
|
}
|
||||||
|
got, err := io.ReadAll(req.Body)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("ReadAll returned error: %v", err)
|
||||||
|
}
|
||||||
|
if string(got) != body {
|
||||||
|
t.Fatalf("unexpected body: got %q want %q", string(got), body)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestPayloadHashVerifierRejectsMismatchedBody(t *testing.T) {
|
||||||
|
req := newPayloadHashRequest(t, "signed-payload", "actual-payload")
|
||||||
|
|
||||||
|
if err := wrapPayloadHashVerifier(req); err != nil {
|
||||||
|
t.Fatalf("wrapPayloadHashVerifier returned error: %v", err)
|
||||||
|
}
|
||||||
|
_, err := io.ReadAll(req.Body)
|
||||||
|
if !errors.Is(err, ErrSignatureDoesNotMatch) {
|
||||||
|
t.Fatalf("ReadAll error = %v, want ErrSignatureDoesNotMatch", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestPayloadSigningRejectsSignedStreamingMode(t *testing.T) {
|
||||||
|
req, err := http.NewRequest(http.MethodPut, "http://example.com/b/k", nil)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
req.Header.Set("x-amz-content-sha256", "STREAMING-AWS4-HMAC-SHA256-PAYLOAD")
|
||||||
|
|
||||||
|
err = validatePayloadSigningMode(req, &sigV4Input{})
|
||||||
|
if !errors.Is(err, ErrAuthorizationHeaderMalformed) {
|
||||||
|
t.Fatalf("validatePayloadSigningMode error = %v, want ErrAuthorizationHeaderMalformed", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestPayloadSigningAllowsUnsignedStreamingMode(t *testing.T) {
|
||||||
|
req, err := http.NewRequest(http.MethodPut, "http://example.com/b/k", nil)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
req.Header.Set("x-amz-content-sha256", "STREAMING-UNSIGNED-PAYLOAD-TRAILER")
|
||||||
|
|
||||||
|
if err := validatePayloadSigningMode(req, &sigV4Input{}); err != nil {
|
||||||
|
t.Fatalf("validatePayloadSigningMode returned error: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func newPayloadHashRequest(t *testing.T, signedBody, actualBody string) *http.Request {
|
||||||
|
t.Helper()
|
||||||
|
req, err := http.NewRequest(http.MethodPut, "http://example.com/b/k", strings.NewReader(actualBody))
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
sum := sha256.Sum256([]byte(signedBody))
|
||||||
|
req.Header.Set("x-amz-content-sha256", hex.EncodeToString(sum[:]))
|
||||||
|
return req
|
||||||
|
}
|
||||||
@@ -33,14 +33,16 @@ func statementMatches(stmt models.AuthPolicyStatement, target RequestTarget) boo
|
|||||||
if !bucketMatches(stmt.Bucket, target.Bucket) {
|
if !bucketMatches(stmt.Bucket, target.Bucket) {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
if target.Key == "" {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
|
|
||||||
prefix := strings.TrimSpace(stmt.Prefix)
|
prefix := strings.TrimSpace(stmt.Prefix)
|
||||||
if prefix == "" || prefix == "*" {
|
if prefix == "" || prefix == "*" {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
if target.Key == "" {
|
||||||
|
if target.Action == ActionListBucket {
|
||||||
|
return strings.HasPrefix(target.Prefix, prefix)
|
||||||
|
}
|
||||||
|
return true
|
||||||
|
}
|
||||||
return strings.HasPrefix(target.Key, prefix)
|
return strings.HasPrefix(target.Key, prefix)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
52
auth/policy_test.go
Normal file
52
auth/policy_test.go
Normal file
@@ -0,0 +1,52 @@
|
|||||||
|
package auth
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fs/models"
|
||||||
|
"testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestListBucketPolicyAppliesPrefix(t *testing.T) {
|
||||||
|
policy := &models.AuthPolicy{
|
||||||
|
Statements: []models.AuthPolicyStatement{
|
||||||
|
{
|
||||||
|
Effect: "allow",
|
||||||
|
Actions: []string{"s3:ListBucket"},
|
||||||
|
Bucket: "test-bucket",
|
||||||
|
Prefix: "allowed/",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
if !isAllowed(policy, RequestTarget{Action: ActionListBucket, Bucket: "test-bucket", Prefix: "allowed/"}) {
|
||||||
|
t.Fatalf("expected matching list prefix to be allowed")
|
||||||
|
}
|
||||||
|
if !isAllowed(policy, RequestTarget{Action: ActionListBucket, Bucket: "test-bucket", Prefix: "allowed/nested/"}) {
|
||||||
|
t.Fatalf("expected nested list prefix to be allowed")
|
||||||
|
}
|
||||||
|
if isAllowed(policy, RequestTarget{Action: ActionListBucket, Bucket: "test-bucket"}) {
|
||||||
|
t.Fatalf("expected empty list prefix to be denied")
|
||||||
|
}
|
||||||
|
if isAllowed(policy, RequestTarget{Action: ActionListBucket, Bucket: "test-bucket", Prefix: "private/"}) {
|
||||||
|
t.Fatalf("expected non-matching list prefix to be denied")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestWildcardListBucketPolicyAllowsAnyPrefix(t *testing.T) {
|
||||||
|
policy := &models.AuthPolicy{
|
||||||
|
Statements: []models.AuthPolicyStatement{
|
||||||
|
{
|
||||||
|
Effect: "allow",
|
||||||
|
Actions: []string{"s3:ListBucket"},
|
||||||
|
Bucket: "test-bucket",
|
||||||
|
Prefix: "*",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
if !isAllowed(policy, RequestTarget{Action: ActionListBucket, Bucket: "test-bucket"}) {
|
||||||
|
t.Fatalf("expected wildcard list policy to allow empty prefix")
|
||||||
|
}
|
||||||
|
if !isAllowed(policy, RequestTarget{Action: ActionListBucket, Bucket: "test-bucket", Prefix: "private/"}) {
|
||||||
|
t.Fatalf("expected wildcard list policy to allow arbitrary prefix")
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -152,6 +152,9 @@ func (s *Service) AuthenticateRequest(r *http.Request) (RequestContext, error) {
|
|||||||
if err := validateSigV4Input(s.now(), s.cfg, input); err != nil {
|
if err := validateSigV4Input(s.now(), s.cfg, input); err != nil {
|
||||||
return RequestContext{}, err
|
return RequestContext{}, err
|
||||||
}
|
}
|
||||||
|
if err := validatePayloadSigningMode(r, input); err != nil {
|
||||||
|
return RequestContext{}, err
|
||||||
|
}
|
||||||
|
|
||||||
identity, err := s.store.GetAuthIdentity(input.AccessKeyID)
|
identity, err := s.store.GetAuthIdentity(input.AccessKeyID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -185,6 +188,13 @@ func (s *Service) AuthenticateRequest(r *http.Request) (RequestContext, error) {
|
|||||||
AuthType: authType,
|
AuthType: authType,
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
if RequiresHandlerAuthorization(r) {
|
||||||
|
return RequestContext{
|
||||||
|
Authenticated: true,
|
||||||
|
AccessKeyID: identity.AccessKeyID,
|
||||||
|
AuthType: authType,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
policy, err := s.store.GetAuthPolicy(identity.AccessKeyID)
|
policy, err := s.store.GetAuthPolicy(identity.AccessKeyID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|||||||
@@ -210,6 +210,17 @@ func validateSigV4Input(now time.Time, cfg Config, input *sigV4Input) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func validatePayloadSigningMode(r *http.Request, input *sigV4Input) error {
|
||||||
|
payloadHash := resolvePayloadHash(r, input.Presigned)
|
||||||
|
if isSignedStreamingPayloadHash(payloadHash) {
|
||||||
|
return fmt.Errorf("%w: signed streaming payload verification is not supported", ErrAuthorizationHeaderMalformed)
|
||||||
|
}
|
||||||
|
if payloadHashRequiresVerification(payloadHash) && !isHexSHA256(payloadHash) {
|
||||||
|
return fmt.Errorf("%w: invalid x-amz-content-sha256", ErrAuthorizationHeaderMalformed)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func signatureMatches(secret string, r *http.Request, input *sigV4Input) (bool, error) {
|
func signatureMatches(secret string, r *http.Request, input *sigV4Input) (bool, error) {
|
||||||
payloadHash := resolvePayloadHash(r, input.Presigned)
|
payloadHash := resolvePayloadHash(r, input.Presigned)
|
||||||
canonicalRequest, err := buildCanonicalRequest(r, input.SignedHeaders, payloadHash, input.Presigned)
|
canonicalRequest, err := buildCanonicalRequest(r, input.SignedHeaders, payloadHash, input.Presigned)
|
||||||
@@ -233,6 +244,34 @@ func resolvePayloadHash(r *http.Request, presigned bool) string {
|
|||||||
return hash
|
return hash
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func isSignedStreamingPayloadHash(payloadHash string) bool {
|
||||||
|
payloadHash = strings.ToUpper(strings.TrimSpace(payloadHash))
|
||||||
|
return strings.HasPrefix(payloadHash, "STREAMING-AWS4-HMAC-SHA256-PAYLOAD")
|
||||||
|
}
|
||||||
|
|
||||||
|
func payloadHashRequiresVerification(payloadHash string) bool {
|
||||||
|
payloadHash = strings.ToUpper(strings.TrimSpace(payloadHash))
|
||||||
|
if payloadHash == "" || payloadHash == "UNSIGNED-PAYLOAD" {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
if strings.HasPrefix(payloadHash, "STREAMING-UNSIGNED-PAYLOAD") {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
func isHexSHA256(value string) bool {
|
||||||
|
if len(value) != sha256.Size*2 {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
for _, ch := range value {
|
||||||
|
if (ch < '0' || ch > '9') && (ch < 'a' || ch > 'f') && (ch < 'A' || ch > 'F') {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
func buildCanonicalRequest(r *http.Request, signedHeaders []string, payloadHash string, presigned bool) (string, error) {
|
func buildCanonicalRequest(r *http.Request, signedHeaders []string, payloadHash string, presigned bool) (string, error) {
|
||||||
canonicalURI := canonicalPath(r.URL)
|
canonicalURI := canonicalPath(r.URL)
|
||||||
canonicalQuery := canonicalQueryString(r.URL.RawQuery, presigned)
|
canonicalQuery := canonicalQueryString(r.URL.RawQuery, presigned)
|
||||||
|
|||||||
50
auth/sigv4_test.go
Normal file
50
auth/sigv4_test.go
Normal file
@@ -0,0 +1,50 @@
|
|||||||
|
package auth
|
||||||
|
|
||||||
|
import (
|
||||||
|
"net/http"
|
||||||
|
"net/http/httptest"
|
||||||
|
"net/url"
|
||||||
|
"strings"
|
||||||
|
"testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestCanonicalPathEncodesEquals(t *testing.T) {
|
||||||
|
u := &url.URL{Path: "/test-bucket/jsp-data-raw/year=2026/month=03/day=12/vehicle_positions.parquet"}
|
||||||
|
got := canonicalPath(u)
|
||||||
|
want := "/test-bucket/jsp-data-raw/year%3D2026/month%3D03/day%3D12/vehicle_positions.parquet"
|
||||||
|
if got != want {
|
||||||
|
t.Fatalf("unexpected canonical path: got %q want %q", got, want)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestCanonicalPathPreservesExistingEscapes(t *testing.T) {
|
||||||
|
u, err := url.Parse("http://localhost:2600/test-bucket/jsp-data-raw/year%3d2026/file%2Eparquet")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("url.Parse failed: %v", err)
|
||||||
|
}
|
||||||
|
got := canonicalPath(u)
|
||||||
|
want := "/test-bucket/jsp-data-raw/year%3D2026/file%2Eparquet"
|
||||||
|
if got != want {
|
||||||
|
t.Fatalf("unexpected canonical path: got %q want %q", got, want)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestBuildCanonicalRequestUsesAwsEncodedPath(t *testing.T) {
|
||||||
|
req := httptest.NewRequest(http.MethodGet, "http://localhost:2600/test-bucket/jsp-data-raw/year=2026/month=03/day=12/vehicle_positions.parquet", nil)
|
||||||
|
req.Header.Set("x-amz-date", "20260313T120000Z")
|
||||||
|
req.Header.Set("x-amz-content-sha256", "UNSIGNED-PAYLOAD")
|
||||||
|
|
||||||
|
canonical, err := buildCanonicalRequest(req, []string{"host", "x-amz-content-sha256", "x-amz-date"}, "UNSIGNED-PAYLOAD", false)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("buildCanonicalRequest failed: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
lines := strings.Split(canonical, "\n")
|
||||||
|
if len(lines) < 2 {
|
||||||
|
t.Fatalf("canonical request has unexpected format: %q", canonical)
|
||||||
|
}
|
||||||
|
wantPath := "/test-bucket/jsp-data-raw/year%3D2026/month%3D03/day%3D12/vehicle_positions.parquet"
|
||||||
|
if lines[1] != wantPath {
|
||||||
|
t.Fatalf("unexpected canonical path line: got %q want %q", lines[1], wantPath)
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -32,12 +32,15 @@ This project is S3-compatible for a focused subset of operations.
|
|||||||
### Authentication
|
### Authentication
|
||||||
- AWS SigV4 header auth
|
- AWS SigV4 header auth
|
||||||
- AWS SigV4 presigned query auth
|
- AWS SigV4 presigned query auth
|
||||||
- `aws-chunked` payload decode for streaming uploads
|
- `aws-chunked` payload decode for unsigned streaming upload modes
|
||||||
|
- SigV4 payload hash verification for fixed-size signed payloads
|
||||||
|
|
||||||
## Partially Implemented / Differences
|
## Partially Implemented / Differences
|
||||||
- Exact parity with AWS S3 error codes/headers is still evolving.
|
- Exact parity with AWS S3 error codes/headers is still evolving.
|
||||||
- Some S3 edge-case behaviors may differ (especially uncommon query/header combinations).
|
- Some S3 edge-case behaviors may differ (especially uncommon query/header combinations).
|
||||||
- Admin API is custom JSON (`/_admin/v1/*`).
|
- Admin API is custom JSON (`/_admin/v1/*`).
|
||||||
|
- Object and upload-part payloads are limited by `FS_MAX_OBJECT_UPLOAD_BYTES` (default 5 GiB).
|
||||||
|
- Signed `aws-chunked` payload modes that require per-chunk signature verification are rejected until chunk-signature validation is implemented.
|
||||||
|
|
||||||
## Not Implemented (Current)
|
## Not Implemented (Current)
|
||||||
- Bucket versioning
|
- Bucket versioning
|
||||||
|
|||||||
@@ -902,9 +902,6 @@ func (h *MetadataHandler) CleanupMultipartUploads(retention time.Duration) (int,
|
|||||||
if err := json.Unmarshal(v, &upload); err != nil {
|
if err := json.Unmarshal(v, &upload); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if upload.State == "pending" {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
createdAt, err := time.Parse(time.RFC3339, upload.CreatedAt)
|
createdAt, err := time.Parse(time.RFC3339, upload.CreatedAt)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil
|
return nil
|
||||||
|
|||||||
99
metadata/metadata_test.go
Normal file
99
metadata/metadata_test.go
Normal file
@@ -0,0 +1,99 @@
|
|||||||
|
package metadata
|
||||||
|
|
||||||
|
import (
|
||||||
|
"errors"
|
||||||
|
"fs/models"
|
||||||
|
"path/filepath"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"go.etcd.io/bbolt"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestCleanupMultipartUploadsDeletesExpiredPendingUpload(t *testing.T) {
|
||||||
|
h := newTestMetadataHandler(t)
|
||||||
|
if err := h.CreateBucket("test-bucket"); err != nil {
|
||||||
|
t.Fatalf("CreateBucket: %v", err)
|
||||||
|
}
|
||||||
|
upload, err := h.CreateMultipartUpload("test-bucket", "object.txt")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("CreateMultipartUpload: %v", err)
|
||||||
|
}
|
||||||
|
if err := h.PutMultipartPart(upload.UploadID, models.UploadedPart{PartNumber: 1, ETag: "etag", Size: 4, Chunks: []string{"chunk-id"}}); err != nil {
|
||||||
|
t.Fatalf("PutMultipartPart: %v", err)
|
||||||
|
}
|
||||||
|
setMultipartUploadCreatedAt(t, h, upload.UploadID, time.Now().Add(-2*time.Hour))
|
||||||
|
|
||||||
|
cleaned, err := h.CleanupMultipartUploads(time.Hour)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("CleanupMultipartUploads: %v", err)
|
||||||
|
}
|
||||||
|
if cleaned != 1 {
|
||||||
|
t.Fatalf("cleaned = %d, want 1", cleaned)
|
||||||
|
}
|
||||||
|
if _, err := h.GetMultipartUpload(upload.UploadID); !errors.Is(err, ErrMultipartNotFound) {
|
||||||
|
t.Fatalf("GetMultipartUpload error = %v, want ErrMultipartNotFound", err)
|
||||||
|
}
|
||||||
|
if _, err := h.ListMultipartParts(upload.UploadID); !errors.Is(err, ErrMultipartNotFound) {
|
||||||
|
t.Fatalf("ListMultipartParts error = %v, want ErrMultipartNotFound", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestCleanupMultipartUploadsKeepsRecentPendingUpload(t *testing.T) {
|
||||||
|
h := newTestMetadataHandler(t)
|
||||||
|
if err := h.CreateBucket("test-bucket"); err != nil {
|
||||||
|
t.Fatalf("CreateBucket: %v", err)
|
||||||
|
}
|
||||||
|
upload, err := h.CreateMultipartUpload("test-bucket", "object.txt")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("CreateMultipartUpload: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
cleaned, err := h.CleanupMultipartUploads(time.Hour)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("CleanupMultipartUploads: %v", err)
|
||||||
|
}
|
||||||
|
if cleaned != 0 {
|
||||||
|
t.Fatalf("cleaned = %d, want 0", cleaned)
|
||||||
|
}
|
||||||
|
if _, err := h.GetMultipartUpload(upload.UploadID); err != nil {
|
||||||
|
t.Fatalf("recent upload should remain: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestCleanupMultipartUploadsDisabledForNonPositiveRetention(t *testing.T) {
|
||||||
|
h := newTestMetadataHandler(t)
|
||||||
|
cleaned, err := h.CleanupMultipartUploads(0)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("CleanupMultipartUploads: %v", err)
|
||||||
|
}
|
||||||
|
if cleaned != 0 {
|
||||||
|
t.Fatalf("cleaned = %d, want 0", cleaned)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func newTestMetadataHandler(t *testing.T) *MetadataHandler {
|
||||||
|
t.Helper()
|
||||||
|
h, err := NewMetadataHandler(filepath.Join(t.TempDir(), "metadata.db"))
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("NewMetadataHandler: %v", err)
|
||||||
|
}
|
||||||
|
t.Cleanup(func() {
|
||||||
|
_ = h.Close()
|
||||||
|
})
|
||||||
|
return h
|
||||||
|
}
|
||||||
|
|
||||||
|
func setMultipartUploadCreatedAt(t *testing.T, h *MetadataHandler, uploadID string, createdAt time.Time) {
|
||||||
|
t.Helper()
|
||||||
|
if err := h.update(func(tx *bbolt.Tx) error {
|
||||||
|
upload, uploadsBucket, err := getMultipartUploadFromTx(tx, uploadID)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
upload.CreatedAt = createdAt.UTC().Format(time.RFC3339)
|
||||||
|
return putMultipartUpload(uploadsBucket, uploadID, upload)
|
||||||
|
}); err != nil {
|
||||||
|
t.Fatalf("set multipart created_at: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -21,6 +21,7 @@ type ObjectService struct {
|
|||||||
metadata *metadata.MetadataHandler
|
metadata *metadata.MetadataHandler
|
||||||
blob *storage.BlobStore
|
blob *storage.BlobStore
|
||||||
multipartRetention time.Duration
|
multipartRetention time.Duration
|
||||||
|
maxUploadSize int64
|
||||||
gcMu sync.RWMutex
|
gcMu sync.RWMutex
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -29,16 +30,24 @@ var (
|
|||||||
ErrInvalidPartOrder = errors.New("invalid multipart part order")
|
ErrInvalidPartOrder = errors.New("invalid multipart part order")
|
||||||
ErrInvalidCompleteRequest = errors.New("invalid complete multipart request")
|
ErrInvalidCompleteRequest = errors.New("invalid complete multipart request")
|
||||||
ErrEntityTooSmall = errors.New("multipart entity too small")
|
ErrEntityTooSmall = errors.New("multipart entity too small")
|
||||||
|
ErrEntityTooLarge = errors.New("entity too large")
|
||||||
)
|
)
|
||||||
|
|
||||||
func NewObjectService(metadataHandler *metadata.MetadataHandler, blobHandler *storage.BlobStore, multipartRetention time.Duration) *ObjectService {
|
const DefaultMaxUploadSize int64 = 5 * 1024 * 1024 * 1024
|
||||||
|
|
||||||
|
func NewObjectService(metadataHandler *metadata.MetadataHandler, blobHandler *storage.BlobStore, multipartRetention time.Duration, maxUploadSize ...int64) *ObjectService {
|
||||||
if multipartRetention <= 0 {
|
if multipartRetention <= 0 {
|
||||||
multipartRetention = 24 * time.Hour
|
multipartRetention = 24 * time.Hour
|
||||||
}
|
}
|
||||||
|
limit := DefaultMaxUploadSize
|
||||||
|
if len(maxUploadSize) > 0 {
|
||||||
|
limit = maxUploadSize[0]
|
||||||
|
}
|
||||||
return &ObjectService{
|
return &ObjectService{
|
||||||
metadata: metadataHandler,
|
metadata: metadataHandler,
|
||||||
blob: blobHandler,
|
blob: blobHandler,
|
||||||
multipartRetention: multipartRetention,
|
multipartRetention: multipartRetention,
|
||||||
|
maxUploadSize: limit,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -74,7 +83,7 @@ func (s *ObjectService) PutObject(bucket, key, contentType string, input io.Read
|
|||||||
unlock := s.acquireGCRLock()
|
unlock := s.acquireGCRLock()
|
||||||
defer unlock()
|
defer unlock()
|
||||||
|
|
||||||
chunks, size, etag, err := s.blob.IngestStream(input)
|
chunks, size, etag, err := s.blob.IngestStream(s.limitUpload(input))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@@ -158,7 +167,9 @@ func (s *ObjectService) GetObject(bucket, key string) (io.ReadCloser, *models.Ob
|
|||||||
defer func() {
|
defer func() {
|
||||||
metrics.Default.ObserveService("get_object", time.Since(start), streamOK)
|
metrics.Default.ObserveService("get_object", time.Since(start), streamOK)
|
||||||
}()
|
}()
|
||||||
defer metrics.Default.ObserveLockHold("gc_mu_read", time.Since(holdStart))
|
defer func() {
|
||||||
|
metrics.Default.ObserveLockHold("gc_mu_read", time.Since(holdStart))
|
||||||
|
}()
|
||||||
defer s.gcMu.RUnlock()
|
defer s.gcMu.RUnlock()
|
||||||
if err := s.blob.AssembleStream(manifest.Chunks, pw); err != nil {
|
if err := s.blob.AssembleStream(manifest.Chunks, pw); err != nil {
|
||||||
_ = pw.CloseWithError(err)
|
_ = pw.CloseWithError(err)
|
||||||
@@ -311,7 +322,7 @@ func (s *ObjectService) UploadPart(bucket, key, uploadId string, partNumber int,
|
|||||||
}
|
}
|
||||||
|
|
||||||
var uploadedPart models.UploadedPart
|
var uploadedPart models.UploadedPart
|
||||||
chunkIds, totalSize, etag, err := s.blob.IngestStream(input)
|
chunkIds, totalSize, etag, err := s.blob.IngestStream(s.limitUpload(input))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
@@ -400,6 +411,9 @@ func (s *ObjectService) CompleteMultipartUpload(bucket, key, uploadID string, co
|
|||||||
orderedParts = append(orderedParts, storedPart)
|
orderedParts = append(orderedParts, storedPart)
|
||||||
chunks = append(chunks, storedPart.Chunks...)
|
chunks = append(chunks, storedPart.Chunks...)
|
||||||
totalSize += storedPart.Size
|
totalSize += storedPart.Size
|
||||||
|
if s.maxUploadSize > 0 && totalSize > s.maxUploadSize {
|
||||||
|
return nil, ErrEntityTooLarge
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
finalETag := buildMultipartETag(orderedParts)
|
finalETag := buildMultipartETag(orderedParts)
|
||||||
@@ -435,6 +449,40 @@ func (s *ObjectService) AbortMultipartUpload(bucket, key, uploadID string) error
|
|||||||
return s.metadata.AbortMultipartUpload(uploadID)
|
return s.metadata.AbortMultipartUpload(uploadID)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *ObjectService) limitUpload(input io.Reader) io.Reader {
|
||||||
|
if s.maxUploadSize <= 0 || input == nil {
|
||||||
|
return input
|
||||||
|
}
|
||||||
|
return &maxBytesReader{inner: input, remaining: s.maxUploadSize}
|
||||||
|
}
|
||||||
|
|
||||||
|
type maxBytesReader struct {
|
||||||
|
inner io.Reader
|
||||||
|
remaining int64
|
||||||
|
tooLarge bool
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *maxBytesReader) Read(p []byte) (int, error) {
|
||||||
|
if r.tooLarge {
|
||||||
|
return 0, ErrEntityTooLarge
|
||||||
|
}
|
||||||
|
if r.remaining <= 0 {
|
||||||
|
var probe [1]byte
|
||||||
|
n, err := r.inner.Read(probe[:])
|
||||||
|
if n > 0 {
|
||||||
|
r.tooLarge = true
|
||||||
|
return 0, ErrEntityTooLarge
|
||||||
|
}
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
if int64(len(p)) > r.remaining {
|
||||||
|
p = p[:r.remaining]
|
||||||
|
}
|
||||||
|
n, err := r.inner.Read(p)
|
||||||
|
r.remaining -= int64(n)
|
||||||
|
return n, err
|
||||||
|
}
|
||||||
|
|
||||||
func normalizeETag(etag string) string {
|
func normalizeETag(etag string) string {
|
||||||
return strings.Trim(etag, "\"")
|
return strings.Trim(etag, "\"")
|
||||||
}
|
}
|
||||||
@@ -469,6 +517,12 @@ func (s *ObjectService) GarbageCollect() error {
|
|||||||
unlock := s.acquireGCLock()
|
unlock := s.acquireGCLock()
|
||||||
defer unlock()
|
defer unlock()
|
||||||
|
|
||||||
|
var err error
|
||||||
|
cleanedUploads, err = s.metadata.CleanupMultipartUploads(s.multipartRetention)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
referencedChunkSet, err := s.metadata.GetReferencedChunkSet()
|
referencedChunkSet, err := s.metadata.GetReferencedChunkSet()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
@@ -492,11 +546,6 @@ func (s *ObjectService) GarbageCollect() error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
cleanedUploads, err = s.metadata.CleanupMultipartUploads(s.multipartRetention)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
slog.Info("garbage_collect_completed",
|
slog.Info("garbage_collect_completed",
|
||||||
"referenced_chunks", len(referencedChunkSet),
|
"referenced_chunks", len(referencedChunkSet),
|
||||||
"total_chunks", totalChunks,
|
"total_chunks", totalChunks,
|
||||||
|
|||||||
119
service/upload_limit_test.go
Normal file
119
service/upload_limit_test.go
Normal file
@@ -0,0 +1,119 @@
|
|||||||
|
package service
|
||||||
|
|
||||||
|
import (
|
||||||
|
"errors"
|
||||||
|
"fs/metadata"
|
||||||
|
"fs/storage"
|
||||||
|
"path/filepath"
|
||||||
|
"strings"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestPutObjectRejectsOversizedUpload(t *testing.T) {
|
||||||
|
svc := newTestObjectService(t, 4)
|
||||||
|
if err := svc.CreateBucket("test-bucket"); err != nil {
|
||||||
|
t.Fatalf("CreateBucket: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err := svc.PutObject("test-bucket", "too-large.txt", "text/plain", strings.NewReader("12345"))
|
||||||
|
if !errors.Is(err, ErrEntityTooLarge) {
|
||||||
|
t.Fatalf("PutObject error = %v, want ErrEntityTooLarge", err)
|
||||||
|
}
|
||||||
|
if _, err := svc.HeadObject("test-bucket", "too-large.txt"); !errors.Is(err, metadata.ErrObjectNotFound) {
|
||||||
|
t.Fatalf("HeadObject error = %v, want ErrObjectNotFound", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestPutObjectAllowsExactUploadLimit(t *testing.T) {
|
||||||
|
svc := newTestObjectService(t, 4)
|
||||||
|
if err := svc.CreateBucket("test-bucket"); err != nil {
|
||||||
|
t.Fatalf("CreateBucket: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
manifest, err := svc.PutObject("test-bucket", "exact.txt", "text/plain", strings.NewReader("1234"))
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("PutObject: %v", err)
|
||||||
|
}
|
||||||
|
if manifest.Size != 4 {
|
||||||
|
t.Fatalf("manifest size = %d, want 4", manifest.Size)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestUploadPartRejectsOversizedUpload(t *testing.T) {
|
||||||
|
svc := newTestObjectService(t, 4)
|
||||||
|
if err := svc.CreateBucket("test-bucket"); err != nil {
|
||||||
|
t.Fatalf("CreateBucket: %v", err)
|
||||||
|
}
|
||||||
|
upload, err := svc.CreateMultipartUpload("test-bucket", "object.txt")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("CreateMultipartUpload: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = svc.UploadPart("test-bucket", "object.txt", upload.UploadID, 1, strings.NewReader("12345"))
|
||||||
|
if !errors.Is(err, ErrEntityTooLarge) {
|
||||||
|
t.Fatalf("UploadPart error = %v, want ErrEntityTooLarge", err)
|
||||||
|
}
|
||||||
|
parts, err := svc.ListMultipartParts("test-bucket", "object.txt", upload.UploadID)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("ListMultipartParts: %v", err)
|
||||||
|
}
|
||||||
|
if len(parts) != 0 {
|
||||||
|
t.Fatalf("stored parts = %d, want 0", len(parts))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestGarbageCollectRemovesExpiredPendingMultipartChunks(t *testing.T) {
|
||||||
|
svc := newTestObjectService(t, 1024)
|
||||||
|
svc.multipartRetention = time.Nanosecond
|
||||||
|
if err := svc.CreateBucket("test-bucket"); err != nil {
|
||||||
|
t.Fatalf("CreateBucket: %v", err)
|
||||||
|
}
|
||||||
|
upload, err := svc.CreateMultipartUpload("test-bucket", "object.txt")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("CreateMultipartUpload: %v", err)
|
||||||
|
}
|
||||||
|
if _, err := svc.UploadPart("test-bucket", "object.txt", upload.UploadID, 1, strings.NewReader("part-data")); err != nil {
|
||||||
|
t.Fatalf("UploadPart: %v", err)
|
||||||
|
}
|
||||||
|
chunks, err := svc.blob.ListChunks()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("ListChunks before GC: %v", err)
|
||||||
|
}
|
||||||
|
if len(chunks) == 0 {
|
||||||
|
t.Fatalf("expected uploaded part chunks")
|
||||||
|
}
|
||||||
|
time.Sleep(time.Millisecond)
|
||||||
|
|
||||||
|
if err := svc.GarbageCollect(); err != nil {
|
||||||
|
t.Fatalf("GarbageCollect: %v", err)
|
||||||
|
}
|
||||||
|
if _, err := svc.metadata.GetMultipartUpload(upload.UploadID); !errors.Is(err, metadata.ErrMultipartNotFound) {
|
||||||
|
t.Fatalf("GetMultipartUpload error = %v, want ErrMultipartNotFound", err)
|
||||||
|
}
|
||||||
|
chunks, err = svc.blob.ListChunks()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("ListChunks after GC: %v", err)
|
||||||
|
}
|
||||||
|
if len(chunks) != 0 {
|
||||||
|
t.Fatalf("chunks after GC = %d, want 0", len(chunks))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func newTestObjectService(t *testing.T, maxUploadSize int64) *ObjectService {
|
||||||
|
t.Helper()
|
||||||
|
root := t.TempDir()
|
||||||
|
md, err := metadata.NewMetadataHandler(filepath.Join(root, "metadata.db"))
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("NewMetadataHandler: %v", err)
|
||||||
|
}
|
||||||
|
blob, err := storage.NewBlobStore(root, 4)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("NewBlobStore: %v", err)
|
||||||
|
}
|
||||||
|
svc := NewObjectService(md, blob, time.Hour, maxUploadSize)
|
||||||
|
t.Cleanup(func() {
|
||||||
|
_ = svc.Close()
|
||||||
|
})
|
||||||
|
return svc
|
||||||
|
}
|
||||||
@@ -17,6 +17,8 @@ import (
|
|||||||
const blobRoot = "blobs"
|
const blobRoot = "blobs"
|
||||||
const maxChunkSize = 64 * 1024 * 1024
|
const maxChunkSize = 64 * 1024 * 1024
|
||||||
|
|
||||||
|
var ErrChunkIntegrity = errors.New("chunk integrity check failed")
|
||||||
|
|
||||||
type BlobStore struct {
|
type BlobStore struct {
|
||||||
dataRoot string
|
dataRoot string
|
||||||
chunkSize int
|
chunkSize int
|
||||||
@@ -185,6 +187,11 @@ func (bs *BlobStore) GetBlob(chunkID string) ([]byte, error) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
chunkHash := sha256.Sum256(data)
|
||||||
|
actualChunkID := hex.EncodeToString(chunkHash[:])
|
||||||
|
if actualChunkID != chunkID {
|
||||||
|
return nil, fmt.Errorf("%w: expected %s, got %s", ErrChunkIntegrity, chunkID, actualChunkID)
|
||||||
|
}
|
||||||
size = int64(len(data))
|
size = int64(len(data))
|
||||||
success = true
|
success = true
|
||||||
return data, nil
|
return data, nil
|
||||||
|
|||||||
79
storage/blob_test.go
Normal file
79
storage/blob_test.go
Normal file
@@ -0,0 +1,79 @@
|
|||||||
|
package storage
|
||||||
|
|
||||||
|
import (
|
||||||
|
"errors"
|
||||||
|
"io"
|
||||||
|
"os"
|
||||||
|
"path/filepath"
|
||||||
|
"strings"
|
||||||
|
"testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestGetBlobDetectsCorruptedChunk(t *testing.T) {
|
||||||
|
root := t.TempDir()
|
||||||
|
bs, err := NewBlobStore(root, 4)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("new blob store: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
chunks, _, _, err := bs.IngestStream(strings.NewReader("good"))
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("ingest: %v", err)
|
||||||
|
}
|
||||||
|
chunkID := chunks[0]
|
||||||
|
corruptChunk(t, root, chunkID, []byte("bad"))
|
||||||
|
|
||||||
|
got, err := bs.GetBlob(chunkID)
|
||||||
|
if !errors.Is(err, ErrChunkIntegrity) {
|
||||||
|
t.Fatalf("GetBlob error = %v, want ErrChunkIntegrity", err)
|
||||||
|
}
|
||||||
|
if got != nil {
|
||||||
|
t.Fatalf("GetBlob returned data for corrupted chunk: %q", got)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestAssembleStreamDetectsCorruptedChunk(t *testing.T) {
|
||||||
|
root := t.TempDir()
|
||||||
|
bs, err := NewBlobStore(root, 4)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("new blob store: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
chunks, _, _, err := bs.IngestStream(strings.NewReader("abcdefgh"))
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("ingest: %v", err)
|
||||||
|
}
|
||||||
|
if len(chunks) != 2 {
|
||||||
|
t.Fatalf("chunk count = %d, want 2", len(chunks))
|
||||||
|
}
|
||||||
|
corruptChunk(t, root, chunks[1], []byte("corrupt"))
|
||||||
|
|
||||||
|
pr, pw := io.Pipe()
|
||||||
|
errCh := make(chan error, 1)
|
||||||
|
go func() {
|
||||||
|
err := bs.AssembleStream(chunks, pw)
|
||||||
|
if err != nil {
|
||||||
|
_ = pw.CloseWithError(err)
|
||||||
|
} else {
|
||||||
|
_ = pw.Close()
|
||||||
|
}
|
||||||
|
errCh <- err
|
||||||
|
}()
|
||||||
|
|
||||||
|
_, readErr := io.ReadAll(pr)
|
||||||
|
assembleErr := <-errCh
|
||||||
|
if !errors.Is(assembleErr, ErrChunkIntegrity) {
|
||||||
|
t.Fatalf("AssembleStream error = %v, want ErrChunkIntegrity", assembleErr)
|
||||||
|
}
|
||||||
|
if !errors.Is(readErr, ErrChunkIntegrity) {
|
||||||
|
t.Fatalf("pipe read error = %v, want ErrChunkIntegrity", readErr)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func corruptChunk(t *testing.T, root, chunkID string, data []byte) {
|
||||||
|
t.Helper()
|
||||||
|
path := filepath.Join(root, blobRoot, chunkID[:2], chunkID[2:4], chunkID)
|
||||||
|
if err := os.WriteFile(path, data, 0o600); err != nil {
|
||||||
|
t.Fatalf("corrupt chunk: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -15,6 +15,7 @@ type Config struct {
|
|||||||
Address string
|
Address string
|
||||||
Port int
|
Port int
|
||||||
ChunkSize int
|
ChunkSize int
|
||||||
|
MaxObjectUploadBytes int64
|
||||||
LogLevel string
|
LogLevel string
|
||||||
LogFormat string
|
LogFormat string
|
||||||
AuditLog bool
|
AuditLog bool
|
||||||
@@ -40,6 +41,7 @@ func NewConfig() *Config {
|
|||||||
Address: firstNonEmpty(strings.TrimSpace(os.Getenv("ADDRESS")), "0.0.0.0"),
|
Address: firstNonEmpty(strings.TrimSpace(os.Getenv("ADDRESS")), "0.0.0.0"),
|
||||||
Port: envIntRange("PORT", 2600, 1, 65535),
|
Port: envIntRange("PORT", 2600, 1, 65535),
|
||||||
ChunkSize: envIntRange("CHUNK_SIZE", 8192000, 1, 64*1024*1024),
|
ChunkSize: envIntRange("CHUNK_SIZE", 8192000, 1, 64*1024*1024),
|
||||||
|
MaxObjectUploadBytes: envInt64Range("FS_MAX_OBJECT_UPLOAD_BYTES", 5*1024*1024*1024, 1, 5*1024*1024*1024),
|
||||||
LogLevel: strings.ToLower(firstNonEmpty(strings.TrimSpace(os.Getenv("LOG_LEVEL")), "info")),
|
LogLevel: strings.ToLower(firstNonEmpty(strings.TrimSpace(os.Getenv("LOG_LEVEL")), "info")),
|
||||||
LogFormat: strings.ToLower(firstNonEmpty(strings.TrimSpace(os.Getenv("LOG_FORMAT")), strings.TrimSpace(os.Getenv("LOG_TYPE")), "text")),
|
LogFormat: strings.ToLower(firstNonEmpty(strings.TrimSpace(os.Getenv("LOG_FORMAT")), strings.TrimSpace(os.Getenv("LOG_TYPE")), "text")),
|
||||||
AuditLog: envBool("AUDIT_LOG", true),
|
AuditLog: envBool("AUDIT_LOG", true),
|
||||||
@@ -82,6 +84,21 @@ func envIntRange(key string, defaultValue, minValue, maxValue int) int {
|
|||||||
return value
|
return value
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func envInt64Range(key string, defaultValue, minValue, maxValue int64) int64 {
|
||||||
|
raw := strings.TrimSpace(os.Getenv(key))
|
||||||
|
if raw == "" {
|
||||||
|
return defaultValue
|
||||||
|
}
|
||||||
|
value, err := strconv.ParseInt(raw, 10, 64)
|
||||||
|
if err != nil {
|
||||||
|
return defaultValue
|
||||||
|
}
|
||||||
|
if value < minValue || value > maxValue {
|
||||||
|
return defaultValue
|
||||||
|
}
|
||||||
|
return value
|
||||||
|
}
|
||||||
|
|
||||||
func envBool(key string, defaultValue bool) bool {
|
func envBool(key string, defaultValue bool) bool {
|
||||||
raw := strings.TrimSpace(os.Getenv(key))
|
raw := strings.TrimSpace(os.Getenv(key))
|
||||||
if raw == "" {
|
if raw == "" {
|
||||||
|
|||||||
21
utils/config_test.go
Normal file
21
utils/config_test.go
Normal file
@@ -0,0 +1,21 @@
|
|||||||
|
package utils
|
||||||
|
|
||||||
|
import "testing"
|
||||||
|
|
||||||
|
func TestEnvInt64Range(t *testing.T) {
|
||||||
|
t.Setenv("TEST_INT64_RANGE", "42")
|
||||||
|
if got := envInt64Range("TEST_INT64_RANGE", 10, 1, 100); got != 42 {
|
||||||
|
t.Fatalf("envInt64Range valid = %d, want 42", got)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestEnvInt64RangeFallsBackForInvalidValues(t *testing.T) {
|
||||||
|
t.Setenv("TEST_INT64_RANGE", "invalid")
|
||||||
|
if got := envInt64Range("TEST_INT64_RANGE", 10, 1, 100); got != 10 {
|
||||||
|
t.Fatalf("envInt64Range invalid = %d, want 10", got)
|
||||||
|
}
|
||||||
|
t.Setenv("TEST_INT64_RANGE", "101")
|
||||||
|
if got := envInt64Range("TEST_INT64_RANGE", 10, 1, 100); got != 10 {
|
||||||
|
t.Fatalf("envInt64Range too large = %d, want 10", got)
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user