6 Commits

Author SHA1 Message Date
f61cc3168b Reject unsupported aws-chunked uploads
Ultraworked with [Sisyphus](https://github.com/code-yeongyu/oh-my-openagent)

Co-authored-by: Sisyphus <clio-agent@sisyphuslabs.ai>
2026-05-16 10:24:32 +02:00
e928ebca15 Defer multi-delete authorization to handler
Ultraworked with [Sisyphus](https://github.com/code-yeongyu/oh-my-openagent)

Co-authored-by: Sisyphus <clio-agent@sisyphuslabs.ai>
2026-05-16 10:24:32 +02:00
654a505c0d Document S3 auth hardening
Ultraworked with [Sisyphus](https://github.com/code-yeongyu/oh-my-openagent)

Co-authored-by: Sisyphus <clio-agent@sisyphuslabs.ai>
2026-05-16 10:15:26 +02:00
0f9b461e8e Verify chunk integrity on read
Ultraworked with [Sisyphus](https://github.com/code-yeongyu/oh-my-openagent)

Co-authored-by: Sisyphus <clio-agent@sisyphuslabs.ai>
2026-05-16 10:11:25 +02:00
c3c9e3262f Add upload limits and multipart cleanup
Ultraworked with [Sisyphus](https://github.com/code-yeongyu/oh-my-openagent)

Co-authored-by: Sisyphus <clio-agent@sisyphuslabs.ai>
2026-05-16 10:11:15 +02:00
2425cd524e Harden S3 auth boundaries
Ultraworked with [Sisyphus](https://github.com/code-yeongyu/oh-my-openagent)

Co-authored-by: Sisyphus <clio-agent@sisyphuslabs.ai>
2026-05-16 10:11:04 +02:00
28 changed files with 1439 additions and 54 deletions

View File

@@ -1,6 +1,7 @@
LOG_LEVEL=debug LOG_LEVEL=debug
LOG_FORMAT=text LOG_FORMAT=text
DATA_PATH=data/ DATA_PATH=data/
FS_MAX_OBJECT_UPLOAD_BYTES=5368709120
PORT=2600 PORT=2600
AUDIT_LOG=true AUDIT_LOG=true
ADDRESS=0.0.0.0 ADDRESS=0.0.0.0

View File

@@ -127,6 +127,9 @@ Required when `FS_AUTH_ENABLED=true`:
- `FS_ROOT_USER` and `FS_ROOT_PASSWORD` define initial credentials - `FS_ROOT_USER` and `FS_ROOT_PASSWORD` define initial credentials
- `ADMIN_API_ENABLED=true` enables `/_admin/v1/*` routes (bootstrap key only) - `ADMIN_API_ENABLED=true` enables `/_admin/v1/*` routes (bootstrap key only)
Upload limits:
- `FS_MAX_OBJECT_UPLOAD_BYTES` limits object PUT payloads, multipart upload parts, and completed multipart object size (default 5 GiB).
Reference: `auth/README.md` Reference: `auth/README.md`
Additional docs: Additional docs:

View File

@@ -41,6 +41,7 @@ const (
maxXMLBodyBytes int64 = 1 << 20 maxXMLBodyBytes int64 = 1 << 20
maxDeleteObjects = 1000 maxDeleteObjects = 1000
maxObjectKeyBytes = 1024 maxObjectKeyBytes = 1024
maxAWSChunkedLineBytes = 8 << 10
serverReadHeaderTimeout = 5 * time.Second serverReadHeaderTimeout = 5 * time.Second
serverReadTimeout = 60 * time.Second serverReadTimeout = 60 * time.Second
serverWriteTimeout = 120 * time.Second serverWriteTimeout = 120 * time.Second
@@ -196,6 +197,10 @@ func parseCopySource(raw string) (string, string, error) {
} }
func (h *Handler) authorizeCopySource(r *http.Request, bucket, key string) error { func (h *Handler) authorizeCopySource(r *http.Request, bucket, key string) error {
return h.authorizeObjectAction(r, auth.ActionGetObject, bucket, key)
}
func (h *Handler) authorizeObjectAction(r *http.Request, action auth.Action, bucket, key string) error {
if h.authSvc == nil || !h.authSvc.Config().Enabled { if h.authSvc == nil || !h.authSvc.Config().Enabled {
return nil return nil
} }
@@ -206,7 +211,7 @@ func (h *Handler) authorizeCopySource(r *http.Request, bucket, key string) error
} }
return h.authSvc.Authorize(authCtx.AccessKeyID, auth.RequestTarget{ return h.authSvc.Authorize(authCtx.AccessKeyID, auth.RequestTarget{
Action: auth.ActionGetObject, Action: action,
Bucket: bucket, Bucket: bucket,
Key: key, Key: key,
}) })
@@ -307,6 +312,10 @@ func (h *Handler) handlePostObject(w http.ResponseWriter, r *http.Request) {
r.Body = http.MaxBytesReader(w, r.Body, maxXMLBodyBytes) r.Body = http.MaxBytesReader(w, r.Body, maxXMLBodyBytes)
var req models.CompleteMultipartUploadRequest var req models.CompleteMultipartUploadRequest
if err := xml.NewDecoder(r.Body).Decode(&req); err != nil { if err := xml.NewDecoder(r.Body).Decode(&req); err != nil {
if errors.Is(err, auth.ErrSignatureDoesNotMatch) {
writeMappedS3Error(w, r, err)
return
}
var maxErr *http.MaxBytesError var maxErr *http.MaxBytesError
if errors.As(err, &maxErr) { if errors.As(err, &maxErr) {
writeS3Error(w, r, s3ErrEntityTooLarge, r.URL.Path) writeS3Error(w, r, s3ErrEntityTooLarge, r.URL.Path)
@@ -379,6 +388,10 @@ func (h *Handler) handlePutObject(w http.ResponseWriter, r *http.Request) {
bodyReader := io.Reader(r.Body) bodyReader := io.Reader(r.Body)
var decodeStream io.ReadCloser var decodeStream io.ReadCloser
if hasUnsupportedAWSChunkedPayload(r) {
writeS3Error(w, r, s3ErrInvalidArgument, r.URL.Path)
return
}
if shouldDecodeAWSChunkedPayload(r) { if shouldDecodeAWSChunkedPayload(r) {
decodeStream = newAWSChunkedDecodingReader(r.Body) decodeStream = newAWSChunkedDecodingReader(r.Body)
defer decodeStream.Close() defer decodeStream.Close()
@@ -453,6 +466,10 @@ func (h *Handler) handlePutObject(w http.ResponseWriter, r *http.Request) {
bodyReader := io.Reader(r.Body) bodyReader := io.Reader(r.Body)
var decodeStream io.ReadCloser var decodeStream io.ReadCloser
if hasUnsupportedAWSChunkedPayload(r) {
writeS3Error(w, r, s3ErrInvalidArgument, r.URL.Path)
return
}
if shouldDecodeAWSChunkedPayload(r) { if shouldDecodeAWSChunkedPayload(r) {
decodeStream = newAWSChunkedDecodingReader(r.Body) decodeStream = newAWSChunkedDecodingReader(r.Body)
defer decodeStream.Close() defer decodeStream.Close()
@@ -508,17 +525,18 @@ func (h *Handler) handleListMultipartParts(w http.ResponseWriter, r *http.Reques
} }
func shouldDecodeAWSChunkedPayload(r *http.Request) bool { func shouldDecodeAWSChunkedPayload(r *http.Request) bool {
contentEncoding := strings.ToLower(r.Header.Get("Content-Encoding"))
if strings.Contains(contentEncoding, "aws-chunked") {
return true
}
signingMode := strings.ToLower(r.Header.Get("x-amz-content-sha256")) signingMode := strings.ToLower(r.Header.Get("x-amz-content-sha256"))
if strings.HasPrefix(signingMode, "streaming-aws4-hmac-sha256-payload") {
return true
}
return strings.HasPrefix(signingMode, "streaming-unsigned-payload") return strings.HasPrefix(signingMode, "streaming-unsigned-payload")
} }
func hasUnsupportedAWSChunkedPayload(r *http.Request) bool {
contentEncoding := strings.ToLower(r.Header.Get("Content-Encoding"))
if !strings.Contains(contentEncoding, "aws-chunked") {
return false
}
return !shouldDecodeAWSChunkedPayload(r)
}
func newAWSChunkedDecodingReader(src io.Reader) io.ReadCloser { func newAWSChunkedDecodingReader(src io.Reader) io.ReadCloser {
probedReader, isAWSChunked := probeAWSChunkedPayload(src) probedReader, isAWSChunked := probeAWSChunkedPayload(src)
if !isAWSChunked { if !isAWSChunked {
@@ -537,9 +555,12 @@ func newAWSChunkedDecodingReader(src io.Reader) io.ReadCloser {
} }
func probeAWSChunkedPayload(src io.Reader) (io.Reader, bool) { func probeAWSChunkedPayload(src io.Reader) (io.Reader, bool) {
reader := bufio.NewReaderSize(src, 512) reader := bufio.NewReaderSize(src, maxAWSChunkedLineBytes)
headerLine, err := reader.ReadSlice('\n') headerLine, err := reader.ReadSlice('\n')
replay := io.MultiReader(bytes.NewReader(headerLine), reader) replay := io.MultiReader(bytes.NewReader(headerLine), reader)
if errors.Is(err, bufio.ErrBufferFull) {
return replay, true
}
if err != nil { if err != nil {
return replay, false return replay, false
} }
@@ -561,9 +582,9 @@ func probeAWSChunkedPayload(src io.Reader) (io.Reader, bool) {
} }
func decodeAWSChunkedPayload(src io.Reader, dst io.Writer) error { func decodeAWSChunkedPayload(src io.Reader, dst io.Writer) error {
reader := bufio.NewReader(src) reader := bufio.NewReaderSize(src, maxAWSChunkedLineBytes)
for { for {
headerLine, err := reader.ReadString('\n') headerLine, err := readAWSChunkedLine(reader)
if err != nil { if err != nil {
return err return err
} }
@@ -580,6 +601,17 @@ func decodeAWSChunkedPayload(src io.Reader, dst io.Writer) error {
if chunkSize < 0 { if chunkSize < 0 {
return fmt.Errorf("invalid aws-chunked size: %d", chunkSize) return fmt.Errorf("invalid aws-chunked size: %d", chunkSize)
} }
if chunkSize == 0 {
for {
line, err := readAWSChunkedLine(reader)
if err != nil {
return err
}
if line == "\r\n" || line == "\n" {
return nil
}
}
}
if chunkSize > 0 { if chunkSize > 0 {
if _, err := io.CopyN(dst, reader, chunkSize); err != nil { if _, err := io.CopyN(dst, reader, chunkSize); err != nil {
return err return err
@@ -593,19 +625,18 @@ func decodeAWSChunkedPayload(src io.Reader, dst io.Writer) error {
if crlf[0] != '\r' || crlf[1] != '\n' { if crlf[0] != '\r' || crlf[1] != '\n' {
return errors.New("invalid aws-chunked payload terminator") return errors.New("invalid aws-chunked payload terminator")
} }
}
}
if chunkSize == 0 { func readAWSChunkedLine(reader *bufio.Reader) (string, error) {
for { line, err := reader.ReadSlice('\n')
line, err := reader.ReadString('\n') if errors.Is(err, bufio.ErrBufferFull) {
if err != nil { return "", service.ErrEntityTooLarge
return err
}
if line == "\r\n" || line == "\n" {
return nil
}
}
} }
if len(line) > maxAWSChunkedLineBytes {
return "", service.ErrEntityTooLarge
} }
return string(line), err
} }
func ifNoneMatchPreconditionFailed(headerValue, etag string) bool { func ifNoneMatchPreconditionFailed(headerValue, etag string) bool {
@@ -664,6 +695,10 @@ func (h *Handler) handlePostBucket(w http.ResponseWriter, r *http.Request) {
var req models.DeleteObjectsRequest var req models.DeleteObjectsRequest
if err := xml.NewDecoder(bodyReader).Decode(&req); err != nil { if err := xml.NewDecoder(bodyReader).Decode(&req); err != nil {
if errors.Is(err, auth.ErrSignatureDoesNotMatch) {
writeMappedS3Error(w, r, err)
return
}
var maxErr *http.MaxBytesError var maxErr *http.MaxBytesError
if errors.As(err, &maxErr) { if errors.As(err, &maxErr) {
writeS3Error(w, r, s3ErrEntityTooLarge, r.URL.Path) writeS3Error(w, r, s3ErrEntityTooLarge, r.URL.Path)
@@ -699,6 +734,15 @@ func (h *Handler) handlePostBucket(w http.ResponseWriter, r *http.Request) {
}) })
continue continue
} }
if err := h.authorizeObjectAction(r, auth.ActionDeleteObject, bucket, obj.Key); err != nil {
apiErr := mapToS3Error(err)
response.Errors = append(response.Errors, models.DeleteError{
Key: obj.Key,
Code: apiErr.Code,
Message: apiErr.Message,
})
continue
}
keys = append(keys, obj.Key) keys = append(keys, obj.Key)
} }

115
api/aws_chunked_test.go Normal file
View File

@@ -0,0 +1,115 @@
package api
import (
"errors"
"io"
"net/http"
"net/http/httptest"
"strings"
"testing"
"fs/service"
)
func TestShouldDecodeAWSChunkedPayloadUnsignedTrailerMode(t *testing.T) {
t.Parallel()
req, err := http.NewRequest(http.MethodPut, "http://example.com/b/k", nil)
if err != nil {
t.Fatal(err)
}
req.Header.Set("x-amz-content-sha256", "STREAMING-UNSIGNED-PAYLOAD-TRAILER")
if !shouldDecodeAWSChunkedPayload(req) {
t.Fatalf("expected shouldDecodeAWSChunkedPayload to return true for STREAMING-UNSIGNED-PAYLOAD-TRAILER")
}
}
func TestUnsupportedAWSChunkedContentEncodingWithoutStreamingMode(t *testing.T) {
t.Parallel()
req, err := http.NewRequest(http.MethodPut, "http://example.com/b/k", nil)
if err != nil {
t.Fatal(err)
}
req.Header.Set("Content-Encoding", "aws-chunked")
req.Header.Set("x-amz-content-sha256", "UNSIGNED-PAYLOAD")
if !hasUnsupportedAWSChunkedPayload(req) {
t.Fatalf("expected aws-chunked content encoding without streaming mode to be unsupported")
}
if shouldDecodeAWSChunkedPayload(req) {
t.Fatalf("non-streaming aws-chunked content encoding must not trigger decoding")
}
}
func TestPutObjectRejectsUnsignedAWSChunkedContentEncoding(t *testing.T) {
handler, svc := newUploadLimitHandler(t, 1024)
if err := svc.CreateBucket("test-bucket"); err != nil {
t.Fatalf("CreateBucket: %v", err)
}
req := httptest.NewRequest(http.MethodPut, "/test-bucket/object.txt", strings.NewReader("4\r\nWiki\r\n0\r\n\r\n"))
req.Header.Set("Content-Encoding", "aws-chunked")
req.Header.Set("x-amz-content-sha256", "UNSIGNED-PAYLOAD")
rec := httptest.NewRecorder()
handler.router.ServeHTTP(rec, req)
if rec.Code != http.StatusBadRequest {
t.Fatalf("status = %d, want %d body=%s", rec.Code, http.StatusBadRequest, rec.Body.String())
}
if !strings.Contains(rec.Body.String(), "InvalidArgument") {
t.Fatalf("expected InvalidArgument response, body=%s", rec.Body.String())
}
}
func TestAWSChunkedReaderPassThroughForPlainPayload(t *testing.T) {
t.Parallel()
plain := "PAR1\x00\x01\x02\x03binary-without-aws-chunked-header"
reader := newAWSChunkedDecodingReader(strings.NewReader(plain))
defer reader.Close()
out, err := io.ReadAll(reader)
if err != nil {
t.Fatalf("read failed: %v", err)
}
if string(out) != plain {
t.Fatalf("unexpected passthrough result: got %q want %q", string(out), plain)
}
}
func TestAWSChunkedReaderDecodesChunkedPayload(t *testing.T) {
t.Parallel()
encoded := "" +
"4\r\nWiki\r\n" +
"5\r\npedia\r\n" +
"0\r\n" +
"x-amz-checksum-crc32:xxxx\r\n" +
"\r\n"
reader := newAWSChunkedDecodingReader(strings.NewReader(encoded))
defer reader.Close()
out, err := io.ReadAll(reader)
if err != nil {
t.Fatalf("read failed: %v", err)
}
if string(out) != "Wikipedia" {
t.Fatalf("decoded payload mismatch: got %q want %q", string(out), "Wikipedia")
}
}
func TestAWSChunkedReaderRejectsOversizedChunkHeader(t *testing.T) {
t.Parallel()
encoded := strings.Repeat("f", maxAWSChunkedLineBytes+1) + "\n"
reader := newAWSChunkedDecodingReader(strings.NewReader(encoded))
defer reader.Close()
_, err := io.ReadAll(reader)
if !errors.Is(err, service.ErrEntityTooLarge) {
t.Fatalf("read error = %v, want ErrEntityTooLarge", err)
}
}

View File

@@ -0,0 +1,283 @@
package api
import (
"bytes"
"context"
"crypto/hmac"
"crypto/sha256"
"encoding/base64"
"encoding/hex"
"errors"
"io"
"log/slog"
"net/http"
"net/http/httptest"
"net/url"
"path/filepath"
"sort"
"strings"
"testing"
"time"
"fs/auth"
"fs/logging"
"fs/metadata"
"fs/models"
"fs/service"
"fs/storage"
"github.com/go-chi/chi/v5"
)
func newAuthorizedDeleteHandler(t *testing.T) (*Handler, *service.ObjectService, *auth.Service) {
t.Helper()
root := t.TempDir()
md, err := metadata.NewMetadataHandler(filepath.Join(root, "metadata.db"))
if err != nil {
t.Fatalf("new metadata handler: %v", err)
}
blob, err := storage.NewBlobStore(root, 1024)
if err != nil {
t.Fatalf("new blob store: %v", err)
}
svc := service.NewObjectService(md, blob, time.Hour)
t.Cleanup(func() {
_ = svc.Close()
})
masterKey := base64.StdEncoding.EncodeToString(make([]byte, 32))
authSvc, err := auth.NewService(auth.ConfigFromValues(
true,
"us-east-1",
0,
0,
masterKey,
"",
"",
"",
), md)
if err != nil {
t.Fatalf("new auth service: %v", err)
}
logger := slog.New(slog.NewTextHandler(io.Discard, nil))
handler := NewHandler(svc, logger, logging.Config{}, authSvc, false)
return handler, svc, authSvc
}
func newBucketPostRequest(bucket, body string) *http.Request {
req := httptest.NewRequest(http.MethodPost, "/"+bucket+"?delete", strings.NewReader(body))
rctx := chi.NewRouteContext()
rctx.URLParams.Add("bucket", bucket)
return req.WithContext(context.WithValue(req.Context(), chi.RouteCtxKey, rctx))
}
func withAuthContext(req *http.Request, accessKeyID string) *http.Request {
authCtx := auth.RequestContext{
Authenticated: true,
AccessKeyID: accessKeyID,
AuthType: "test",
}
return req.WithContext(auth.WithRequestContext(req.Context(), authCtx))
}
func createDeleteUser(t *testing.T, authSvc *auth.Service, prefix string) {
t.Helper()
createDeleteUserWithStatements(t, authSvc, []models.AuthPolicyStatement{
{
Effect: "allow",
Actions: []string{"s3:DeleteObject"},
Bucket: "test-bucket",
Prefix: prefix,
},
})
}
func createDeleteUserWithStatements(t *testing.T, authSvc *auth.Service, statements []models.AuthPolicyStatement) {
t.Helper()
_, err := authSvc.CreateUser(auth.CreateUserInput{
AccessKeyID: "delete-user",
SecretKey: "delete-secret-1",
Policy: models.AuthPolicy{
Statements: statements,
},
})
if err != nil {
t.Fatalf("create delete user: %v", err)
}
}
func putTestObject(t *testing.T, svc *service.ObjectService, key string) {
t.Helper()
_, err := svc.PutObject("test-bucket", key, "text/plain", bytes.NewReader([]byte("data")))
if err != nil {
t.Fatalf("put object %q: %v", key, err)
}
}
func TestMultiDeleteAuthorizesEveryKey(t *testing.T) {
handler, svc, authSvc := newAuthorizedDeleteHandler(t)
if err := svc.CreateBucket("test-bucket"); err != nil {
t.Fatalf("create bucket: %v", err)
}
createDeleteUser(t, authSvc, "allowed/")
putTestObject(t, svc, "allowed/file.txt")
putTestObject(t, svc, "private/file.txt")
body := `<Delete><Object><Key>allowed/file.txt</Key></Object><Object><Key>private/file.txt</Key></Object></Delete>`
req := withAuthContext(newBucketPostRequest("test-bucket", body), "delete-user")
rec := httptest.NewRecorder()
handler.handlePostBucket(rec, req)
if rec.Code != http.StatusOK {
t.Fatalf("unexpected status: got %d body=%s", rec.Code, rec.Body.String())
}
responseBody := rec.Body.String()
if !strings.Contains(responseBody, "<Deleted>") || !strings.Contains(responseBody, "allowed/file.txt") {
t.Fatalf("expected allowed key to be deleted, body=%s", responseBody)
}
if !strings.Contains(responseBody, "<Error>") || !strings.Contains(responseBody, "private/file.txt") || !strings.Contains(responseBody, "AccessDenied") {
t.Fatalf("expected denied key error, body=%s", responseBody)
}
if _, err := svc.HeadObject("test-bucket", "allowed/file.txt"); !errors.Is(err, metadata.ErrObjectNotFound) {
t.Fatalf("allowed object should be deleted, got err=%v", err)
}
if _, err := svc.HeadObject("test-bucket", "private/file.txt"); err != nil {
t.Fatalf("private object should remain: %v", err)
}
}
func TestMultiDeleteAllowsScopedKeys(t *testing.T) {
handler, svc, authSvc := newAuthorizedDeleteHandler(t)
if err := svc.CreateBucket("test-bucket"); err != nil {
t.Fatalf("create bucket: %v", err)
}
createDeleteUser(t, authSvc, "allowed/")
putTestObject(t, svc, "allowed/file.txt")
body := `<Delete><Object><Key>allowed/file.txt</Key></Object></Delete>`
req := withAuthContext(newBucketPostRequest("test-bucket", body), "delete-user")
rec := httptest.NewRecorder()
handler.handlePostBucket(rec, req)
if rec.Code != http.StatusOK {
t.Fatalf("unexpected status: got %d body=%s", rec.Code, rec.Body.String())
}
if strings.Contains(rec.Body.String(), "<Error>") {
t.Fatalf("unexpected delete error body=%s", rec.Body.String())
}
if _, err := svc.HeadObject("test-bucket", "allowed/file.txt"); !errors.Is(err, metadata.ErrObjectNotFound) {
t.Fatalf("allowed object should be deleted, got err=%v", err)
}
}
func TestMultiDeleteRouteAuthorizesKeysAfterMiddleware(t *testing.T) {
handler, svc, authSvc := newAuthorizedDeleteHandler(t)
handler.setupRoutes()
if err := svc.CreateBucket("test-bucket"); err != nil {
t.Fatalf("create bucket: %v", err)
}
createDeleteUserWithStatements(t, authSvc, []models.AuthPolicyStatement{
{Effect: "allow", Actions: []string{"s3:DeleteObject"}, Bucket: "test-bucket", Prefix: "allowed/"},
{Effect: "deny", Actions: []string{"s3:DeleteObject"}, Bucket: "test-bucket", Prefix: "private/"},
})
putTestObject(t, svc, "allowed/file.txt")
putTestObject(t, svc, "private/file.txt")
body := `<Delete><Object><Key>allowed/file.txt</Key></Object><Object><Key>private/file.txt</Key></Object></Delete>`
req := httptest.NewRequest(http.MethodPost, "/test-bucket?delete", strings.NewReader(body))
signTestSigV4Request(t, req, "delete-user", "delete-secret-1")
rec := httptest.NewRecorder()
handler.router.ServeHTTP(rec, req)
if rec.Code != http.StatusOK {
t.Fatalf("unexpected status: got %d body=%s", rec.Code, rec.Body.String())
}
responseBody := rec.Body.String()
if !strings.Contains(responseBody, "allowed/file.txt") || !strings.Contains(responseBody, "<Deleted>") {
t.Fatalf("expected allowed key deletion, body=%s", responseBody)
}
if !strings.Contains(responseBody, "private/file.txt") || !strings.Contains(responseBody, "AccessDenied") {
t.Fatalf("expected per-key AccessDenied, body=%s", responseBody)
}
if _, err := svc.HeadObject("test-bucket", "allowed/file.txt"); !errors.Is(err, metadata.ErrObjectNotFound) {
t.Fatalf("allowed object should be deleted, got err=%v", err)
}
if _, err := svc.HeadObject("test-bucket", "private/file.txt"); err != nil {
t.Fatalf("private object should remain: %v", err)
}
}
func signTestSigV4Request(t *testing.T, req *http.Request, accessKeyID, secretKey string) {
t.Helper()
amzDate := time.Now().UTC().Format("20060102T150405Z")
date := amzDate[:8]
region := "us-east-1"
serviceName := "s3"
scope := strings.Join([]string{date, region, serviceName, "aws4_request"}, "/")
signedHeaders := []string{"host", "x-amz-content-sha256", "x-amz-date"}
signedHeadersRaw := strings.Join(signedHeaders, ";")
payloadHash := "UNSIGNED-PAYLOAD"
req.Header.Set("x-amz-date", amzDate)
req.Header.Set("x-amz-content-sha256", payloadHash)
canonicalRequest := strings.Join([]string{
req.Method,
req.URL.EscapedPath(),
canonicalTestQuery(req.URL.RawQuery),
"host:" + strings.TrimSpace(req.Host) + "\n" +
"x-amz-content-sha256:" + payloadHash + "\n" +
"x-amz-date:" + amzDate + "\n",
signedHeadersRaw,
payloadHash,
}, "\n")
canonicalHash := sha256.Sum256([]byte(canonicalRequest))
stringToSign := strings.Join([]string{
"AWS4-HMAC-SHA256",
amzDate,
scope,
hex.EncodeToString(canonicalHash[:]),
}, "\n")
signingKey := testHMAC(testHMAC(testHMAC(testHMAC([]byte("AWS4"+secretKey), date), region), serviceName), "aws4_request")
signature := hex.EncodeToString(testHMAC(signingKey, stringToSign))
req.Header.Set("Authorization", "AWS4-HMAC-SHA256 "+
"Credential="+accessKeyID+"/"+scope+", "+
"SignedHeaders="+signedHeadersRaw+", "+
"Signature="+signature)
}
func canonicalTestQuery(rawQuery string) string {
values, _ := url.ParseQuery(rawQuery)
pairs := make([]string, 0)
for key, valueList := range values {
if len(valueList) == 0 {
pairs = append(pairs, awsTestQueryEscape(key)+"=")
continue
}
for _, value := range valueList {
pairs = append(pairs, awsTestQueryEscape(key)+"="+awsTestQueryEscape(value))
}
}
sort.Strings(pairs)
return strings.Join(pairs, "&")
}
func awsTestQueryEscape(value string) string {
encoded := url.QueryEscape(value)
encoded = strings.ReplaceAll(encoded, "+", "%20")
encoded = strings.ReplaceAll(encoded, "*", "%2A")
encoded = strings.ReplaceAll(encoded, "%7E", "~")
return encoded
}
func testHMAC(key []byte, value string) []byte {
mac := hmac.New(sha256.New, key)
_, _ = mac.Write([]byte(value))
return mac.Sum(nil)
}

107
api/object_copy_test.go Normal file
View File

@@ -0,0 +1,107 @@
package api
import (
"bytes"
"io"
"log/slog"
"net/http"
"net/http/httptest"
"path/filepath"
"strings"
"testing"
"time"
"fs/logging"
"fs/metadata"
"fs/service"
"fs/storage"
)
func newTestObjectHandler(t *testing.T) (*Handler, *service.ObjectService) {
t.Helper()
root := t.TempDir()
md, err := metadata.NewMetadataHandler(filepath.Join(root, "metadata.db"))
if err != nil {
t.Fatalf("new metadata handler: %v", err)
}
blob, err := storage.NewBlobStore(root, 1024)
if err != nil {
t.Fatalf("new blob store: %v", err)
}
svc := service.NewObjectService(md, blob, time.Hour)
t.Cleanup(func() {
_ = svc.Close()
})
logger := slog.New(slog.NewTextHandler(io.Discard, nil))
handler := NewHandler(svc, logger, logging.Config{}, nil, false)
handler.setupRoutes()
return handler, svc
}
func TestPutObjectStoresDecodedKey(t *testing.T) {
handler, svc := newTestObjectHandler(t)
if err := svc.CreateBucket("test-bucket"); err != nil {
t.Fatalf("create bucket: %v", err)
}
req := httptest.NewRequest(http.MethodPut, "/test-bucket/jsp-data-raw/vehicle_positions/year%3D2026/month%3D03/day%3D12/file.parquet", bytes.NewReader([]byte("PAR1data")))
rec := httptest.NewRecorder()
handler.router.ServeHTTP(rec, req)
if rec.Code != http.StatusOK {
t.Fatalf("unexpected status: got %d body=%s", rec.Code, rec.Body.String())
}
_, err := svc.HeadObject("test-bucket", "jsp-data-raw/vehicle_positions/year=2026/month=03/day=12/file.parquet")
if err != nil {
t.Fatalf("head decoded key: %v", err)
}
getReq := httptest.NewRequest(http.MethodGet, "/test-bucket/jsp-data-raw/vehicle_positions/year=2026/month=03/day=12/file.parquet", nil)
getRec := httptest.NewRecorder()
handler.router.ServeHTTP(getRec, getReq)
if getRec.Code != http.StatusOK {
t.Fatalf("unexpected get status: got %d body=%s", getRec.Code, getRec.Body.String())
}
if got := getRec.Body.String(); got != "PAR1data" {
t.Fatalf("unexpected get body: got %q", got)
}
}
func TestCopyObjectCopiesCanonicalObject(t *testing.T) {
handler, svc := newTestObjectHandler(t)
if err := svc.CreateBucket("test-bucket"); err != nil {
t.Fatalf("create bucket: %v", err)
}
putReq := httptest.NewRequest(http.MethodPut, "/test-bucket/source/year%3D2026/file.parquet", bytes.NewReader([]byte("PAR1copy")))
putRec := httptest.NewRecorder()
handler.router.ServeHTTP(putRec, putReq)
if putRec.Code != http.StatusOK {
t.Fatalf("unexpected put status: got %d body=%s", putRec.Code, putRec.Body.String())
}
copyReq := httptest.NewRequest(http.MethodPut, "/test-bucket/copied/year=2026/file.parquet", http.NoBody)
copyReq.Header.Set("x-amz-copy-source", "/test-bucket/source/year%3D2026/file.parquet")
copyRec := httptest.NewRecorder()
handler.router.ServeHTTP(copyRec, copyReq)
if copyRec.Code != http.StatusOK {
t.Fatalf("unexpected copy status: got %d body=%s", copyRec.Code, copyRec.Body.String())
}
if !strings.Contains(copyRec.Body.String(), "<CopyObjectResult") {
t.Fatalf("unexpected copy response body: %s", copyRec.Body.String())
}
getReq := httptest.NewRequest(http.MethodGet, "/test-bucket/copied/year=2026/file.parquet", nil)
getRec := httptest.NewRecorder()
handler.router.ServeHTTP(getRec, getReq)
if getRec.Code != http.StatusOK {
t.Fatalf("unexpected get status after copy: got %d body=%s", getRec.Code, getRec.Body.String())
}
if got := getRec.Body.String(); got != "PAR1copy" {
t.Fatalf("unexpected copied body: got %q", got)
}
}

View File

@@ -174,6 +174,8 @@ func mapToS3Error(err error) s3APIError {
return s3ErrMalformedXML return s3ErrMalformedXML
case errors.Is(err, service.ErrEntityTooSmall): case errors.Is(err, service.ErrEntityTooSmall):
return s3ErrEntityTooSmall return s3ErrEntityTooSmall
case errors.Is(err, service.ErrEntityTooLarge):
return s3ErrEntityTooLarge
case errors.Is(err, auth.ErrAccessDenied): case errors.Is(err, auth.ErrAccessDenied):
return s3ErrAccessDenied return s3ErrAccessDenied
case errors.Is(err, auth.ErrInvalidAccessKeyID): case errors.Is(err, auth.ErrInvalidAccessKeyID):

79
api/upload_limit_test.go Normal file
View File

@@ -0,0 +1,79 @@
package api
import (
"bytes"
"io"
"log/slog"
"net/http"
"net/http/httptest"
"path/filepath"
"strings"
"testing"
"time"
"fs/logging"
"fs/metadata"
"fs/service"
"fs/storage"
)
func TestPutObjectReturnsEntityTooLarge(t *testing.T) {
handler, svc := newUploadLimitHandler(t, 4)
if err := svc.CreateBucket("test-bucket"); err != nil {
t.Fatalf("CreateBucket: %v", err)
}
req := httptest.NewRequest(http.MethodPut, "/test-bucket/too-large.txt", strings.NewReader("12345"))
rec := httptest.NewRecorder()
handler.router.ServeHTTP(rec, req)
if rec.Code != http.StatusRequestEntityTooLarge {
t.Fatalf("status = %d, want %d body=%s", rec.Code, http.StatusRequestEntityTooLarge, rec.Body.String())
}
if !strings.Contains(rec.Body.String(), "EntityTooLarge") {
t.Fatalf("expected EntityTooLarge response, body=%s", rec.Body.String())
}
}
func TestUploadPartReturnsEntityTooLarge(t *testing.T) {
handler, svc := newUploadLimitHandler(t, 4)
if err := svc.CreateBucket("test-bucket"); err != nil {
t.Fatalf("CreateBucket: %v", err)
}
upload, err := svc.CreateMultipartUpload("test-bucket", "object.txt")
if err != nil {
t.Fatalf("CreateMultipartUpload: %v", err)
}
req := httptest.NewRequest(http.MethodPut, "/test-bucket/object.txt?partNumber=1&uploadId="+upload.UploadID, bytes.NewReader([]byte("12345")))
rec := httptest.NewRecorder()
handler.router.ServeHTTP(rec, req)
if rec.Code != http.StatusRequestEntityTooLarge {
t.Fatalf("status = %d, want %d body=%s", rec.Code, http.StatusRequestEntityTooLarge, rec.Body.String())
}
if !strings.Contains(rec.Body.String(), "EntityTooLarge") {
t.Fatalf("expected EntityTooLarge response, body=%s", rec.Body.String())
}
}
func newUploadLimitHandler(t *testing.T, maxUploadSize int64) (*Handler, *service.ObjectService) {
t.Helper()
root := t.TempDir()
md, err := metadata.NewMetadataHandler(filepath.Join(root, "metadata.db"))
if err != nil {
t.Fatalf("new metadata handler: %v", err)
}
blob, err := storage.NewBlobStore(root, 4)
if err != nil {
t.Fatalf("new blob store: %v", err)
}
svc := service.NewObjectService(md, blob, time.Hour, maxUploadSize)
t.Cleanup(func() {
_ = svc.Close()
})
logger := slog.New(slog.NewTextHandler(io.Discard, nil))
handler := NewHandler(svc, logger, logging.Config{}, nil, false)
handler.setupRoutes()
return handler, svc
}

View File

@@ -39,6 +39,7 @@ func RunServer(ctx context.Context) error {
"audit_log", logConfig.Audit, "audit_log", logConfig.Audit,
"data_path", config.DataPath, "data_path", config.DataPath,
"multipart_retention_hours", int(config.MultipartCleanupRetention/time.Hour), "multipart_retention_hours", int(config.MultipartCleanupRetention/time.Hour),
"max_object_upload_bytes", config.MaxObjectUploadBytes,
"auth_enabled", authConfig.Enabled, "auth_enabled", authConfig.Enabled,
"auth_region", authConfig.Region, "auth_region", authConfig.Region,
"admin_api_enabled", config.AdminAPIEnabled, "admin_api_enabled", config.AdminAPIEnabled,
@@ -63,7 +64,7 @@ func RunServer(ctx context.Context) error {
return err return err
} }
objectService := service.NewObjectService(metadataHandler, blobHandler, config.MultipartCleanupRetention) objectService := service.NewObjectService(metadataHandler, blobHandler, config.MultipartCleanupRetention, config.MaxObjectUploadBytes)
authService, err := auth.NewService(authConfig, metadataHandler) authService, err := auth.NewService(authConfig, metadataHandler)
if err != nil { if err != nil {
_ = metadataHandler.Close() _ = metadataHandler.Close()

View File

@@ -94,9 +94,11 @@ For each non-health request:
6. Decrypt stored secret using master key. 6. Decrypt stored secret using master key.
7. Recompute canonical request and expected signature. 7. Recompute canonical request and expected signature.
8. Compare signatures. 8. Compare signatures.
9. Resolve target action from request. 9. Reject signed streaming payload modes that require per-chunk signature verification.
10. Evaluate policy; deny overrides allow. 10. Wrap fixed-size signed payloads so the actual body must match `x-amz-content-sha256`.
11. Store auth result in request context and continue. 11. Resolve target action from request.
12. Evaluate policy; deny overrides allow.
13. Store auth result in request context and continue.
## Authorization Semantics ## Authorization Semantics
Policy evaluator rules: Policy evaluator rules:
@@ -106,6 +108,9 @@ Policy evaluator rules:
- action: `*` or `s3:*` - action: `*` or `s3:*`
- bucket: `*` - bucket: `*`
- prefix: `*` - prefix: `*`
- Object actions apply `prefix` to the object key.
- `ListBucket` applies `prefix` to the requested list `prefix` query value; a scoped list policy such as `prefix=backups/` does not allow an empty-prefix or sibling-prefix bucket listing.
- Multi-object delete is authorized per object key after the XML body is parsed; denied keys are returned as per-key `AccessDenied` errors and are not deleted.
Action resolution includes: Action resolution includes:
- bucket APIs (`CreateBucket`, `ListBucket`, `HeadBucket`, `DeleteBucket`) - bucket APIs (`CreateBucket`, `ListBucket`, `HeadBucket`, `DeleteBucket`)
@@ -137,6 +142,7 @@ Each audit entry includes method, path, remote IP, and request ID (if present).
## Current Scope / Limitations ## Current Scope / Limitations
- No STS/session-token auth yet. - No STS/session-token auth yet.
- Signed aws-chunked streaming payloads are not accepted until per-chunk signature verification is implemented. Unsigned streaming payload modes can still be decoded by the API layer.
- Policy language is intentionally minimal, not full IAM. - Policy language is intentionally minimal, not full IAM.
- No automatic key rotation workflows. - No automatic key rotation workflows.
- No key rotation endpoint for existing users yet. - No key rotation endpoint for existing users yet.

View File

@@ -27,6 +27,18 @@ type RequestTarget struct {
Action Action Action Action
Bucket string Bucket string
Key string Key string
Prefix string
}
func RequiresHandlerAuthorization(r *http.Request) bool {
if r == nil || r.URL == nil {
return false
}
if r.Method == http.MethodPost {
_, isDelete := r.URL.Query()["delete"]
return isDelete
}
return false
} }
func resolveTarget(r *http.Request) RequestTarget { func resolveTarget(r *http.Request) RequestTarget {
@@ -51,7 +63,7 @@ func resolveTarget(r *http.Request) RequestTarget {
case http.MethodDelete: case http.MethodDelete:
return RequestTarget{Action: ActionDeleteBucket, Bucket: bucket} return RequestTarget{Action: ActionDeleteBucket, Bucket: bucket}
case http.MethodGet: case http.MethodGet:
return RequestTarget{Action: ActionListBucket, Bucket: bucket} return RequestTarget{Action: ActionListBucket, Bucket: bucket, Prefix: r.URL.Query().Get("prefix")}
case http.MethodPost: case http.MethodPost:
if _, ok := r.URL.Query()["delete"]; ok { if _, ok := r.URL.Query()["delete"]; ok {
return RequestTarget{Action: ActionDeleteObject, Bucket: bucket} return RequestTarget{Action: ActionDeleteObject, Bucket: bucket}

39
auth/action_test.go Normal file
View File

@@ -0,0 +1,39 @@
package auth
import (
"net/http"
"net/http/httptest"
"testing"
)
func TestResolveTargetIncludesListBucketPrefix(t *testing.T) {
req := httptest.NewRequest(http.MethodGet, "http://example.com/test-bucket?list-type=2&prefix=allowed/", nil)
target := resolveTarget(req)
if target.Action != ActionListBucket {
t.Fatalf("action = %q, want %q", target.Action, ActionListBucket)
}
if target.Bucket != "test-bucket" {
t.Fatalf("bucket = %q, want test-bucket", target.Bucket)
}
if target.Prefix != "allowed/" {
t.Fatalf("prefix = %q, want allowed/", target.Prefix)
}
if target.Key != "" {
t.Fatalf("key = %q, want empty", target.Key)
}
}
func TestResolveTargetListBucketWithoutPrefix(t *testing.T) {
req := httptest.NewRequest(http.MethodGet, "http://example.com/test-bucket", nil)
target := resolveTarget(req)
if target.Action != ActionListBucket {
t.Fatalf("action = %q, want %q", target.Action, ActionListBucket)
}
if target.Prefix != "" {
t.Fatalf("prefix = %q, want empty", target.Prefix)
}
}

View File

@@ -1,11 +1,16 @@
package auth package auth
import ( import (
"crypto/sha256"
"encoding/hex"
"errors" "errors"
"fs/metrics" "fs/metrics"
"hash"
"io"
"log/slog" "log/slog"
"net" "net"
"net/http" "net/http"
"strings"
"github.com/go-chi/chi/v5/middleware" "github.com/go-chi/chi/v5/middleware"
) )
@@ -55,6 +60,16 @@ func Middleware(
return return
} }
if err := wrapPayloadHashVerifier(r); err != nil {
metrics.Default.ObserveAuth("error", "sigv4", authErrorClass(err))
if onError != nil {
onError(w, r, err)
return
}
http.Error(w, http.StatusText(http.StatusForbidden), http.StatusForbidden)
return
}
metrics.Default.ObserveAuth("ok", resolvedCtx.AuthType, "none") metrics.Default.ObserveAuth("ok", resolvedCtx.AuthType, "none")
if auditEnabled && logger != nil { if auditEnabled && logger != nil {
requestID := middleware.GetReqID(r.Context()) requestID := middleware.GetReqID(r.Context())
@@ -75,6 +90,65 @@ func Middleware(
} }
} }
func wrapPayloadHashVerifier(r *http.Request) error {
if r == nil || r.Body == nil || r.Body == http.NoBody {
return nil
}
payloadHash := resolvePayloadHash(r, false)
if !payloadHashRequiresVerification(payloadHash) {
return nil
}
if !isHexSHA256(payloadHash) {
return ErrAuthorizationHeaderMalformed
}
expected, err := hex.DecodeString(strings.ToLower(payloadHash))
if err != nil {
return ErrAuthorizationHeaderMalformed
}
r.Body = &payloadHashVerifyingReadCloser{
inner: r.Body,
hasher: sha256.New(),
expected: expected,
}
return nil
}
type payloadHashVerifyingReadCloser struct {
inner io.ReadCloser
hasher hash.Hash
expected []byte
done bool
}
func (r *payloadHashVerifyingReadCloser) Read(p []byte) (int, error) {
n, err := r.inner.Read(p)
if n > 0 {
_, _ = r.hasher.Write(p[:n])
}
if err == io.EOF && !r.done {
r.done = true
if !equalBytes(r.hasher.Sum(nil), r.expected) {
return n, ErrSignatureDoesNotMatch
}
}
return n, err
}
func (r *payloadHashVerifyingReadCloser) Close() error {
return r.inner.Close()
}
func equalBytes(left, right []byte) bool {
if len(left) != len(right) {
return false
}
var diff byte
for i := range left {
diff |= left[i] ^ right[i]
}
return diff == 0
}
func authErrorClass(err error) string { func authErrorClass(err error) string {
switch { switch {
case errors.Is(err, ErrInvalidAccessKeyID): case errors.Is(err, ErrInvalidAccessKeyID):

75
auth/payload_hash_test.go Normal file
View File

@@ -0,0 +1,75 @@
package auth
import (
"crypto/sha256"
"encoding/hex"
"errors"
"io"
"net/http"
"strings"
"testing"
)
func TestPayloadHashVerifierAllowsMatchingBody(t *testing.T) {
body := "payload"
req := newPayloadHashRequest(t, body, body)
if err := wrapPayloadHashVerifier(req); err != nil {
t.Fatalf("wrapPayloadHashVerifier returned error: %v", err)
}
got, err := io.ReadAll(req.Body)
if err != nil {
t.Fatalf("ReadAll returned error: %v", err)
}
if string(got) != body {
t.Fatalf("unexpected body: got %q want %q", string(got), body)
}
}
func TestPayloadHashVerifierRejectsMismatchedBody(t *testing.T) {
req := newPayloadHashRequest(t, "signed-payload", "actual-payload")
if err := wrapPayloadHashVerifier(req); err != nil {
t.Fatalf("wrapPayloadHashVerifier returned error: %v", err)
}
_, err := io.ReadAll(req.Body)
if !errors.Is(err, ErrSignatureDoesNotMatch) {
t.Fatalf("ReadAll error = %v, want ErrSignatureDoesNotMatch", err)
}
}
func TestPayloadSigningRejectsSignedStreamingMode(t *testing.T) {
req, err := http.NewRequest(http.MethodPut, "http://example.com/b/k", nil)
if err != nil {
t.Fatal(err)
}
req.Header.Set("x-amz-content-sha256", "STREAMING-AWS4-HMAC-SHA256-PAYLOAD")
err = validatePayloadSigningMode(req, &sigV4Input{})
if !errors.Is(err, ErrAuthorizationHeaderMalformed) {
t.Fatalf("validatePayloadSigningMode error = %v, want ErrAuthorizationHeaderMalformed", err)
}
}
func TestPayloadSigningAllowsUnsignedStreamingMode(t *testing.T) {
req, err := http.NewRequest(http.MethodPut, "http://example.com/b/k", nil)
if err != nil {
t.Fatal(err)
}
req.Header.Set("x-amz-content-sha256", "STREAMING-UNSIGNED-PAYLOAD-TRAILER")
if err := validatePayloadSigningMode(req, &sigV4Input{}); err != nil {
t.Fatalf("validatePayloadSigningMode returned error: %v", err)
}
}
func newPayloadHashRequest(t *testing.T, signedBody, actualBody string) *http.Request {
t.Helper()
req, err := http.NewRequest(http.MethodPut, "http://example.com/b/k", strings.NewReader(actualBody))
if err != nil {
t.Fatal(err)
}
sum := sha256.Sum256([]byte(signedBody))
req.Header.Set("x-amz-content-sha256", hex.EncodeToString(sum[:]))
return req
}

View File

@@ -33,14 +33,16 @@ func statementMatches(stmt models.AuthPolicyStatement, target RequestTarget) boo
if !bucketMatches(stmt.Bucket, target.Bucket) { if !bucketMatches(stmt.Bucket, target.Bucket) {
return false return false
} }
if target.Key == "" {
return true
}
prefix := strings.TrimSpace(stmt.Prefix) prefix := strings.TrimSpace(stmt.Prefix)
if prefix == "" || prefix == "*" { if prefix == "" || prefix == "*" {
return true return true
} }
if target.Key == "" {
if target.Action == ActionListBucket {
return strings.HasPrefix(target.Prefix, prefix)
}
return true
}
return strings.HasPrefix(target.Key, prefix) return strings.HasPrefix(target.Key, prefix)
} }

52
auth/policy_test.go Normal file
View File

@@ -0,0 +1,52 @@
package auth
import (
"fs/models"
"testing"
)
func TestListBucketPolicyAppliesPrefix(t *testing.T) {
policy := &models.AuthPolicy{
Statements: []models.AuthPolicyStatement{
{
Effect: "allow",
Actions: []string{"s3:ListBucket"},
Bucket: "test-bucket",
Prefix: "allowed/",
},
},
}
if !isAllowed(policy, RequestTarget{Action: ActionListBucket, Bucket: "test-bucket", Prefix: "allowed/"}) {
t.Fatalf("expected matching list prefix to be allowed")
}
if !isAllowed(policy, RequestTarget{Action: ActionListBucket, Bucket: "test-bucket", Prefix: "allowed/nested/"}) {
t.Fatalf("expected nested list prefix to be allowed")
}
if isAllowed(policy, RequestTarget{Action: ActionListBucket, Bucket: "test-bucket"}) {
t.Fatalf("expected empty list prefix to be denied")
}
if isAllowed(policy, RequestTarget{Action: ActionListBucket, Bucket: "test-bucket", Prefix: "private/"}) {
t.Fatalf("expected non-matching list prefix to be denied")
}
}
func TestWildcardListBucketPolicyAllowsAnyPrefix(t *testing.T) {
policy := &models.AuthPolicy{
Statements: []models.AuthPolicyStatement{
{
Effect: "allow",
Actions: []string{"s3:ListBucket"},
Bucket: "test-bucket",
Prefix: "*",
},
},
}
if !isAllowed(policy, RequestTarget{Action: ActionListBucket, Bucket: "test-bucket"}) {
t.Fatalf("expected wildcard list policy to allow empty prefix")
}
if !isAllowed(policy, RequestTarget{Action: ActionListBucket, Bucket: "test-bucket", Prefix: "private/"}) {
t.Fatalf("expected wildcard list policy to allow arbitrary prefix")
}
}

View File

@@ -152,6 +152,9 @@ func (s *Service) AuthenticateRequest(r *http.Request) (RequestContext, error) {
if err := validateSigV4Input(s.now(), s.cfg, input); err != nil { if err := validateSigV4Input(s.now(), s.cfg, input); err != nil {
return RequestContext{}, err return RequestContext{}, err
} }
if err := validatePayloadSigningMode(r, input); err != nil {
return RequestContext{}, err
}
identity, err := s.store.GetAuthIdentity(input.AccessKeyID) identity, err := s.store.GetAuthIdentity(input.AccessKeyID)
if err != nil { if err != nil {
@@ -185,6 +188,13 @@ func (s *Service) AuthenticateRequest(r *http.Request) (RequestContext, error) {
AuthType: authType, AuthType: authType,
}, nil }, nil
} }
if RequiresHandlerAuthorization(r) {
return RequestContext{
Authenticated: true,
AccessKeyID: identity.AccessKeyID,
AuthType: authType,
}, nil
}
policy, err := s.store.GetAuthPolicy(identity.AccessKeyID) policy, err := s.store.GetAuthPolicy(identity.AccessKeyID)
if err != nil { if err != nil {

View File

@@ -210,6 +210,17 @@ func validateSigV4Input(now time.Time, cfg Config, input *sigV4Input) error {
return nil return nil
} }
func validatePayloadSigningMode(r *http.Request, input *sigV4Input) error {
payloadHash := resolvePayloadHash(r, input.Presigned)
if isSignedStreamingPayloadHash(payloadHash) {
return fmt.Errorf("%w: signed streaming payload verification is not supported", ErrAuthorizationHeaderMalformed)
}
if payloadHashRequiresVerification(payloadHash) && !isHexSHA256(payloadHash) {
return fmt.Errorf("%w: invalid x-amz-content-sha256", ErrAuthorizationHeaderMalformed)
}
return nil
}
func signatureMatches(secret string, r *http.Request, input *sigV4Input) (bool, error) { func signatureMatches(secret string, r *http.Request, input *sigV4Input) (bool, error) {
payloadHash := resolvePayloadHash(r, input.Presigned) payloadHash := resolvePayloadHash(r, input.Presigned)
canonicalRequest, err := buildCanonicalRequest(r, input.SignedHeaders, payloadHash, input.Presigned) canonicalRequest, err := buildCanonicalRequest(r, input.SignedHeaders, payloadHash, input.Presigned)
@@ -233,6 +244,34 @@ func resolvePayloadHash(r *http.Request, presigned bool) string {
return hash return hash
} }
func isSignedStreamingPayloadHash(payloadHash string) bool {
payloadHash = strings.ToUpper(strings.TrimSpace(payloadHash))
return strings.HasPrefix(payloadHash, "STREAMING-AWS4-HMAC-SHA256-PAYLOAD")
}
func payloadHashRequiresVerification(payloadHash string) bool {
payloadHash = strings.ToUpper(strings.TrimSpace(payloadHash))
if payloadHash == "" || payloadHash == "UNSIGNED-PAYLOAD" {
return false
}
if strings.HasPrefix(payloadHash, "STREAMING-UNSIGNED-PAYLOAD") {
return false
}
return true
}
func isHexSHA256(value string) bool {
if len(value) != sha256.Size*2 {
return false
}
for _, ch := range value {
if (ch < '0' || ch > '9') && (ch < 'a' || ch > 'f') && (ch < 'A' || ch > 'F') {
return false
}
}
return true
}
func buildCanonicalRequest(r *http.Request, signedHeaders []string, payloadHash string, presigned bool) (string, error) { func buildCanonicalRequest(r *http.Request, signedHeaders []string, payloadHash string, presigned bool) (string, error) {
canonicalURI := canonicalPath(r.URL) canonicalURI := canonicalPath(r.URL)
canonicalQuery := canonicalQueryString(r.URL.RawQuery, presigned) canonicalQuery := canonicalQueryString(r.URL.RawQuery, presigned)

50
auth/sigv4_test.go Normal file
View File

@@ -0,0 +1,50 @@
package auth
import (
"net/http"
"net/http/httptest"
"net/url"
"strings"
"testing"
)
func TestCanonicalPathEncodesEquals(t *testing.T) {
u := &url.URL{Path: "/test-bucket/jsp-data-raw/year=2026/month=03/day=12/vehicle_positions.parquet"}
got := canonicalPath(u)
want := "/test-bucket/jsp-data-raw/year%3D2026/month%3D03/day%3D12/vehicle_positions.parquet"
if got != want {
t.Fatalf("unexpected canonical path: got %q want %q", got, want)
}
}
func TestCanonicalPathPreservesExistingEscapes(t *testing.T) {
u, err := url.Parse("http://localhost:2600/test-bucket/jsp-data-raw/year%3d2026/file%2Eparquet")
if err != nil {
t.Fatalf("url.Parse failed: %v", err)
}
got := canonicalPath(u)
want := "/test-bucket/jsp-data-raw/year%3D2026/file%2Eparquet"
if got != want {
t.Fatalf("unexpected canonical path: got %q want %q", got, want)
}
}
func TestBuildCanonicalRequestUsesAwsEncodedPath(t *testing.T) {
req := httptest.NewRequest(http.MethodGet, "http://localhost:2600/test-bucket/jsp-data-raw/year=2026/month=03/day=12/vehicle_positions.parquet", nil)
req.Header.Set("x-amz-date", "20260313T120000Z")
req.Header.Set("x-amz-content-sha256", "UNSIGNED-PAYLOAD")
canonical, err := buildCanonicalRequest(req, []string{"host", "x-amz-content-sha256", "x-amz-date"}, "UNSIGNED-PAYLOAD", false)
if err != nil {
t.Fatalf("buildCanonicalRequest failed: %v", err)
}
lines := strings.Split(canonical, "\n")
if len(lines) < 2 {
t.Fatalf("canonical request has unexpected format: %q", canonical)
}
wantPath := "/test-bucket/jsp-data-raw/year%3D2026/month%3D03/day%3D12/vehicle_positions.parquet"
if lines[1] != wantPath {
t.Fatalf("unexpected canonical path line: got %q want %q", lines[1], wantPath)
}
}

View File

@@ -32,12 +32,15 @@ This project is S3-compatible for a focused subset of operations.
### Authentication ### Authentication
- AWS SigV4 header auth - AWS SigV4 header auth
- AWS SigV4 presigned query auth - AWS SigV4 presigned query auth
- `aws-chunked` payload decode for streaming uploads - `aws-chunked` payload decode for unsigned streaming upload modes
- SigV4 payload hash verification for fixed-size signed payloads
## Partially Implemented / Differences ## Partially Implemented / Differences
- Exact parity with AWS S3 error codes/headers is still evolving. - Exact parity with AWS S3 error codes/headers is still evolving.
- Some S3 edge-case behaviors may differ (especially uncommon query/header combinations). - Some S3 edge-case behaviors may differ (especially uncommon query/header combinations).
- Admin API is custom JSON (`/_admin/v1/*`). - Admin API is custom JSON (`/_admin/v1/*`).
- Object and upload-part payloads are limited by `FS_MAX_OBJECT_UPLOAD_BYTES` (default 5 GiB).
- Signed `aws-chunked` payload modes that require per-chunk signature verification are rejected until chunk-signature validation is implemented.
## Not Implemented (Current) ## Not Implemented (Current)
- Bucket versioning - Bucket versioning

View File

@@ -902,9 +902,6 @@ func (h *MetadataHandler) CleanupMultipartUploads(retention time.Duration) (int,
if err := json.Unmarshal(v, &upload); err != nil { if err := json.Unmarshal(v, &upload); err != nil {
return err return err
} }
if upload.State == "pending" {
return nil
}
createdAt, err := time.Parse(time.RFC3339, upload.CreatedAt) createdAt, err := time.Parse(time.RFC3339, upload.CreatedAt)
if err != nil { if err != nil {
return nil return nil

99
metadata/metadata_test.go Normal file
View File

@@ -0,0 +1,99 @@
package metadata
import (
"errors"
"fs/models"
"path/filepath"
"testing"
"time"
"go.etcd.io/bbolt"
)
func TestCleanupMultipartUploadsDeletesExpiredPendingUpload(t *testing.T) {
h := newTestMetadataHandler(t)
if err := h.CreateBucket("test-bucket"); err != nil {
t.Fatalf("CreateBucket: %v", err)
}
upload, err := h.CreateMultipartUpload("test-bucket", "object.txt")
if err != nil {
t.Fatalf("CreateMultipartUpload: %v", err)
}
if err := h.PutMultipartPart(upload.UploadID, models.UploadedPart{PartNumber: 1, ETag: "etag", Size: 4, Chunks: []string{"chunk-id"}}); err != nil {
t.Fatalf("PutMultipartPart: %v", err)
}
setMultipartUploadCreatedAt(t, h, upload.UploadID, time.Now().Add(-2*time.Hour))
cleaned, err := h.CleanupMultipartUploads(time.Hour)
if err != nil {
t.Fatalf("CleanupMultipartUploads: %v", err)
}
if cleaned != 1 {
t.Fatalf("cleaned = %d, want 1", cleaned)
}
if _, err := h.GetMultipartUpload(upload.UploadID); !errors.Is(err, ErrMultipartNotFound) {
t.Fatalf("GetMultipartUpload error = %v, want ErrMultipartNotFound", err)
}
if _, err := h.ListMultipartParts(upload.UploadID); !errors.Is(err, ErrMultipartNotFound) {
t.Fatalf("ListMultipartParts error = %v, want ErrMultipartNotFound", err)
}
}
func TestCleanupMultipartUploadsKeepsRecentPendingUpload(t *testing.T) {
h := newTestMetadataHandler(t)
if err := h.CreateBucket("test-bucket"); err != nil {
t.Fatalf("CreateBucket: %v", err)
}
upload, err := h.CreateMultipartUpload("test-bucket", "object.txt")
if err != nil {
t.Fatalf("CreateMultipartUpload: %v", err)
}
cleaned, err := h.CleanupMultipartUploads(time.Hour)
if err != nil {
t.Fatalf("CleanupMultipartUploads: %v", err)
}
if cleaned != 0 {
t.Fatalf("cleaned = %d, want 0", cleaned)
}
if _, err := h.GetMultipartUpload(upload.UploadID); err != nil {
t.Fatalf("recent upload should remain: %v", err)
}
}
func TestCleanupMultipartUploadsDisabledForNonPositiveRetention(t *testing.T) {
h := newTestMetadataHandler(t)
cleaned, err := h.CleanupMultipartUploads(0)
if err != nil {
t.Fatalf("CleanupMultipartUploads: %v", err)
}
if cleaned != 0 {
t.Fatalf("cleaned = %d, want 0", cleaned)
}
}
func newTestMetadataHandler(t *testing.T) *MetadataHandler {
t.Helper()
h, err := NewMetadataHandler(filepath.Join(t.TempDir(), "metadata.db"))
if err != nil {
t.Fatalf("NewMetadataHandler: %v", err)
}
t.Cleanup(func() {
_ = h.Close()
})
return h
}
func setMultipartUploadCreatedAt(t *testing.T, h *MetadataHandler, uploadID string, createdAt time.Time) {
t.Helper()
if err := h.update(func(tx *bbolt.Tx) error {
upload, uploadsBucket, err := getMultipartUploadFromTx(tx, uploadID)
if err != nil {
return err
}
upload.CreatedAt = createdAt.UTC().Format(time.RFC3339)
return putMultipartUpload(uploadsBucket, uploadID, upload)
}); err != nil {
t.Fatalf("set multipart created_at: %v", err)
}
}

View File

@@ -21,6 +21,7 @@ type ObjectService struct {
metadata *metadata.MetadataHandler metadata *metadata.MetadataHandler
blob *storage.BlobStore blob *storage.BlobStore
multipartRetention time.Duration multipartRetention time.Duration
maxUploadSize int64
gcMu sync.RWMutex gcMu sync.RWMutex
} }
@@ -29,16 +30,24 @@ var (
ErrInvalidPartOrder = errors.New("invalid multipart part order") ErrInvalidPartOrder = errors.New("invalid multipart part order")
ErrInvalidCompleteRequest = errors.New("invalid complete multipart request") ErrInvalidCompleteRequest = errors.New("invalid complete multipart request")
ErrEntityTooSmall = errors.New("multipart entity too small") ErrEntityTooSmall = errors.New("multipart entity too small")
ErrEntityTooLarge = errors.New("entity too large")
) )
func NewObjectService(metadataHandler *metadata.MetadataHandler, blobHandler *storage.BlobStore, multipartRetention time.Duration) *ObjectService { const DefaultMaxUploadSize int64 = 5 * 1024 * 1024 * 1024
func NewObjectService(metadataHandler *metadata.MetadataHandler, blobHandler *storage.BlobStore, multipartRetention time.Duration, maxUploadSize ...int64) *ObjectService {
if multipartRetention <= 0 { if multipartRetention <= 0 {
multipartRetention = 24 * time.Hour multipartRetention = 24 * time.Hour
} }
limit := DefaultMaxUploadSize
if len(maxUploadSize) > 0 {
limit = maxUploadSize[0]
}
return &ObjectService{ return &ObjectService{
metadata: metadataHandler, metadata: metadataHandler,
blob: blobHandler, blob: blobHandler,
multipartRetention: multipartRetention, multipartRetention: multipartRetention,
maxUploadSize: limit,
} }
} }
@@ -74,7 +83,7 @@ func (s *ObjectService) PutObject(bucket, key, contentType string, input io.Read
unlock := s.acquireGCRLock() unlock := s.acquireGCRLock()
defer unlock() defer unlock()
chunks, size, etag, err := s.blob.IngestStream(input) chunks, size, etag, err := s.blob.IngestStream(s.limitUpload(input))
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -158,7 +167,9 @@ func (s *ObjectService) GetObject(bucket, key string) (io.ReadCloser, *models.Ob
defer func() { defer func() {
metrics.Default.ObserveService("get_object", time.Since(start), streamOK) metrics.Default.ObserveService("get_object", time.Since(start), streamOK)
}() }()
defer metrics.Default.ObserveLockHold("gc_mu_read", time.Since(holdStart)) defer func() {
metrics.Default.ObserveLockHold("gc_mu_read", time.Since(holdStart))
}()
defer s.gcMu.RUnlock() defer s.gcMu.RUnlock()
if err := s.blob.AssembleStream(manifest.Chunks, pw); err != nil { if err := s.blob.AssembleStream(manifest.Chunks, pw); err != nil {
_ = pw.CloseWithError(err) _ = pw.CloseWithError(err)
@@ -311,7 +322,7 @@ func (s *ObjectService) UploadPart(bucket, key, uploadId string, partNumber int,
} }
var uploadedPart models.UploadedPart var uploadedPart models.UploadedPart
chunkIds, totalSize, etag, err := s.blob.IngestStream(input) chunkIds, totalSize, etag, err := s.blob.IngestStream(s.limitUpload(input))
if err != nil { if err != nil {
return "", err return "", err
} }
@@ -400,6 +411,9 @@ func (s *ObjectService) CompleteMultipartUpload(bucket, key, uploadID string, co
orderedParts = append(orderedParts, storedPart) orderedParts = append(orderedParts, storedPart)
chunks = append(chunks, storedPart.Chunks...) chunks = append(chunks, storedPart.Chunks...)
totalSize += storedPart.Size totalSize += storedPart.Size
if s.maxUploadSize > 0 && totalSize > s.maxUploadSize {
return nil, ErrEntityTooLarge
}
} }
finalETag := buildMultipartETag(orderedParts) finalETag := buildMultipartETag(orderedParts)
@@ -435,6 +449,40 @@ func (s *ObjectService) AbortMultipartUpload(bucket, key, uploadID string) error
return s.metadata.AbortMultipartUpload(uploadID) return s.metadata.AbortMultipartUpload(uploadID)
} }
func (s *ObjectService) limitUpload(input io.Reader) io.Reader {
if s.maxUploadSize <= 0 || input == nil {
return input
}
return &maxBytesReader{inner: input, remaining: s.maxUploadSize}
}
type maxBytesReader struct {
inner io.Reader
remaining int64
tooLarge bool
}
func (r *maxBytesReader) Read(p []byte) (int, error) {
if r.tooLarge {
return 0, ErrEntityTooLarge
}
if r.remaining <= 0 {
var probe [1]byte
n, err := r.inner.Read(probe[:])
if n > 0 {
r.tooLarge = true
return 0, ErrEntityTooLarge
}
return 0, err
}
if int64(len(p)) > r.remaining {
p = p[:r.remaining]
}
n, err := r.inner.Read(p)
r.remaining -= int64(n)
return n, err
}
func normalizeETag(etag string) string { func normalizeETag(etag string) string {
return strings.Trim(etag, "\"") return strings.Trim(etag, "\"")
} }
@@ -469,6 +517,12 @@ func (s *ObjectService) GarbageCollect() error {
unlock := s.acquireGCLock() unlock := s.acquireGCLock()
defer unlock() defer unlock()
var err error
cleanedUploads, err = s.metadata.CleanupMultipartUploads(s.multipartRetention)
if err != nil {
return err
}
referencedChunkSet, err := s.metadata.GetReferencedChunkSet() referencedChunkSet, err := s.metadata.GetReferencedChunkSet()
if err != nil { if err != nil {
return err return err
@@ -492,11 +546,6 @@ func (s *ObjectService) GarbageCollect() error {
return err return err
} }
cleanedUploads, err = s.metadata.CleanupMultipartUploads(s.multipartRetention)
if err != nil {
return err
}
slog.Info("garbage_collect_completed", slog.Info("garbage_collect_completed",
"referenced_chunks", len(referencedChunkSet), "referenced_chunks", len(referencedChunkSet),
"total_chunks", totalChunks, "total_chunks", totalChunks,

View File

@@ -0,0 +1,119 @@
package service
import (
"errors"
"fs/metadata"
"fs/storage"
"path/filepath"
"strings"
"testing"
"time"
)
func TestPutObjectRejectsOversizedUpload(t *testing.T) {
svc := newTestObjectService(t, 4)
if err := svc.CreateBucket("test-bucket"); err != nil {
t.Fatalf("CreateBucket: %v", err)
}
_, err := svc.PutObject("test-bucket", "too-large.txt", "text/plain", strings.NewReader("12345"))
if !errors.Is(err, ErrEntityTooLarge) {
t.Fatalf("PutObject error = %v, want ErrEntityTooLarge", err)
}
if _, err := svc.HeadObject("test-bucket", "too-large.txt"); !errors.Is(err, metadata.ErrObjectNotFound) {
t.Fatalf("HeadObject error = %v, want ErrObjectNotFound", err)
}
}
func TestPutObjectAllowsExactUploadLimit(t *testing.T) {
svc := newTestObjectService(t, 4)
if err := svc.CreateBucket("test-bucket"); err != nil {
t.Fatalf("CreateBucket: %v", err)
}
manifest, err := svc.PutObject("test-bucket", "exact.txt", "text/plain", strings.NewReader("1234"))
if err != nil {
t.Fatalf("PutObject: %v", err)
}
if manifest.Size != 4 {
t.Fatalf("manifest size = %d, want 4", manifest.Size)
}
}
func TestUploadPartRejectsOversizedUpload(t *testing.T) {
svc := newTestObjectService(t, 4)
if err := svc.CreateBucket("test-bucket"); err != nil {
t.Fatalf("CreateBucket: %v", err)
}
upload, err := svc.CreateMultipartUpload("test-bucket", "object.txt")
if err != nil {
t.Fatalf("CreateMultipartUpload: %v", err)
}
_, err = svc.UploadPart("test-bucket", "object.txt", upload.UploadID, 1, strings.NewReader("12345"))
if !errors.Is(err, ErrEntityTooLarge) {
t.Fatalf("UploadPart error = %v, want ErrEntityTooLarge", err)
}
parts, err := svc.ListMultipartParts("test-bucket", "object.txt", upload.UploadID)
if err != nil {
t.Fatalf("ListMultipartParts: %v", err)
}
if len(parts) != 0 {
t.Fatalf("stored parts = %d, want 0", len(parts))
}
}
func TestGarbageCollectRemovesExpiredPendingMultipartChunks(t *testing.T) {
svc := newTestObjectService(t, 1024)
svc.multipartRetention = time.Nanosecond
if err := svc.CreateBucket("test-bucket"); err != nil {
t.Fatalf("CreateBucket: %v", err)
}
upload, err := svc.CreateMultipartUpload("test-bucket", "object.txt")
if err != nil {
t.Fatalf("CreateMultipartUpload: %v", err)
}
if _, err := svc.UploadPart("test-bucket", "object.txt", upload.UploadID, 1, strings.NewReader("part-data")); err != nil {
t.Fatalf("UploadPart: %v", err)
}
chunks, err := svc.blob.ListChunks()
if err != nil {
t.Fatalf("ListChunks before GC: %v", err)
}
if len(chunks) == 0 {
t.Fatalf("expected uploaded part chunks")
}
time.Sleep(time.Millisecond)
if err := svc.GarbageCollect(); err != nil {
t.Fatalf("GarbageCollect: %v", err)
}
if _, err := svc.metadata.GetMultipartUpload(upload.UploadID); !errors.Is(err, metadata.ErrMultipartNotFound) {
t.Fatalf("GetMultipartUpload error = %v, want ErrMultipartNotFound", err)
}
chunks, err = svc.blob.ListChunks()
if err != nil {
t.Fatalf("ListChunks after GC: %v", err)
}
if len(chunks) != 0 {
t.Fatalf("chunks after GC = %d, want 0", len(chunks))
}
}
func newTestObjectService(t *testing.T, maxUploadSize int64) *ObjectService {
t.Helper()
root := t.TempDir()
md, err := metadata.NewMetadataHandler(filepath.Join(root, "metadata.db"))
if err != nil {
t.Fatalf("NewMetadataHandler: %v", err)
}
blob, err := storage.NewBlobStore(root, 4)
if err != nil {
t.Fatalf("NewBlobStore: %v", err)
}
svc := NewObjectService(md, blob, time.Hour, maxUploadSize)
t.Cleanup(func() {
_ = svc.Close()
})
return svc
}

View File

@@ -17,6 +17,8 @@ import (
const blobRoot = "blobs" const blobRoot = "blobs"
const maxChunkSize = 64 * 1024 * 1024 const maxChunkSize = 64 * 1024 * 1024
var ErrChunkIntegrity = errors.New("chunk integrity check failed")
type BlobStore struct { type BlobStore struct {
dataRoot string dataRoot string
chunkSize int chunkSize int
@@ -185,6 +187,11 @@ func (bs *BlobStore) GetBlob(chunkID string) ([]byte, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
chunkHash := sha256.Sum256(data)
actualChunkID := hex.EncodeToString(chunkHash[:])
if actualChunkID != chunkID {
return nil, fmt.Errorf("%w: expected %s, got %s", ErrChunkIntegrity, chunkID, actualChunkID)
}
size = int64(len(data)) size = int64(len(data))
success = true success = true
return data, nil return data, nil

79
storage/blob_test.go Normal file
View File

@@ -0,0 +1,79 @@
package storage
import (
"errors"
"io"
"os"
"path/filepath"
"strings"
"testing"
)
func TestGetBlobDetectsCorruptedChunk(t *testing.T) {
root := t.TempDir()
bs, err := NewBlobStore(root, 4)
if err != nil {
t.Fatalf("new blob store: %v", err)
}
chunks, _, _, err := bs.IngestStream(strings.NewReader("good"))
if err != nil {
t.Fatalf("ingest: %v", err)
}
chunkID := chunks[0]
corruptChunk(t, root, chunkID, []byte("bad"))
got, err := bs.GetBlob(chunkID)
if !errors.Is(err, ErrChunkIntegrity) {
t.Fatalf("GetBlob error = %v, want ErrChunkIntegrity", err)
}
if got != nil {
t.Fatalf("GetBlob returned data for corrupted chunk: %q", got)
}
}
func TestAssembleStreamDetectsCorruptedChunk(t *testing.T) {
root := t.TempDir()
bs, err := NewBlobStore(root, 4)
if err != nil {
t.Fatalf("new blob store: %v", err)
}
chunks, _, _, err := bs.IngestStream(strings.NewReader("abcdefgh"))
if err != nil {
t.Fatalf("ingest: %v", err)
}
if len(chunks) != 2 {
t.Fatalf("chunk count = %d, want 2", len(chunks))
}
corruptChunk(t, root, chunks[1], []byte("corrupt"))
pr, pw := io.Pipe()
errCh := make(chan error, 1)
go func() {
err := bs.AssembleStream(chunks, pw)
if err != nil {
_ = pw.CloseWithError(err)
} else {
_ = pw.Close()
}
errCh <- err
}()
_, readErr := io.ReadAll(pr)
assembleErr := <-errCh
if !errors.Is(assembleErr, ErrChunkIntegrity) {
t.Fatalf("AssembleStream error = %v, want ErrChunkIntegrity", assembleErr)
}
if !errors.Is(readErr, ErrChunkIntegrity) {
t.Fatalf("pipe read error = %v, want ErrChunkIntegrity", readErr)
}
}
func corruptChunk(t *testing.T, root, chunkID string, data []byte) {
t.Helper()
path := filepath.Join(root, blobRoot, chunkID[:2], chunkID[2:4], chunkID)
if err := os.WriteFile(path, data, 0o600); err != nil {
t.Fatalf("corrupt chunk: %v", err)
}
}

View File

@@ -15,6 +15,7 @@ type Config struct {
Address string Address string
Port int Port int
ChunkSize int ChunkSize int
MaxObjectUploadBytes int64
LogLevel string LogLevel string
LogFormat string LogFormat string
AuditLog bool AuditLog bool
@@ -40,6 +41,7 @@ func NewConfig() *Config {
Address: firstNonEmpty(strings.TrimSpace(os.Getenv("ADDRESS")), "0.0.0.0"), Address: firstNonEmpty(strings.TrimSpace(os.Getenv("ADDRESS")), "0.0.0.0"),
Port: envIntRange("PORT", 2600, 1, 65535), Port: envIntRange("PORT", 2600, 1, 65535),
ChunkSize: envIntRange("CHUNK_SIZE", 8192000, 1, 64*1024*1024), ChunkSize: envIntRange("CHUNK_SIZE", 8192000, 1, 64*1024*1024),
MaxObjectUploadBytes: envInt64Range("FS_MAX_OBJECT_UPLOAD_BYTES", 5*1024*1024*1024, 1, 5*1024*1024*1024),
LogLevel: strings.ToLower(firstNonEmpty(strings.TrimSpace(os.Getenv("LOG_LEVEL")), "info")), LogLevel: strings.ToLower(firstNonEmpty(strings.TrimSpace(os.Getenv("LOG_LEVEL")), "info")),
LogFormat: strings.ToLower(firstNonEmpty(strings.TrimSpace(os.Getenv("LOG_FORMAT")), strings.TrimSpace(os.Getenv("LOG_TYPE")), "text")), LogFormat: strings.ToLower(firstNonEmpty(strings.TrimSpace(os.Getenv("LOG_FORMAT")), strings.TrimSpace(os.Getenv("LOG_TYPE")), "text")),
AuditLog: envBool("AUDIT_LOG", true), AuditLog: envBool("AUDIT_LOG", true),
@@ -82,6 +84,21 @@ func envIntRange(key string, defaultValue, minValue, maxValue int) int {
return value return value
} }
func envInt64Range(key string, defaultValue, minValue, maxValue int64) int64 {
raw := strings.TrimSpace(os.Getenv(key))
if raw == "" {
return defaultValue
}
value, err := strconv.ParseInt(raw, 10, 64)
if err != nil {
return defaultValue
}
if value < minValue || value > maxValue {
return defaultValue
}
return value
}
func envBool(key string, defaultValue bool) bool { func envBool(key string, defaultValue bool) bool {
raw := strings.TrimSpace(os.Getenv(key)) raw := strings.TrimSpace(os.Getenv(key))
if raw == "" { if raw == "" {

21
utils/config_test.go Normal file
View File

@@ -0,0 +1,21 @@
package utils
import "testing"
func TestEnvInt64Range(t *testing.T) {
t.Setenv("TEST_INT64_RANGE", "42")
if got := envInt64Range("TEST_INT64_RANGE", 10, 1, 100); got != 42 {
t.Fatalf("envInt64Range valid = %d, want 42", got)
}
}
func TestEnvInt64RangeFallsBackForInvalidValues(t *testing.T) {
t.Setenv("TEST_INT64_RANGE", "invalid")
if got := envInt64Range("TEST_INT64_RANGE", 10, 1, 100); got != 10 {
t.Fatalf("envInt64Range invalid = %d, want 10", got)
}
t.Setenv("TEST_INT64_RANGE", "101")
if got := envInt64Range("TEST_INT64_RANGE", 10, 1, 100); got != 10 {
t.Fatalf("envInt64Range too large = %d, want 10", got)
}
}