36 Commits

Author SHA1 Message Date
651413d494 Merge pull request #6 from ferdzo/feature/metrics
Metrics endpoint
2026-03-02 23:28:05 +01:00
c03bd3e3a2 Minimal fixes for metrics 2026-03-02 23:26:57 +01:00
8c9cd96213 Auth for metrics, removed unwanted metrics and fixed tests. 2026-03-02 22:30:15 +01:00
Andrej Mickov
6ca3fb8701 Updated metrics 2026-02-27 16:38:51 +01:00
Andrej Mickov
f04f7601c0 Initial metrics endpoint added in Prometheus style 2026-02-27 14:59:23 +01:00
Andrej Mickov
2fea3da9ee Added ListObjectsV1 2026-02-27 14:01:10 +01:00
ba4256fd00 Update .env.example
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
2026-02-27 10:39:12 +01:00
0edaae6dad Update README.md
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
2026-02-27 10:38:55 +01:00
678c10a3ad Initial working authentication with SigV4 2026-02-27 01:35:20 +01:00
79819ad2d0 Merge pull request #4 from ferdzo/fix/general
General enhancements and fixes
2026-02-25 00:46:30 +01:00
abe1f453fc Enhance API with health check endpoints and improve multipart upload management 2026-02-25 00:34:06 +01:00
a9fbc06dd0 Fix formatting in README for object operations 2026-02-24 14:04:09 +01:00
edfb5f5b2a Refine object operations and multi-object delete sections
Updated the formatting and structure of the object operations and multi-object delete sections in the README.
2026-02-24 10:23:53 +01:00
c997fe8471 Improve formatting of features in README
Updated feature list formatting in README.md for better readability.
2026-02-24 10:23:24 +01:00
fca553028c Merge pull request #3 from ferdzo/LICENSE
License
2026-02-24 10:20:05 +01:00
3630aad584 Merge branch 'develop' into LICENSE 2026-02-24 10:19:41 +01:00
93296ff74e Merge pull request #2 from ferdzo/feature/garbage-collection
Garbage collection and few other things
2026-02-24 10:18:11 +01:00
1b7393a545 Fix license section heading in README.md 2026-02-24 10:17:05 +01:00
Andrej Mickov
a3fad34272 LICENSE.md 2026-02-24 10:14:00 +01:00
06c90be50f Fixed copilot suggestions. 2026-02-24 00:28:33 +01:00
5e87247087 Introduced garbage collection, safe data write to storage and improved S3 compatibility. 2026-02-24 00:05:53 +01:00
a4990dae01 Merge pull request #1 from ferdzo/feature/multipart-upload
Multipart Upload support
2026-02-23 22:37:33 +01:00
d9a1bd9001 Applied Copilot review suggestions 2026-02-23 22:35:42 +01:00
a8204de914 Fixed logging, added config and .env example 2026-02-23 21:52:45 +01:00
d7bdb3177b Added logging 2026-02-23 00:42:38 +01:00
c989037160 Finalized multipart upload and graceful shutdown. Added Dockerfile. 2026-02-22 23:00:33 +01:00
5d41ec9e0a Implemented bulk delete from bucket, AWS SigV4 framing problems solved. 2026-02-22 15:32:04 +01:00
111ce5b669 Working MultipartUpload that needs minor tweaks. 2026-02-22 14:46:04 +01:00
5438a7f4b4 Updated gitignore 2026-02-22 13:43:23 +01:00
9b5035dfa0 Initial Multipart Upload 2026-02-22 13:42:23 +01:00
65a7a7eef8 S3 Error handling 2026-02-22 13:02:22 +01:00
eb798be550 Updated error handling to be S3 XML compatible. Implemented DeleteObject. 2026-02-22 13:01:59 +01:00
b19c24d9b7 Object prefix list filtering 2026-02-21 21:35:15 +01:00
6fe5a8a629 Added Bucket routes and bucket logic 2026-02-21 21:27:34 +01:00
151c11a636 HEAD Object route 2026-02-21 12:13:33 +01:00
f151f8055a Working simple PUT/GET API 2026-02-21 11:55:14 +01:00
31 changed files with 5774 additions and 88 deletions

4
.dockerignore Normal file
View File

@@ -0,0 +1,4 @@
*.md
.gocache/
blobs/
data/

19
.env.example Normal file
View File

@@ -0,0 +1,19 @@
LOG_LEVEL=debug
LOG_FORMAT=text
DATA_PATH=data/
PORT=2600
AUDIT_LOG=true
ADDRESS=0.0.0.0
GC_INTERVAL=10
GC_ENABLED=true
MULTIPART_RETENTION_HOURS=24
AUTH_ENABLED=false
AUTH_REGION=us-east-1
AUTH_SKEW_SECONDS=300
AUTH_MAX_PRESIGN_SECONDS=86400
# When AUTH_ENABLED=true you MUST set AUTH_MASTER_KEY to a strong random value, e.g.:
# openssl rand -base64 32
AUTH_MASTER_KEY=REPLACE_WITH_SECURE_RANDOM_KEY
AUTH_BOOTSTRAP_ACCESS_KEY=
AUTH_BOOTSTRAP_SECRET_KEY=
AUTH_BOOTSTRAP_POLICY=

5
.gitignore vendored
View File

@@ -1,5 +1,8 @@
.env .env
*.db
.vscode/ .vscode/
blobs/ blobs/
*.db data/
.idea/ .idea/
.gocache/
.gomodcache/

18
Dockerfile Normal file
View File

@@ -0,0 +1,18 @@
FROM golang:1.25-alpine AS build
WORKDIR /app
COPY go.mod go.sum ./
RUN go mod download
COPY . .
RUN CGO_ENABLED=0 GOOS=linux go build -o /app/fs .
FROM alpine:3.23 AS runner
COPY --from=build /app/fs /app/fs
WORKDIR /app
EXPOSE 2600
HEALTHCHECK --interval=30s --timeout=3s --start-period=10s --retries=3 CMD wget -q -O /dev/null "http://127.0.0.1:${PORT:-2600}/healthz" || exit 1
CMD ["/app/fs"]

8
LICENSE.md Normal file
View File

@@ -0,0 +1,8 @@
Copyright 2025 ferdzo
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the “Software”), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

View File

@@ -1,3 +1,59 @@
# fs # fs
An experimental Object Storage written in Go that should be compatible with S3 An experimental Object Storage written in Go that should be partially compatible with S3
## Features
Bucket operations:
- `PUT /{bucket}`
- `HEAD /{bucket}`
- `DELETE /{bucket}`
- `GET /` (list buckets)
Object operations:
- `PUT /{bucket}/{key}`
- `GET /{bucket}/{key}`
- `HEAD /{bucket}/{key}`
- `DELETE /{bucket}/{key}`
- `GET /{bucket}?list-type=2&prefix=...` (ListObjectsV2-style)
Multipart upload:
- `POST /{bucket}/{key}?uploads` (initiate)
- `PUT /{bucket}/{key}?uploadId=...&partNumber=N` (upload part)
- `GET /{bucket}/{key}?uploadId=...` (list parts)
- `POST /{bucket}/{key}?uploadId=...` (complete)
- `DELETE /{bucket}/{key}?uploadId=...` (abort)
Multi-object delete:
- `POST /{bucket}?delete` with S3-style XML body
AWS SigV4 streaming payload decoding for uploads (`aws-chunked` request bodies)
Authentication:
- AWS SigV4 request verification (header and presigned URL forms)
- Local credential/policy store in bbolt
- Bootstrap access key/secret via environment variables
## Auth Setup
Required when `AUTH_ENABLED=true`:
- `AUTH_MASTER_KEY` must be base64 that decodes to exactly 32 bytes (an AES-256 key), e.g. `openssl rand -base64 32`
- `AUTH_BOOTSTRAP_ACCESS_KEY` and `AUTH_BOOTSTRAP_SECRET_KEY` define initial credentials
Reference: `auth/README.md`
Health:
- `GET /healthz`
- `HEAD /healthz`
- `GET /metrics` (Prometheus exposition format)
- `HEAD /metrics`
## Limitations
- Not full S3 API coverage.
- No versioning or lifecycle policies.
- Error and edge-case behavior is still being refined for client compatibility.
## License
MIT License

1216
api/api.go

File diff suppressed because it is too large Load Diff

242
api/s3_errors.go Normal file
View File

@@ -0,0 +1,242 @@
package api
import (
"encoding/xml"
"errors"
"fs/auth"
"fs/metadata"
"fs/metrics"
"fs/models"
"fs/service"
"net/http"
"github.com/go-chi/chi/v5/middleware"
)
// s3APIError describes an S3-style API error: the HTTP status to send, the
// S3 error <Code>, and the human-readable <Message> placed in the XML body.
type s3APIError struct {
	Status  int    // HTTP status code written to the response
	Code    string // S3 error code string, e.g. "NoSuchBucket"
	Message string // human-readable description for the XML <Message> element
}
// Pre-built S3 API errors reused across handlers. Status codes, Code strings
// and messages mirror the AWS S3 error responses so standard S3 clients
// interpret failures correctly.
var (
	// Request path had no object key where one was required.
	s3ErrInvalidObjectKey = s3APIError{
		Status:  http.StatusBadRequest,
		Code:    "InvalidArgument",
		Message: "Object key is required.",
	}
	// Object key exceeds the maximum supported length.
	s3ErrKeyTooLong = s3APIError{
		Status:  http.StatusBadRequest,
		Code:    "KeyTooLongError",
		Message: "Your key is too long.",
	}
	// Request used a header/feature this server does not support.
	s3ErrNotImplemented = s3APIError{
		Status:  http.StatusNotImplemented,
		Code:    "NotImplemented",
		Message: "A header you provided implies functionality that is not implemented.",
	}
	// Multipart completion referenced an unknown part.
	s3ErrInvalidPart = s3APIError{
		Status:  http.StatusBadRequest,
		Code:    "InvalidPart",
		Message: "One or more of the specified parts could not be found.",
	}
	// Multipart completion listed parts out of ascending order.
	s3ErrInvalidPartOrder = s3APIError{
		Status:  http.StatusBadRequest,
		Code:    "InvalidPartOrder",
		Message: "The list of parts was not in ascending order.",
	}
	// Request XML body failed to parse or validate.
	s3ErrMalformedXML = s3APIError{
		Status:  http.StatusBadRequest,
		Code:    "MalformedXML",
		Message: "The XML you provided was not well-formed or did not validate against our published schema.",
	}
	// Generic bad-argument error.
	s3ErrInvalidArgument = s3APIError{
		Status:  http.StatusBadRequest,
		Code:    "InvalidArgument",
		Message: "Invalid argument.",
	}
	// Range header not satisfiable for the object.
	s3ErrInvalidRange = s3APIError{
		Status:  http.StatusRequestedRangeNotSatisfiable,
		Code:    "InvalidRange",
		Message: "The requested range is not satisfiable.",
	}
	// Conditional request (If-Match and friends) failed.
	s3ErrPreconditionFailed = s3APIError{
		Status:  http.StatusPreconditionFailed,
		Code:    "PreconditionFailed",
		Message: "At least one of the pre-conditions you specified did not hold.",
	}
	// Upload below the minimum allowed size (mapped from service.ErrEntityTooSmall).
	s3ErrEntityTooSmall = s3APIError{
		Status:  http.StatusBadRequest,
		Code:    "EntityTooSmall",
		Message: "Your proposed upload is smaller than the minimum allowed object size.",
	}
	// Upload above the maximum allowed size.
	s3ErrEntityTooLarge = s3APIError{
		Status:  http.StatusRequestEntityTooLarge,
		Code:    "EntityTooLarge",
		Message: "Your proposed upload exceeds the maximum allowed size.",
	}
	// Multi-object delete exceeded the 1000-key limit. S3 reports this
	// as MalformedXML rather than a dedicated code.
	s3ErrTooManyDeleteObjects = s3APIError{
		Status:  http.StatusBadRequest,
		Code:    "MalformedXML",
		Message: "The request must contain no more than 1000 object identifiers.",
	}
	// Policy denied the request, or credentials were missing/disabled.
	s3ErrAccessDenied = s3APIError{
		Status:  http.StatusForbidden,
		Code:    "AccessDenied",
		Message: "Access Denied.",
	}
	// Unknown access key ID.
	s3ErrInvalidAccessKeyID = s3APIError{
		Status:  http.StatusForbidden,
		Code:    "InvalidAccessKeyId",
		Message: "The AWS Access Key Id you provided does not exist in our records.",
	}
	// SigV4 signature verification failed.
	s3ErrSignatureDoesNotMatch = s3APIError{
		Status:  http.StatusForbidden,
		Code:    "SignatureDoesNotMatch",
		Message: "The request signature we calculated does not match the signature you provided.",
	}
	// Authorization header present but structurally invalid.
	s3ErrAuthorizationHeaderMalformed = s3APIError{
		Status:  http.StatusBadRequest,
		Code:    "AuthorizationHeaderMalformed",
		Message: "The authorization header is malformed; the region/service/date is wrong or missing.",
	}
	// Client clock too far from the server clock.
	s3ErrRequestTimeTooSkewed = s3APIError{
		Status:  http.StatusForbidden,
		Code:    "RequestTimeTooSkewed",
		Message: "The difference between the request time and the server's time is too large.",
	}
	// Presigned request past its expiry.
	s3ErrExpiredToken = s3APIError{
		Status:  http.StatusBadRequest,
		Code:    "ExpiredToken",
		Message: "The provided token has expired.",
	}
	// Presigned-URL query parameters could not be parsed.
	s3ErrInvalidPresign = s3APIError{
		Status:  http.StatusBadRequest,
		Code:    "AuthorizationQueryParametersError",
		Message: "Error parsing the X-Amz-Credential parameter.",
	}
	// Catch-all for unexpected server-side failures.
	s3ErrInternal = s3APIError{
		Status:  http.StatusInternalServerError,
		Code:    "InternalError",
		Message: "We encountered an internal error. Please try again.",
	}
)
// mapToS3Error translates a domain error from the metadata, service, or auth
// layers into the matching S3 API error. The table is scanned in order and
// the first errors.Is match wins; unrecognized errors map to InternalError.
func mapToS3Error(err error) s3APIError {
	mappings := []struct {
		sentinel error
		out      s3APIError
	}{
		{metadata.ErrInvalidBucketName, s3APIError{
			Status:  http.StatusBadRequest,
			Code:    "InvalidBucketName",
			Message: "The specified bucket is not valid.",
		}},
		{metadata.ErrBucketAlreadyExists, s3APIError{
			Status:  http.StatusConflict,
			Code:    "BucketAlreadyOwnedByYou",
			Message: "Your previous request to create the named bucket succeeded and you already own it.",
		}},
		{metadata.ErrBucketNotFound, s3APIError{
			Status:  http.StatusNotFound,
			Code:    "NoSuchBucket",
			Message: "The specified bucket does not exist.",
		}},
		{metadata.ErrBucketNotEmpty, s3APIError{
			Status:  http.StatusConflict,
			Code:    "BucketNotEmpty",
			Message: "The bucket you tried to delete is not empty.",
		}},
		{metadata.ErrObjectNotFound, s3APIError{
			Status:  http.StatusNotFound,
			Code:    "NoSuchKey",
			Message: "The specified key does not exist.",
		}},
		{metadata.ErrMultipartNotFound, s3APIError{
			Status:  http.StatusNotFound,
			Code:    "NoSuchUpload",
			Message: "The specified multipart upload does not exist.",
		}},
		{metadata.ErrMultipartNotPending, s3APIError{
			Status:  http.StatusBadRequest,
			Code:    "InvalidRequest",
			Message: "The multipart upload is not in a valid state for this operation.",
		}},
		{service.ErrInvalidPart, s3ErrInvalidPart},
		{service.ErrInvalidPartOrder, s3ErrInvalidPartOrder},
		{service.ErrInvalidCompleteRequest, s3ErrMalformedXML},
		{service.ErrEntityTooSmall, s3ErrEntityTooSmall},
		{auth.ErrAccessDenied, s3ErrAccessDenied},
		{auth.ErrInvalidAccessKeyID, s3ErrInvalidAccessKeyID},
		{auth.ErrSignatureDoesNotMatch, s3ErrSignatureDoesNotMatch},
		{auth.ErrAuthorizationHeaderMalformed, s3ErrAuthorizationHeaderMalformed},
		{auth.ErrRequestTimeTooSkewed, s3ErrRequestTimeTooSkewed},
		{auth.ErrExpiredToken, s3ErrExpiredToken},
		{auth.ErrCredentialDisabled, s3ErrAccessDenied},
		{auth.ErrNoAuthCredentials, s3ErrAccessDenied},
		{auth.ErrUnsupportedAuthScheme, s3ErrAuthorizationHeaderMalformed},
		{auth.ErrInvalidPresign, s3ErrInvalidPresign},
	}
	for _, m := range mappings {
		if errors.Is(err, m.sentinel) {
			return m.out
		}
	}
	return s3ErrInternal
}
// writeS3Error emits an S3-compatible XML error response. It records the
// error in metrics, echoes the chi request ID (when present) in the
// x-amz-request-id header, and suppresses the XML body for HEAD requests.
// A nil *http.Request is tolerated; the operation is then counted as "other".
func writeS3Error(w http.ResponseWriter, r *http.Request, apiErr s3APIError, resource string) {
	var reqID string
	operation := "other"
	if r != nil {
		reqID = middleware.GetReqID(r.Context())
		deletePost := false
		if r.Method == http.MethodPost {
			_, deletePost = r.URL.Query()["delete"]
		}
		operation = metrics.NormalizeHTTPOperation(r.Method, deletePost)
		if reqID != "" {
			w.Header().Set("x-amz-request-id", reqID)
		}
	}
	metrics.Default.ObserveError(operation, apiErr.Code)
	// Headers must be complete before the status line goes out.
	w.Header().Set("Content-Type", "application/xml; charset=utf-8")
	w.WriteHeader(apiErr.Status)
	// HEAD responses carry status and headers only.
	if r != nil && r.Method == http.MethodHead {
		return
	}
	body := models.S3ErrorResponse{
		Code:      apiErr.Code,
		Message:   apiErr.Message,
		Resource:  resource,
		RequestID: reqID,
	}
	encoded, err := xml.MarshalIndent(body, "", " ")
	if err != nil {
		// Status already written; nothing sensible left to send.
		return
	}
	_, _ = w.Write([]byte(xml.Header))
	_, _ = w.Write(encoded)
}
// writeMappedS3Error translates a domain error into its S3 equivalent and
// writes it to the response, using the request path as the error resource.
// NOTE(review): r must be non-nil here — r.URL is dereferenced unconditionally.
func writeMappedS3Error(w http.ResponseWriter, r *http.Request, err error) {
	writeS3Error(w, r, mapToS3Error(err), r.URL.Path)
}

150
auth/README.md Normal file
View File

@@ -0,0 +1,150 @@
# Authentication Design
This folder implements S3-compatible request authentication using AWS Signature Version 4 (SigV4), with local identity and policy data stored in bbolt.
## Goals
- Keep S3 client compatibility for request signing.
- Avoid external auth databases.
- Store secrets encrypted at rest (not plaintext in bbolt).
- Keep authorization simple and explicit.
## High-Level Architecture
- `auth/middleware.go`
- HTTP middleware that enforces auth before API handlers.
- Exempts `/healthz`.
- Calls auth service and writes mapped S3 XML errors on failure.
- `auth/service.go`
- Main auth orchestration:
- parse SigV4 from request
- validate timestamp/scope/service/region
- load identity from metadata
- decrypt secret
- verify signature
- evaluate policy against requested S3 action
- `auth/sigv4.go`
- Canonical SigV4 parsing and verification helpers.
- Supports header auth and presigned query auth.
- `auth/policy.go`
- Authorization evaluator (deny overrides allow).
- `auth/action.go`
- Maps HTTP method/path/query to logical S3 action + resource target.
- `auth/crypto.go`
- AES-256-GCM encryption/decryption for stored secret keys.
- `auth/context.go`
- Carries authentication result in request context for downstream logic.
- `auth/config.go`
- Normalized auth configuration.
- `auth/errors.go`
- Domain auth errors used by API S3 error mapping.
## Config Model
Auth is configured through env (read in `utils/config.go`, converted in `auth/config.go`):
- `AUTH_ENABLED`
- `AUTH_REGION`
- `AUTH_SKEW_SECONDS`
- `AUTH_MAX_PRESIGN_SECONDS`
- `AUTH_MASTER_KEY`
- `AUTH_BOOTSTRAP_ACCESS_KEY`
- `AUTH_BOOTSTRAP_SECRET_KEY`
- `AUTH_BOOTSTRAP_POLICY` (optional JSON)
Important:
- If `AUTH_ENABLED=true`, `AUTH_MASTER_KEY` is required.
- `AUTH_MASTER_KEY` must be base64 that decodes to exactly 32 bytes (AES-256 key).
## Persistence Model (bbolt)
Implemented in metadata layer:
- `__AUTH_IDENTITIES__` bucket stores `models.AuthIdentity`
- `access_key_id`
- encrypted secret (`secret_enc`, `secret_nonce`)
- status (`active`/disabled)
- timestamps
- `__AUTH_POLICIES__` bucket stores `models.AuthPolicy`
- `principal`
- statements (`effect`, `actions`, `bucket`, `prefix`)
## Bootstrap Identity
On startup (`main.go`):
1. Build auth config.
2. Create auth service with metadata store.
3. Call `EnsureBootstrap()`.
If bootstrap env key/secret are set:
- identity is created/updated
- secret is encrypted with AES-GCM and stored
- policy is created:
- default: full access (`s3:*`, `bucket=*`, `prefix=*`)
- or overridden by `AUTH_BOOTSTRAP_POLICY`
## Request Authentication Flow
For each non-health request:
1. Parse SigV4 input (header or presigned query).
2. Validate structural fields:
- algorithm
- credential scope
- service must be `s3`
- region must match config
3. Validate time:
- `x-amz-date` format
- skew within `AUTH_SKEW_SECONDS`
- presigned expiry within `AUTH_MAX_PRESIGN_SECONDS`
4. Load identity by access key id.
5. Ensure identity status is active.
6. Decrypt stored secret using master key.
7. Recompute canonical request and expected signature.
8. Compare signatures.
9. Resolve target action from request.
10. Evaluate policy; deny overrides allow.
11. Store auth result in request context and continue.
## Authorization Semantics
Policy evaluator rules:
- No matching allow => denied.
- Any matching deny => denied (even if allow also matches).
- Wildcards supported:
- action: `*` or `s3:*`
- bucket: `*`
- prefix: `*`
Action resolution includes:
- bucket APIs (`CreateBucket`, `ListBucket`, `HeadBucket`, `DeleteBucket`)
- object APIs (`GetObject`, `PutObject`, `DeleteObject`)
- multipart APIs (`CreateMultipartUpload`, `UploadPart`, `ListMultipartUploadParts`, `CompleteMultipartUpload`, `AbortMultipartUpload`)
## Error Behavior
Auth errors are mapped to S3-style XML errors in `api/s3_errors.go`, including:
- `AccessDenied`
- `InvalidAccessKeyId`
- `SignatureDoesNotMatch`
- `AuthorizationHeaderMalformed`
- `RequestTimeTooSkewed`
- `ExpiredToken`
- `AuthorizationQueryParametersError`
## Audit Logging
When `AUDIT_LOG=true` and auth is enabled:
- successful auth attempts emit `auth_success`
- failed auth attempts emit `auth_failed`
Each audit entry includes method, path, remote IP, and request ID (if present). Success logs also include access key ID and auth type.
## Security Notes
- Secret keys are recoverable by server design (required for SigV4 verification).
- They are encrypted at rest, not hashed.
- Master key rotation is not implemented yet.
- Keep `AUTH_MASTER_KEY` protected (secret manager/systemd env file/etc.).
## Current Scope / Limitations
- No STS/session-token auth yet.
- No admin API for managing multiple users yet.
- Policy language is intentionally minimal, not full IAM.
- No automatic key rotation workflows.
## Practical Next Step
To support multiple users cleanly, add admin operations in auth service + API:
- create user
- rotate secret
- set policy
- disable/enable
- delete user

93
auth/action.go Normal file
View File

@@ -0,0 +1,93 @@
package auth
import (
"net/http"
"strings"
)
type Action string
const (
ActionListAllMyBuckets Action = "s3:ListAllMyBuckets"
ActionCreateBucket Action = "s3:CreateBucket"
ActionHeadBucket Action = "s3:HeadBucket"
ActionDeleteBucket Action = "s3:DeleteBucket"
ActionListBucket Action = "s3:ListBucket"
ActionGetObject Action = "s3:GetObject"
ActionPutObject Action = "s3:PutObject"
ActionDeleteObject Action = "s3:DeleteObject"
ActionCreateMultipartUpload Action = "s3:CreateMultipartUpload"
ActionUploadPart Action = "s3:UploadPart"
ActionListMultipartParts Action = "s3:ListMultipartUploadParts"
ActionCompleteMultipart Action = "s3:CompleteMultipartUpload"
ActionAbortMultipartUpload Action = "s3:AbortMultipartUpload"
)
type RequestTarget struct {
Action Action
Bucket string
Key string
}
func resolveTarget(r *http.Request) RequestTarget {
path := strings.TrimPrefix(r.URL.Path, "/")
if path == "" {
return RequestTarget{Action: ActionListAllMyBuckets}
}
parts := strings.SplitN(path, "/", 2)
bucket := parts[0]
key := ""
if len(parts) > 1 {
key = parts[1]
}
if key == "" {
switch r.Method {
case http.MethodPut:
return RequestTarget{Action: ActionCreateBucket, Bucket: bucket}
case http.MethodHead:
return RequestTarget{Action: ActionHeadBucket, Bucket: bucket}
case http.MethodDelete:
return RequestTarget{Action: ActionDeleteBucket, Bucket: bucket}
case http.MethodGet:
return RequestTarget{Action: ActionListBucket, Bucket: bucket}
case http.MethodPost:
if _, ok := r.URL.Query()["delete"]; ok {
return RequestTarget{Action: ActionDeleteObject, Bucket: bucket}
}
}
return RequestTarget{Bucket: bucket}
}
uploadID := r.URL.Query().Get("uploadId")
partNumber := r.URL.Query().Get("partNumber")
if _, ok := r.URL.Query()["uploads"]; ok && r.Method == http.MethodPost {
return RequestTarget{Action: ActionCreateMultipartUpload, Bucket: bucket, Key: key}
}
if uploadID != "" {
switch r.Method {
case http.MethodPut:
if partNumber != "" {
return RequestTarget{Action: ActionUploadPart, Bucket: bucket, Key: key}
}
case http.MethodGet:
return RequestTarget{Action: ActionListMultipartParts, Bucket: bucket, Key: key}
case http.MethodPost:
return RequestTarget{Action: ActionCompleteMultipart, Bucket: bucket, Key: key}
case http.MethodDelete:
return RequestTarget{Action: ActionAbortMultipartUpload, Bucket: bucket, Key: key}
}
}
switch r.Method {
case http.MethodGet, http.MethodHead:
return RequestTarget{Action: ActionGetObject, Bucket: bucket, Key: key}
case http.MethodPut:
return RequestTarget{Action: ActionPutObject, Bucket: bucket, Key: key}
case http.MethodDelete:
return RequestTarget{Action: ActionDeleteObject, Bucket: bucket, Key: key}
}
return RequestTarget{Bucket: bucket, Key: key}
}

50
auth/config.go Normal file
View File

@@ -0,0 +1,50 @@
package auth
import (
"strings"
"time"
)
// Config holds normalized authentication settings.
type Config struct {
	Enabled            bool          // master switch for SigV4 auth
	Region             string        // expected SigV4 region (default "us-east-1")
	ClockSkew          time.Duration // allowed request-time skew (default 5m)
	MaxPresignDuration time.Duration // max presigned URL lifetime (default 24h)
	MasterKey          string        // base64 AES-256 key for secret storage
	BootstrapAccessKey string        // initial credential access key
	BootstrapSecretKey string        // initial credential secret key
	BootstrapPolicy    string        // optional policy JSON for the bootstrap user
}

// ConfigFromValues normalizes raw configuration values into a Config.
// String fields are whitespace-trimmed; a blank region and non-positive
// durations fall back to defaults (us-east-1, 5 minute skew, 24 hour presign).
func ConfigFromValues(
	enabled bool,
	region string,
	skew time.Duration,
	maxPresign time.Duration,
	masterKey string,
	bootstrapAccessKey string,
	bootstrapSecretKey string,
	bootstrapPolicy string,
) Config {
	cfg := Config{
		Enabled:            enabled,
		Region:             strings.TrimSpace(region),
		ClockSkew:          skew,
		MaxPresignDuration: maxPresign,
		MasterKey:          strings.TrimSpace(masterKey),
		BootstrapAccessKey: strings.TrimSpace(bootstrapAccessKey),
		BootstrapSecretKey: strings.TrimSpace(bootstrapSecretKey),
		BootstrapPolicy:    strings.TrimSpace(bootstrapPolicy),
	}
	if cfg.Region == "" {
		cfg.Region = "us-east-1"
	}
	if cfg.ClockSkew <= 0 {
		cfg.ClockSkew = 5 * time.Minute
	}
	if cfg.MaxPresignDuration <= 0 {
		cfg.MaxPresignDuration = 24 * time.Hour
	}
	return cfg
}

23
auth/context.go Normal file
View File

@@ -0,0 +1,23 @@
package auth
import "context"
type RequestContext struct {
Authenticated bool
AccessKeyID string
AuthType string
}
type contextKey int
const requestContextKey contextKey = iota
func WithRequestContext(ctx context.Context, authCtx RequestContext) context.Context {
return context.WithValue(ctx, requestContextKey, authCtx)
}
func GetRequestContext(ctx context.Context) (RequestContext, bool) {
value := ctx.Value(requestContextKey)
authCtx, ok := value.(RequestContext)
return authCtx, ok
}

74
auth/crypto.go Normal file
View File

@@ -0,0 +1,74 @@
package auth
import (
"crypto/aes"
"crypto/cipher"
"crypto/rand"
"encoding/base64"
"fmt"
"io"
)
const (
	masterKeyLength = 32 // AES-256 key size in bytes
	gcmNonceLength  = 12 // standard GCM nonce size in bytes
)

// aeadFor builds the AES-256-GCM AEAD used to protect stored secret keys.
func aeadFor(masterKey []byte) (cipher.AEAD, error) {
	blockCipher, err := aes.NewCipher(masterKey)
	if err != nil {
		return nil, err
	}
	return cipher.NewGCM(blockCipher)
}

// decodeMasterKey base64-decodes the configured master key and verifies it
// is exactly 32 bytes. Failures wrap ErrInvalidMasterKey.
func decodeMasterKey(raw string) ([]byte, error) {
	key, err := base64.StdEncoding.DecodeString(raw)
	switch {
	case err != nil:
		return nil, fmt.Errorf("%w: %v", ErrInvalidMasterKey, err)
	case len(key) != masterKeyLength:
		return nil, fmt.Errorf("%w: expected %d-byte decoded key", ErrInvalidMasterKey, masterKeyLength)
	}
	return key, nil
}

// encryptSecret seals secret with AES-256-GCM under masterKey, binding the
// ciphertext to accessKeyID as additional authenticated data. It returns the
// base64 ciphertext and the base64 random nonce.
func encryptSecret(masterKey []byte, accessKeyID, secret string) (ciphertextB64 string, nonceB64 string, err error) {
	aead, err := aeadFor(masterKey)
	if err != nil {
		return "", "", err
	}
	nonce := make([]byte, gcmNonceLength)
	if _, err := io.ReadFull(rand.Reader, nonce); err != nil {
		return "", "", err
	}
	sealed := aead.Seal(nil, nonce, []byte(secret), []byte(accessKeyID))
	return base64.StdEncoding.EncodeToString(sealed), base64.StdEncoding.EncodeToString(nonce), nil
}

// decryptSecret reverses encryptSecret. Decryption fails if the ciphertext,
// nonce, key, or bound accessKeyID do not match what was sealed.
func decryptSecret(masterKey []byte, accessKeyID, ciphertextB64, nonceB64 string) (string, error) {
	aead, err := aeadFor(masterKey)
	if err != nil {
		return "", err
	}
	sealed, err := base64.StdEncoding.DecodeString(ciphertextB64)
	if err != nil {
		return "", err
	}
	nonce, err := base64.StdEncoding.DecodeString(nonceB64)
	if err != nil {
		return "", err
	}
	if len(nonce) != gcmNonceLength {
		return "", fmt.Errorf("invalid nonce length: %d", len(nonce))
	}
	plain, err := aead.Open(nil, nonce, sealed, []byte(accessKeyID))
	if err != nil {
		return "", err
	}
	return string(plain), nil
}

19
auth/errors.go Normal file
View File

@@ -0,0 +1,19 @@
package auth
import "errors"
// Sentinel errors for the auth domain. The API layer maps each of these to
// an S3-style XML error (see api/s3_errors.go); the middleware maps them to
// metric error classes.
var (
	ErrAccessDenied                 = errors.New("access denied")                   // policy denied the request
	ErrInvalidAccessKeyID           = errors.New("invalid access key id")           // unknown access key
	ErrSignatureDoesNotMatch        = errors.New("signature does not match")        // SigV4 verification failed
	ErrAuthorizationHeaderMalformed = errors.New("authorization header malformed")  // header present but unparsable
	ErrRequestTimeTooSkewed         = errors.New("request time too skewed")         // outside the configured clock skew
	ErrExpiredToken                 = errors.New("expired token")                   // presigned request past expiry
	ErrCredentialDisabled           = errors.New("credential disabled")             // identity exists but is not active
	ErrAuthNotEnabled               = errors.New("authentication is not enabled")   // operation requires auth to be on
	ErrMasterKeyRequired            = errors.New("auth master key is required")     // AUTH_ENABLED without AUTH_MASTER_KEY
	ErrInvalidMasterKey             = errors.New("invalid auth master key")         // not base64 / wrong decoded length
	ErrNoAuthCredentials            = errors.New("no auth credentials found")       // request carried no auth at all
	ErrUnsupportedAuthScheme        = errors.New("unsupported auth scheme")         // non-SigV4 Authorization header
	ErrInvalidPresign               = errors.New("invalid presigned request")       // bad presign query parameters
)

111
auth/middleware.go Normal file
View File

@@ -0,0 +1,111 @@
package auth
import (
"errors"
"fs/metrics"
"log/slog"
"net"
"net/http"
"github.com/go-chi/chi/v5/middleware"
)
// Middleware returns HTTP middleware that authenticates every request with
// the given auth service before handing it to the next handler. Behavior:
//   - svc nil or auth disabled: requests pass through tagged unauthenticated;
//   - /healthz: always public, even when auth is enabled;
//   - otherwise the request is verified via svc.AuthenticateRequest; on
//     failure onError writes the response (falling back to a plain 403).
//
// When auditEnabled is true and logger is non-nil, every auth success and
// failure is logged with method, path, client IP and request ID. Metrics are
// recorded for every outcome.
func Middleware(
	svc *Service,
	logger *slog.Logger,
	auditEnabled bool,
	onError func(http.ResponseWriter, *http.Request, error),
) func(http.Handler) http.Handler {
	return func(next http.Handler) http.Handler {
		return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
			anonCtx := RequestContext{Authenticated: false, AuthType: "none"}
			// Auth disabled (or no service configured): pass through.
			if svc == nil || !svc.Config().Enabled {
				metrics.Default.ObserveAuth("bypass", "disabled", "auth_disabled")
				next.ServeHTTP(w, r.WithContext(WithRequestContext(r.Context(), anonCtx)))
				return
			}
			// Health endpoint stays public even with auth enabled.
			if r.URL.Path == "/healthz" {
				metrics.Default.ObserveAuth("bypass", "none", "public_endpoint")
				next.ServeHTTP(w, r.WithContext(WithRequestContext(r.Context(), anonCtx)))
				return
			}
			resolvedCtx, err := svc.AuthenticateRequest(r)
			if err != nil {
				metrics.Default.ObserveAuth("error", "sigv4", authErrorClass(err))
				if auditEnabled && logger != nil {
					logger.Warn("auth_failed", auditAttrs(r, "error", err.Error())...)
				}
				if onError != nil {
					onError(w, r, err)
					return
				}
				// No error writer provided: plain-text 403 fallback.
				http.Error(w, http.StatusText(http.StatusForbidden), http.StatusForbidden)
				return
			}
			metrics.Default.ObserveAuth("ok", resolvedCtx.AuthType, "none")
			if auditEnabled && logger != nil {
				logger.Info("auth_success", auditAttrs(r,
					"access_key_id", resolvedCtx.AccessKeyID,
					"auth_type", resolvedCtx.AuthType,
				)...)
			}
			next.ServeHTTP(w, r.WithContext(WithRequestContext(r.Context(), resolvedCtx)))
		})
	}
}

// auditAttrs builds the shared audit-log attribute list (method, path,
// remote IP), appends any event-specific pairs, and finally appends the chi
// request ID when present. Extracted to avoid duplicating the assembly in
// the success and failure paths.
func auditAttrs(r *http.Request, extra ...any) []any {
	attrs := []any{
		"method", r.Method,
		"path", r.URL.Path,
		"remote_ip", clientIP(r.RemoteAddr),
	}
	attrs = append(attrs, extra...)
	if requestID := middleware.GetReqID(r.Context()); requestID != "" {
		attrs = append(attrs, "request_id", requestID)
	}
	return attrs
}
// authErrorClass converts an auth failure into a low-cardinality label for
// metrics. The table is checked in order with errors.Is; anything
// unrecognized is bucketed as "other".
func authErrorClass(err error) string {
	classes := []struct {
		sentinel error
		label    string
	}{
		{ErrInvalidAccessKeyID, "invalid_access_key"},
		{ErrSignatureDoesNotMatch, "signature_mismatch"},
		{ErrAuthorizationHeaderMalformed, "auth_header_malformed"},
		{ErrRequestTimeTooSkewed, "time_skew"},
		{ErrExpiredToken, "expired_token"},
		{ErrNoAuthCredentials, "missing_credentials"},
		{ErrUnsupportedAuthScheme, "unsupported_auth_scheme"},
		{ErrInvalidPresign, "invalid_presign"},
		{ErrCredentialDisabled, "credential_disabled"},
		{ErrAccessDenied, "access_denied"},
	}
	for _, c := range classes {
		if errors.Is(err, c.sentinel) {
			return c.label
		}
	}
	return "other"
}
// clientIP strips the port from a "host:port" remote address; when the
// address has no port (or cannot be split), it is returned unchanged.
func clientIP(remoteAddr string) string {
	if host, _, err := net.SplitHostPort(remoteAddr); err == nil && host != "" {
		return host
	}
	return remoteAddr
}

66
auth/policy.go Normal file
View File

@@ -0,0 +1,66 @@
package auth
import (
"fs/models"
"strings"
)
// isAllowed evaluates policy against target. Semantics: any matching deny
// statement denies immediately (deny overrides allow); otherwise at least
// one matching allow statement is required. A nil policy denies everything.
func isAllowed(policy *models.AuthPolicy, target RequestTarget) bool {
	if policy == nil {
		return false
	}
	granted := false
	for _, stmt := range policy.Statements {
		if !statementMatches(stmt, target) {
			continue
		}
		switch strings.ToLower(strings.TrimSpace(stmt.Effect)) {
		case "deny":
			// Deny always wins, regardless of other statements.
			return false
		case "allow":
			granted = true
		}
	}
	return granted
}
// statementMatches reports whether stmt applies to target: the action and
// bucket must both match, and for object-level targets the statement's
// prefix (empty or "*" meaning any) must prefix the key. Bucket-level
// targets (empty key) ignore the prefix entirely.
func statementMatches(stmt models.AuthPolicyStatement, target RequestTarget) bool {
	switch {
	case !actionMatches(stmt.Actions, target.Action):
		return false
	case !bucketMatches(stmt.Bucket, target.Bucket):
		return false
	case target.Key == "":
		return true
	}
	p := strings.TrimSpace(stmt.Prefix)
	return p == "" || p == "*" || strings.HasPrefix(target.Key, p)
}
// actionMatches reports whether any entry in actions covers action.
// "*" and "s3:*" act as wildcards; otherwise the comparison is
// case-insensitive. An empty list matches nothing.
func actionMatches(actions []string, action Action) bool {
	want := string(action)
	for _, candidate := range actions {
		switch c := strings.TrimSpace(candidate); {
		case c == "*", c == "s3:*":
			return true
		case strings.EqualFold(c, want):
			return true
		}
	}
	return false
}
// bucketMatches reports whether pattern covers bucket: a blank or "*"
// pattern matches any bucket, anything else must match exactly
// (after trimming surrounding whitespace from the pattern).
func bucketMatches(pattern, bucket string) bool {
	switch p := strings.TrimSpace(pattern); p {
	case "", "*":
		return true
	default:
		return p == bucket
	}
}

186
auth/service.go Normal file
View File

@@ -0,0 +1,186 @@
package auth
import (
"encoding/json"
"errors"
"fmt"
"fs/models"
"net/http"
"strings"
"time"
)
// Store is the persistence interface the auth service depends on: lookup
// and storage of identities (credentials) and their policies, keyed by
// access key ID. The metadata layer provides the bbolt-backed implementation.
type Store interface {
	// GetAuthIdentity returns the identity for the given access key ID.
	GetAuthIdentity(accessKeyID string) (*models.AuthIdentity, error)
	// PutAuthIdentity creates or replaces an identity record.
	PutAuthIdentity(identity *models.AuthIdentity) error
	// GetAuthPolicy returns the policy for the given principal access key ID.
	GetAuthPolicy(accessKeyID string) (*models.AuthPolicy, error)
	// PutAuthPolicy creates or replaces a policy record.
	PutAuthPolicy(policy *models.AuthPolicy) error
}
// Service orchestrates request authentication: it verifies SigV4 signatures,
// loads identities from the store, decrypts stored secrets, and evaluates
// authorization policies.
type Service struct {
	cfg       Config
	store     Store
	masterKey []byte           // decoded AES-256 key; only set when auth is enabled
	now       func() time.Time // clock source (set to UTC time.Now in NewService)
}
// NewService builds an auth Service backed by store. A store is always
// required. When auth is enabled, the configured master key is validated
// and decoded up front so an invalid key fails at startup rather than on
// the first request.
func NewService(cfg Config, store Store) (*Service, error) {
	if store == nil {
		return nil, errors.New("auth store is required")
	}
	svc := &Service{
		cfg:   cfg,
		store: store,
		now:   func() time.Time { return time.Now().UTC() },
	}
	if !cfg.Enabled {
		// Disabled mode never touches the master key.
		return svc, nil
	}
	if strings.TrimSpace(cfg.MasterKey) == "" {
		return nil, ErrMasterKeyRequired
	}
	key, err := decodeMasterKey(cfg.MasterKey)
	if err != nil {
		return nil, err
	}
	svc.masterKey = key
	return svc, nil
}
// Config returns the configuration the service was created with.
func (s *Service) Config() Config {
	return s.cfg
}
// EnsureBootstrap seeds an initial credential and policy from configuration.
// It is a no-op when auth is disabled or when either bootstrap credential is
// missing. The secret is encrypted under the master key before storage; a
// pre-existing identity keeps its original CreatedAt. The stored policy comes
// from cfg.BootstrapPolicy when set, otherwise a full-access default.
func (s *Service) EnsureBootstrap() error {
	if !s.cfg.Enabled {
		return nil
	}
	accessKey := strings.TrimSpace(s.cfg.BootstrapAccessKey)
	secret := strings.TrimSpace(s.cfg.BootstrapSecretKey)
	if accessKey == "" || secret == "" {
		// No bootstrap credentials configured: nothing to seed.
		return nil
	}
	if len(accessKey) < 3 {
		return errors.New("bootstrap access key must be at least 3 characters")
	}
	if len(secret) < 8 {
		return errors.New("bootstrap secret key must be at least 8 characters")
	}
	now := s.now().Unix()
	// Secrets are stored encrypted (AES-256-GCM), never in plaintext.
	ciphertext, nonce, err := encryptSecret(s.masterKey, accessKey, secret)
	if err != nil {
		return err
	}
	identity := &models.AuthIdentity{
		AccessKeyID: accessKey,
		SecretEnc:   ciphertext,
		SecretNonce: nonce,
		EncAlg:      "AES-256-GCM",
		KeyVersion:  "v1",
		Status:      "active",
		CreatedAt:   now,
		UpdatedAt:   now,
	}
	// Re-running bootstrap replaces the record but preserves creation time.
	if existing, err := s.store.GetAuthIdentity(accessKey); err == nil && existing != nil {
		identity.CreatedAt = existing.CreatedAt
	}
	if err := s.store.PutAuthIdentity(identity); err != nil {
		return err
	}
	policy := defaultBootstrapPolicy(accessKey)
	if strings.TrimSpace(s.cfg.BootstrapPolicy) != "" {
		parsed, err := parsePolicyJSON(s.cfg.BootstrapPolicy)
		if err != nil {
			return err
		}
		policy = parsed
		// Force the policy principal to the bootstrap key regardless of the
		// JSON document's own Principal field.
		policy.Principal = accessKey
	}
	return s.store.PutAuthPolicy(policy)
}
// AuthenticateRequest verifies the request's SigV4 signature (header or
// presigned form) and authorizes the resolved action against the caller's
// stored policy. With auth disabled it returns an unauthenticated context.
//
// Distinct internal failures are deliberately collapsed into generic auth
// errors (invalid key, bad signature, access denied) to avoid leaking which
// step failed to a probing client.
func (s *Service) AuthenticateRequest(r *http.Request) (RequestContext, error) {
	if !s.cfg.Enabled {
		return RequestContext{Authenticated: false, AuthType: "disabled"}, nil
	}
	input, err := parseSigV4(r)
	if err != nil {
		return RequestContext{}, err
	}
	if err := validateSigV4Input(s.now(), s.cfg, input); err != nil {
		return RequestContext{}, err
	}
	identity, err := s.store.GetAuthIdentity(input.AccessKeyID)
	if err != nil || identity == nil {
		// Guard against Store implementations that return (nil, nil);
		// without the nil check the Status dereference below would panic.
		return RequestContext{}, ErrInvalidAccessKeyID
	}
	if !strings.EqualFold(identity.Status, "active") {
		return RequestContext{}, ErrCredentialDisabled
	}
	secret, err := decryptSecret(s.masterKey, identity.AccessKeyID, identity.SecretEnc, identity.SecretNonce)
	if err != nil {
		return RequestContext{}, ErrSignatureDoesNotMatch
	}
	ok, err := signatureMatches(secret, r, input)
	if err != nil {
		return RequestContext{}, err
	}
	if !ok {
		return RequestContext{}, ErrSignatureDoesNotMatch
	}
	policy, err := s.store.GetAuthPolicy(identity.AccessKeyID)
	if err != nil || policy == nil {
		// Same defensive nil guard as for the identity lookup.
		return RequestContext{}, ErrAccessDenied
	}
	target := resolveTarget(r)
	if target.Action == "" {
		// Request did not map to a known S3 action.
		return RequestContext{}, ErrAccessDenied
	}
	if !isAllowed(policy, target) {
		return RequestContext{}, ErrAccessDenied
	}
	authType := "sigv4-header"
	if input.Presigned {
		authType = "sigv4-presign"
	}
	return RequestContext{
		Authenticated: true,
		AccessKeyID:   identity.AccessKeyID,
		AuthType:      authType,
	}, nil
}
// parsePolicyJSON decodes a JSON-encoded AuthPolicy, rejecting documents
// with no statements since such a policy would silently deny everything.
func parsePolicyJSON(raw string) (*models.AuthPolicy, error) {
	var parsed models.AuthPolicy
	if err := json.Unmarshal([]byte(raw), &parsed); err != nil {
		return nil, fmt.Errorf("invalid bootstrap policy: %w", err)
	}
	if len(parsed.Statements) == 0 {
		return nil, errors.New("bootstrap policy must contain at least one statement")
	}
	return &parsed, nil
}
// defaultBootstrapPolicy grants the principal unrestricted access to every
// bucket and key; used when no explicit bootstrap policy is configured.
func defaultBootstrapPolicy(principal string) *models.AuthPolicy {
	allowAll := models.AuthPolicyStatement{
		Effect:  "allow",
		Actions: []string{"s3:*"},
		Bucket:  "*",
		Prefix:  "*",
	}
	return &models.AuthPolicy{
		Principal:  principal,
		Statements: []models.AuthPolicyStatement{allowAll},
	}
}

372
auth/sigv4.go Normal file
View File

@@ -0,0 +1,372 @@
package auth
import (
"crypto/hmac"
"crypto/sha256"
"encoding/hex"
"fmt"
"net/http"
"net/url"
"sort"
"strconv"
"strings"
"time"
)
const (
sigV4Algorithm = "AWS4-HMAC-SHA256"
)
// sigV4Input is the parsed, not-yet-verified content of a SigV4
// authentication attempt, extracted either from the Authorization header or
// from presigned query parameters.
type sigV4Input struct {
	AccessKeyID      string   // credential's access key ID
	Date             string   // YYYYMMDD portion of the credential scope
	Region           string   // region from the credential scope
	Service          string   // service from the credential scope (expected "s3")
	Scope            string   // full scope: date/region/service/aws4_request
	SignedHeaders    []string // lowercased header names covered by the signature
	SignedHeadersRaw string   // semicolon-joined signed-header list, lowercased
	SignatureHex     string   // claimed signature, lowercase hex
	AmzDate          string   // request timestamp in 20060102T150405Z form
	ExpiresSeconds   int      // presign validity window; set only when Presigned
	Presigned        bool     // true for query-string (presigned) auth
}
// parseSigV4 extracts SigV4 credentials from the request, preferring the
// presigned query form when X-Amz-Algorithm is present and falling back to
// the Authorization header otherwise.
func parseSigV4(r *http.Request) (*sigV4Input, error) {
	if r == nil {
		return nil, fmt.Errorf("%w: nil request", ErrAuthorizationHeaderMalformed)
	}
	usesPresign := strings.EqualFold(r.URL.Query().Get("X-Amz-Algorithm"), sigV4Algorithm)
	if usesPresign {
		return parsePresignedSigV4(r)
	}
	return parseHeaderSigV4(r)
}
// parseHeaderSigV4 extracts SigV4 fields from the Authorization header of
// the form "AWS4-HMAC-SHA256 Credential=..., SignedHeaders=..., Signature=...".
// The x-amz-date header is required. Nothing is cryptographically verified
// here; this only validates shape.
func parseHeaderSigV4(r *http.Request) (*sigV4Input, error) {
	header := strings.TrimSpace(r.Header.Get("Authorization"))
	if header == "" {
		return nil, ErrNoAuthCredentials
	}
	// Only the SigV4 algorithm token is accepted.
	if !strings.HasPrefix(header, sigV4Algorithm+" ") {
		return nil, fmt.Errorf("%w: unsupported authorization algorithm", ErrUnsupportedAuthScheme)
	}
	params := parseAuthorizationParams(strings.TrimSpace(strings.TrimPrefix(header, sigV4Algorithm)))
	credentialRaw := params["Credential"]
	signedHeadersRaw := params["SignedHeaders"]
	signatureHex := params["Signature"]
	if credentialRaw == "" || signedHeadersRaw == "" || signatureHex == "" {
		return nil, fmt.Errorf("%w: missing required authorization fields", ErrAuthorizationHeaderMalformed)
	}
	accessKeyID, date, region, service, scope, err := parseCredential(credentialRaw)
	if err != nil {
		return nil, err
	}
	amzDate := strings.TrimSpace(r.Header.Get("x-amz-date"))
	if amzDate == "" {
		return nil, fmt.Errorf("%w: x-amz-date is required", ErrAuthorizationHeaderMalformed)
	}
	signedHeaders := splitSignedHeaders(signedHeadersRaw)
	if len(signedHeaders) == 0 {
		return nil, fmt.Errorf("%w: signed headers are required", ErrAuthorizationHeaderMalformed)
	}
	return &sigV4Input{
		AccessKeyID:   accessKeyID,
		Date:          date,
		Region:        region,
		Service:       service,
		Scope:         scope,
		SignedHeaders: signedHeaders,
		// Normalized copies used later when rebuilding the canonical request.
		SignedHeadersRaw: strings.ToLower(strings.TrimSpace(signedHeadersRaw)),
		SignatureHex:     strings.ToLower(strings.TrimSpace(signatureHex)),
		AmzDate:          amzDate,
		Presigned:        false,
	}, nil
}
// parsePresignedSigV4 extracts SigV4 fields from the presigned query
// parameters (X-Amz-Algorithm, X-Amz-Credential, X-Amz-SignedHeaders,
// X-Amz-Signature, X-Amz-Date, X-Amz-Expires). Only shape is validated here;
// signature and expiry verification happen later.
func parsePresignedSigV4(r *http.Request) (*sigV4Input, error) {
	query := r.URL.Query()
	if !strings.EqualFold(query.Get("X-Amz-Algorithm"), sigV4Algorithm) {
		return nil, fmt.Errorf("%w: invalid X-Amz-Algorithm", ErrInvalidPresign)
	}
	credentialRaw := strings.TrimSpace(query.Get("X-Amz-Credential"))
	signedHeadersRaw := strings.TrimSpace(query.Get("X-Amz-SignedHeaders"))
	signatureHex := strings.TrimSpace(query.Get("X-Amz-Signature"))
	amzDate := strings.TrimSpace(query.Get("X-Amz-Date"))
	expiresRaw := strings.TrimSpace(query.Get("X-Amz-Expires"))
	if credentialRaw == "" || signedHeadersRaw == "" || signatureHex == "" || amzDate == "" || expiresRaw == "" {
		return nil, fmt.Errorf("%w: missing presigned query fields", ErrInvalidPresign)
	}
	// Expiry must be a non-negative integer number of seconds.
	expires, err := strconv.Atoi(expiresRaw)
	if err != nil || expires < 0 {
		return nil, fmt.Errorf("%w: invalid X-Amz-Expires", ErrInvalidPresign)
	}
	accessKeyID, date, region, service, scope, err := parseCredential(credentialRaw)
	if err != nil {
		return nil, err
	}
	signedHeaders := splitSignedHeaders(signedHeadersRaw)
	if len(signedHeaders) == 0 {
		return nil, fmt.Errorf("%w: signed headers are required", ErrInvalidPresign)
	}
	return &sigV4Input{
		AccessKeyID:      accessKeyID,
		Date:             date,
		Region:           region,
		Service:          service,
		Scope:            scope,
		SignedHeaders:    signedHeaders,
		SignedHeadersRaw: strings.ToLower(strings.TrimSpace(signedHeadersRaw)),
		SignatureHex:     strings.ToLower(signatureHex),
		AmzDate:          amzDate,
		ExpiresSeconds:   expires,
		Presigned:        true,
	}, nil
}
// parseCredential splits a credential string of the exact form
// "<accessKeyID>/<date>/<region>/<service>/aws4_request" into its components
// and returns the joined scope (everything after the access key ID).
func parseCredential(raw string) (accessKeyID string, date string, region string, service string, scope string, err error) {
	malformed := fmt.Errorf("%w: invalid credential scope", ErrAuthorizationHeaderMalformed)
	fields := strings.Split(strings.TrimSpace(raw), "/")
	if len(fields) != 5 {
		return "", "", "", "", "", malformed
	}
	accessKeyID = strings.TrimSpace(fields[0])
	date = strings.TrimSpace(fields[1])
	region = strings.TrimSpace(fields[2])
	service = strings.TrimSpace(fields[3])
	if accessKeyID == "" || date == "" || region == "" || service == "" || strings.TrimSpace(fields[4]) != "aws4_request" {
		return "", "", "", "", "", malformed
	}
	// Scope preserves the raw (untrimmed) tail exactly as the client sent it.
	scope = strings.Join(fields[1:], "/")
	return accessKeyID, date, region, service, scope, nil
}
// splitSignedHeaders lowercases and splits a semicolon-separated signed
// headers list, dropping empty entries. A blank input yields nil.
func splitSignedHeaders(raw string) []string {
	normalized := strings.ToLower(strings.TrimSpace(raw))
	if normalized == "" {
		return nil
	}
	pieces := strings.Split(normalized, ";")
	headers := make([]string, 0, len(pieces))
	for _, piece := range pieces {
		if piece = strings.TrimSpace(piece); piece != "" {
			headers = append(headers, piece)
		}
	}
	return headers
}
// parseAuthorizationParams parses the comma-separated "Key=Value" pairs that
// follow the algorithm token in a SigV4 Authorization header (Credential,
// SignedHeaders, Signature). Tokens without '=' are skipped. SigV4 field
// values never contain commas, so a plain comma split is sufficient.
//
// The previous `strings.TrimPrefix(raw, " ")` was dead code: TrimSpace has
// already removed any leading space.
func parseAuthorizationParams(raw string) map[string]string {
	params := make(map[string]string)
	for _, token := range strings.Split(strings.TrimSpace(raw), ",") {
		key, value, found := strings.Cut(strings.TrimSpace(token), "=")
		if !found {
			continue
		}
		params[strings.TrimSpace(key)] = strings.TrimSpace(value)
	}
	return params
}
// validateSigV4Input checks parsed signature metadata against server policy
// before any cryptographic work: service must be "s3", region must match the
// configured one, the timestamp must parse and lie within the allowed clock
// skew, and presigned requests must be inside their (capped) expiry window.
func validateSigV4Input(now time.Time, cfg Config, input *sigV4Input) error {
	if input == nil {
		return fmt.Errorf("%w: empty signature input", ErrAuthorizationHeaderMalformed)
	}
	if !strings.EqualFold(input.Service, "s3") {
		return fmt.Errorf("%w: unsupported service", ErrAuthorizationHeaderMalformed)
	}
	if !strings.EqualFold(input.Region, cfg.Region) {
		return fmt.Errorf("%w: region mismatch", ErrAuthorizationHeaderMalformed)
	}
	requestTime, err := time.Parse("20060102T150405Z", input.AmzDate)
	if err != nil {
		return fmt.Errorf("%w: invalid x-amz-date", ErrAuthorizationHeaderMalformed)
	}
	// Reject timestamps too far in either direction of the server clock.
	delta := now.Sub(requestTime)
	if delta > cfg.ClockSkew || delta < -cfg.ClockSkew {
		return ErrRequestTimeTooSkewed
	}
	if input.Presigned {
		// Cap the validity window, then check the URL has not expired.
		if input.ExpiresSeconds > int(cfg.MaxPresignDuration.Seconds()) {
			return fmt.Errorf("%w: presign expires too large", ErrInvalidPresign)
		}
		expiresAt := requestTime.Add(time.Duration(input.ExpiresSeconds) * time.Second)
		if now.After(expiresAt) {
			return ErrExpiredToken
		}
	}
	return nil
}
// signatureMatches recomputes the SigV4 signature for the request and
// compares it with the signature the client supplied.
func signatureMatches(secret string, r *http.Request, input *sigV4Input) (bool, error) {
	payloadHash := resolvePayloadHash(r, input.Presigned)
	canonicalRequest, err := buildCanonicalRequest(r, input.SignedHeaders, payloadHash, input.Presigned)
	if err != nil {
		return false, err
	}
	stringToSign := buildStringToSign(input.AmzDate, input.Scope, canonicalRequest)
	signingKey := deriveSigningKey(secret, input.Date, input.Region, input.Service)
	expectedSig := hex.EncodeToString(hmacSHA256(signingKey, stringToSign))
	// hmac.Equal gives a constant-time comparison of the hex strings.
	return hmac.Equal([]byte(expectedSig), []byte(input.SignatureHex)), nil
}
func resolvePayloadHash(r *http.Request, presigned bool) string {
if presigned {
return "UNSIGNED-PAYLOAD"
}
hash := strings.TrimSpace(r.Header.Get("x-amz-content-sha256"))
if hash == "" {
return "UNSIGNED-PAYLOAD"
}
return hash
}
// buildCanonicalRequest assembles the six-part SigV4 canonical request:
// method, canonical URI, canonical query string, canonical headers, the
// signed-header list, and the payload hash, newline-joined.
func buildCanonicalRequest(r *http.Request, signedHeaders []string, payloadHash string, presigned bool) (string, error) {
	canonicalURI := canonicalPath(r.URL)
	canonicalQuery := canonicalQueryString(r.URL.RawQuery, presigned)
	canonicalHeaders, signedHeadersRaw, err := canonicalHeadersForRequest(r, signedHeaders)
	if err != nil {
		return "", err
	}
	return strings.Join([]string{
		r.Method,
		canonicalURI,
		canonicalQuery,
		canonicalHeaders,
		signedHeadersRaw,
		payloadHash,
	}, "\n"), nil
}
func canonicalPath(u *url.URL) string {
if u == nil {
return "/"
}
path := u.EscapedPath()
if path == "" {
return "/"
}
return path
}
// queryPair is one query-string key/value pair, kept as a pair so duplicate
// keys survive sorting when building the canonical query string.
type queryPair struct {
	Key   string
	Value string
}
// canonicalQueryString rebuilds the query string in SigV4 canonical form:
// pairs sorted by key then value, AWS-style percent-encoding, and — for
// presigned requests — with X-Amz-Signature excluded (it cannot sign itself).
func canonicalQueryString(rawQuery string, presigned bool) string {
	if rawQuery == "" {
		return ""
	}
	// Parse errors are deliberately ignored; ParseQuery returns the pairs it
	// could parse and a mismatching signature will fail verification anyway.
	values, _ := url.ParseQuery(rawQuery)
	pairs := make([]queryPair, 0)
	for key, valueList := range values {
		if presigned && strings.EqualFold(key, "X-Amz-Signature") {
			continue
		}
		if len(valueList) == 0 {
			// Key present with no value still contributes "key=".
			pairs = append(pairs, queryPair{Key: key, Value: ""})
			continue
		}
		for _, value := range valueList {
			pairs = append(pairs, queryPair{Key: key, Value: value})
		}
	}
	// Sort by key, then by value for duplicate keys, per the SigV4 spec.
	sort.Slice(pairs, func(i, j int) bool {
		if pairs[i].Key == pairs[j].Key {
			return pairs[i].Value < pairs[j].Value
		}
		return pairs[i].Key < pairs[j].Key
	})
	encoded := make([]string, 0, len(pairs))
	for _, pair := range pairs {
		encoded = append(encoded, awsEncodeQuery(pair.Key)+"="+awsEncodeQuery(pair.Value))
	}
	return strings.Join(encoded, "&")
}
// awsEncodeQuery percent-encodes one query component per the SigV4 rules:
// spaces become %20 (never '+'), '*' is escaped, and '~' stays literal.
func awsEncodeQuery(value string) string {
	replacer := strings.NewReplacer("+", "%20", "*", "%2A", "%7E", "~")
	return replacer.Replace(url.QueryEscape(value))
}
// canonicalHeadersForRequest builds the canonical headers section and the
// joined signed-header list for the canonical request. Every signed header
// must be present on the request ("host" is read from r.Host); values have
// internal whitespace collapsed. Returns an error when a signed header is
// missing or the list is effectively empty.
func canonicalHeadersForRequest(r *http.Request, signedHeaders []string) (canonical string, signedRaw string, err error) {
	if len(signedHeaders) == 0 {
		return "", "", fmt.Errorf("%w: empty signed headers", ErrAuthorizationHeaderMalformed)
	}
	normalized := make([]string, 0, len(signedHeaders))
	lines := make([]string, 0, len(signedHeaders))
	for _, headerName := range signedHeaders {
		headerName = strings.ToLower(strings.TrimSpace(headerName))
		if headerName == "" {
			continue
		}
		var value string
		if headerName == "host" {
			// Host is carried on the Request struct, not in r.Header.
			value = r.Host
		} else {
			values, ok := r.Header[http.CanonicalHeaderKey(headerName)]
			if !ok || len(values) == 0 {
				return "", "", fmt.Errorf("%w: missing signed header %q", ErrAuthorizationHeaderMalformed, headerName)
			}
			// Multi-valued headers are comma-joined per the SigV4 rules.
			value = strings.Join(values, ",")
		}
		value = normalizeHeaderValue(value)
		normalized = append(normalized, headerName)
		lines = append(lines, headerName+":"+value)
	}
	if len(lines) == 0 {
		return "", "", fmt.Errorf("%w: no valid signed headers", ErrAuthorizationHeaderMalformed)
	}
	signedRaw = strings.Join(normalized, ";")
	// The canonical headers block carries a trailing newline.
	canonical = strings.Join(lines, "\n") + "\n"
	return canonical, signedRaw, nil
}
// normalizeHeaderValue collapses runs of whitespace to single spaces and
// trims the ends, per the SigV4 canonical-headers rules. strings.Fields
// already ignores leading/trailing whitespace, so no separate trim is needed.
func normalizeHeaderValue(value string) string {
	return strings.Join(strings.Fields(value), " ")
}
// buildStringToSign assembles the SigV4 string-to-sign: algorithm token,
// request timestamp, credential scope, and the hex-encoded SHA-256 of the
// canonical request, newline-separated.
func buildStringToSign(amzDate string, scope string, canonicalRequest string) string {
	digest := sha256.Sum256([]byte(canonicalRequest))
	var b strings.Builder
	b.WriteString(sigV4Algorithm)
	b.WriteByte('\n')
	b.WriteString(amzDate)
	b.WriteByte('\n')
	b.WriteString(scope)
	b.WriteByte('\n')
	b.WriteString(hex.EncodeToString(digest[:]))
	return b.String()
}
// deriveSigningKey produces the SigV4 signing key via the standard HMAC
// chain: "AWS4"+secret -> date -> region -> service -> "aws4_request".
func deriveSigningKey(secret, date, region, service string) []byte {
	key := hmacSHA256([]byte("AWS4"+secret), date)
	for _, step := range []string{region, service, "aws4_request"} {
		key = hmacSHA256(key, step)
	}
	return key
}
// hmacSHA256 returns the raw 32-byte HMAC-SHA256 of value under key. The
// Write error is discarded: hash writes never fail.
func hmacSHA256(key []byte, value string) []byte {
	digest := hmac.New(sha256.New, key)
	_, _ = digest.Write([]byte(value))
	return digest.Sum(nil)
}

10
go.mod
View File

@@ -3,8 +3,12 @@ module fs
go 1.25.7 go 1.25.7
require ( require (
github.com/klauspost/cpuid/v2 v2.3.0 // indirect github.com/go-chi/chi/v5 v5.2.5
github.com/klauspost/reedsolomon v1.13.2 // indirect github.com/google/uuid v1.6.0
go.etcd.io/bbolt v1.4.3 // indirect go.etcd.io/bbolt v1.4.3
)
require (
github.com/joho/godotenv v1.5.1 // indirect
golang.org/x/sys v0.41.0 // indirect golang.org/x/sys v0.41.0 // indirect
) )

22
go.sum
View File

@@ -1,10 +1,20 @@
github.com/klauspost/cpuid/v2 v2.3.0 h1:S4CRMLnYUhGeDFDqkGriYKdfoFlDnMtqTiI/sFzhA9Y= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/klauspost/cpuid/v2 v2.3.0/go.mod h1:hqwkgyIinND0mEev00jJYCxPNVRVXFQeu1XKlok6oO0= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/klauspost/reedsolomon v1.13.2 h1:9qtQy2tKEVpVB8Pfq87ZljHZb60/LbeTQ1OxV8EGzdE= github.com/go-chi/chi/v5 v5.2.5 h1:Eg4myHZBjyvJmAFjFvWgrqDTXFyOzjj7YIm3L3mu6Ug=
github.com/klauspost/reedsolomon v1.13.2/go.mod h1:ggJT9lc71Vu+cSOPBlxGvBN6TfAS77qB4fp8vJ05NSA= github.com/go-chi/chi/v5 v5.2.5/go.mod h1:X7Gx4mteadT3eDOMTsXzmI4/rwUpOwBHLpAfupzFJP0=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0=
github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
go.etcd.io/bbolt v1.4.3 h1:dEadXpI6G79deX5prL3QRNP6JB8UxVkqo4UPnHaNXJo= go.etcd.io/bbolt v1.4.3 h1:dEadXpI6G79deX5prL3QRNP6JB8UxVkqo4UPnHaNXJo=
go.etcd.io/bbolt v1.4.3/go.mod h1:tKQlpPaYCVFctUIgFKFnAlvbmB3tpy1vkTnDWohtc0E= go.etcd.io/bbolt v1.4.3/go.mod h1:tKQlpPaYCVFctUIgFKFnAlvbmB3tpy1vkTnDWohtc0E=
golang.org/x/sys v0.29.0 h1:TPYlXGxvx1MGTn2GiZDhnjPA9wZzZeGKHHmKhHYvgaU= golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ=
golang.org/x/sys v0.29.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.41.0 h1:Ivj+2Cp/ylzLiEU89QhWblYnOE9zerudt9Ftecq2C6k= golang.org/x/sys v0.41.0 h1:Ivj+2Cp/ylzLiEU89QhWblYnOE9zerudt9Ftecq2C6k=
golang.org/x/sys v0.41.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= golang.org/x/sys v0.41.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

207
logging/logging.go Normal file
View File

@@ -0,0 +1,207 @@
package logging
import (
"fs/metrics"
"log/slog"
"net/http"
"os"
"path/filepath"
"strconv"
"strings"
"time"
"github.com/go-chi/chi/v5"
"github.com/go-chi/chi/v5/middleware"
)
// Config captures the resolved logging configuration.
type Config struct {
	Level     slog.Level // minimum level emitted
	LevelName string     // canonical upper-case name of Level (e.g. "INFO")
	Format    string     // output encoding: "json" or "text"
	Audit     bool       // when true, log every HTTP request
	AddSource bool       // include source file:line in records (debug only)
	DebugMode bool       // true when Level is debug or lower
}
// ConfigFromEnv builds a logging Config from the LOG_LEVEL, LOG_FORMAT and
// AUDIT_LOG environment variables; audit logging defaults to enabled.
func ConfigFromEnv() Config {
	level := strings.ToLower(strings.TrimSpace(os.Getenv("LOG_LEVEL")))
	format := strings.ToLower(strings.TrimSpace(os.Getenv("LOG_FORMAT")))
	audit := envBool("AUDIT_LOG", true)
	return ConfigFromValues(level, format, audit)
}
// ConfigFromValues normalizes raw level/format strings into a Config.
// Unknown level names fall back to info and anything other than "json" falls
// back to "text"; AddSource and DebugMode are enabled when the effective
// level is debug or lower.
func ConfigFromValues(levelName, format string, audit bool) Config {
	levelName = strings.ToLower(strings.TrimSpace(levelName))
	if levelName == "" {
		levelName = "info"
	}
	level := parseLevel(levelName)
	// Re-derive the name from the level actually in effect, so unknown
	// inputs report the fallback ("INFO") rather than the raw value.
	levelName = strings.ToUpper(level.String())
	format = strings.ToLower(strings.TrimSpace(format))
	// Anything other than "json" — including empty — falls back to "text".
	// (The previous separate `format == ""` check was subsumed by this.)
	if format != "json" {
		format = "text"
	}
	debugMode := level <= slog.LevelDebug
	return Config{
		Level:     level,
		LevelName: levelName,
		Format:    format,
		Audit:     audit,
		AddSource: debugMode,
		DebugMode: debugMode,
	}
}
// NewLogger builds a slog.Logger per cfg (text or JSON handler on stdout)
// and also installs it as the process-wide default via slog.SetDefault.
func NewLogger(cfg Config) *slog.Logger {
	opts := &slog.HandlerOptions{
		Level:     cfg.Level,
		AddSource: cfg.AddSource,
	}
	// Shorten source attributes to "file.go:line" under the key "src".
	opts.ReplaceAttr = func(_ []string, attr slog.Attr) slog.Attr {
		if attr.Key == slog.SourceKey {
			if src, ok := attr.Value.Any().(*slog.Source); ok && src != nil {
				attr.Key = "src"
				attr.Value = slog.StringValue(filepath.Base(src.File) + ":" + strconv.Itoa(src.Line))
			}
		}
		return attr
	}
	var handler slog.Handler
	if cfg.Format == "json" {
		handler = slog.NewJSONHandler(os.Stdout, opts)
	} else {
		handler = slog.NewTextHandler(os.Stdout, opts)
	}
	logger := slog.New(handler)
	// Side effect: subsequent slog package-level calls use this logger.
	slog.SetDefault(logger)
	return logger
}
// HTTPMiddleware returns middleware that records per-request metrics
// (in-flight gauge plus a detailed observation of method/route/status/
// duration/bytes) and, when audit or debug logging is enabled, emits a
// structured access-log line.
func HTTPMiddleware(logger *slog.Logger, cfg Config) func(http.Handler) http.Handler {
	return func(next http.Handler) http.Handler {
		return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
			start := time.Now()
			// Wrap the writer so status and bytes written can be read back.
			ww := middleware.NewWrapResponseWriter(w, r.ProtoMajor)
			op := metricOperationLabel(r)
			metrics.Default.IncHTTPInFlightOp(op)
			defer func() {
				metrics.Default.DecHTTPInFlightOp(op)
			}()
			requestID := middleware.GetReqID(r.Context())
			if requestID != "" {
				// Expose the request ID S3-style for client correlation.
				ww.Header().Set("x-amz-request-id", requestID)
			}
			next.ServeHTTP(ww, r)
			elapsed := time.Since(start)
			status := ww.Status()
			if status == 0 {
				// Handler never wrote a status; net/http defaults to 200.
				status = http.StatusOK
			}
			route := metricRouteLabel(r)
			metrics.Default.ObserveHTTPRequestDetailed(r.Method, route, op, status, elapsed, ww.BytesWritten())
			if !cfg.Audit && !cfg.DebugMode {
				return
			}
			attrs := []any{
				"method", r.Method,
				"path", r.URL.Path,
				"status", status,
				"bytes", ww.BytesWritten(),
				"duration_ms", float64(elapsed.Nanoseconds()) / 1_000_000.0,
				"remote_addr", r.RemoteAddr,
			}
			if requestID != "" {
				attrs = append(attrs, "request_id", requestID)
			}
			if cfg.DebugMode {
				// Debug mode adds extra request detail and logs at debug level.
				attrs = append(attrs,
					"query", r.URL.RawQuery,
					"user_agent", r.UserAgent(),
					"content_length", r.ContentLength,
					"content_type", r.Header.Get("Content-Type"),
					"x_amz_sha256", r.Header.Get("x-amz-content-sha256"),
				)
				logger.Debug("http_request", attrs...)
				return
			}
			logger.Info("http_request", attrs...)
		})
	}
}
// metricRouteLabel returns a low-cardinality route label for metrics. It
// prefers the matched chi route pattern; otherwise raw paths are bucketed
// into "/", "/healthz", "/metrics", "/{bucket}" or "/{bucket}/*".
func metricRouteLabel(r *http.Request) string {
	if r == nil || r.URL == nil {
		return "/unknown"
	}
	if rctx := chi.RouteContext(r.Context()); rctx != nil {
		if pattern := strings.TrimSpace(rctx.RoutePattern()); pattern != "" {
			return pattern
		}
	}
	path := strings.TrimSpace(r.URL.Path)
	switch path {
	case "", "/":
		return "/"
	case "/healthz", "/metrics":
		return path
	}
	segments := strings.Trim(path, "/")
	switch {
	case segments == "":
		return "/"
	case strings.Contains(segments, "/"):
		return "/{bucket}/*"
	default:
		return "/{bucket}"
	}
}
// metricOperationLabel maps the request to a normalized S3 operation label,
// treating a POST with a "delete" query parameter as multi-object delete.
func metricOperationLabel(r *http.Request) string {
	if r == nil {
		return "other"
	}
	deletePost := false
	if r.Method == http.MethodPost && r.URL != nil {
		_, deletePost = r.URL.Query()["delete"]
	}
	return metrics.NormalizeHTTPOperation(r.Method, deletePost)
}
// envBool reads a boolean environment variable, returning defaultValue when
// the variable is unset or not parseable by strconv.ParseBool.
func envBool(key string, defaultValue bool) bool {
	raw := os.Getenv(key)
	if raw == "" {
		return defaultValue
	}
	if parsed, err := strconv.ParseBool(raw); err == nil {
		return parsed
	}
	return defaultValue
}
// parseLevel maps a lower-cased level name to a slog.Level, defaulting to
// info for unrecognized names.
func parseLevel(levelName string) slog.Level {
	levels := map[string]slog.Level{
		"debug":   slog.LevelDebug,
		"warn":    slog.LevelWarn,
		"warning": slog.LevelWarn,
		"error":   slog.LevelError,
	}
	if level, ok := levels[levelName]; ok {
		return level
	}
	return slog.LevelInfo
}

View File

@@ -0,0 +1,30 @@
package logging
import (
"net/http/httptest"
"testing"
)
// TestMetricRouteLabelFallbacks verifies the path-based bucketing that
// metricRouteLabel falls back to when no chi route pattern is available.
func TestMetricRouteLabelFallbacks(t *testing.T) {
	testCases := []struct {
		name string
		path string
		want string
	}{
		{name: "root", path: "/", want: "/"},
		{name: "health", path: "/healthz", want: "/healthz"},
		{name: "metrics", path: "/metrics", want: "/metrics"},
		{name: "bucket", path: "/some-bucket", want: "/{bucket}"},
		{name: "object", path: "/some-bucket/private/path/file.jpg", want: "/{bucket}/*"},
	}
	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			req := httptest.NewRequest("GET", tc.path, nil)
			got := metricRouteLabel(req)
			if got != tc.want {
				t.Fatalf("metricRouteLabel(%q) = %q, want %q", tc.path, got, tc.want)
			}
		})
	}
}

99
main.go
View File

@@ -1,54 +1,89 @@
package main package main
import ( import (
"fmt" "context"
"fs/api"
"fs/auth"
"fs/logging"
"fs/metadata" "fs/metadata"
"fs/service" "fs/service"
"io" "fs/storage"
"fs/utils"
"os" "os"
"os/signal"
"path/filepath"
"strconv"
"syscall"
"time"
) )
func main() { func main() {
fmt.Println("Hello, World!") config := utils.NewConfig()
imageStream, err := os.Open("fer.jpg") logConfig := logging.ConfigFromValues(config.LogLevel, config.LogFormat, config.AuditLog)
if err != nil { authConfig := auth.ConfigFromValues(
fmt.Printf("Error opening image stream: %v\n", err) config.AuthEnabled,
return config.AuthRegion,
} config.AuthSkew,
defer imageStream.Close() config.AuthMaxPresign,
config.AuthMasterKey,
config.AuthBootstrapAccessKey,
config.AuthBootstrapSecretKey,
config.AuthBootstrapPolicy,
)
logger := logging.NewLogger(logConfig)
logger.Info("boot",
"log_level", logConfig.LevelName,
"log_format", logConfig.Format,
"audit_log", logConfig.Audit,
"data_path", config.DataPath,
"multipart_retention_hours", int(config.MultipartCleanupRetention/time.Hour),
"auth_enabled", authConfig.Enabled,
"auth_region", authConfig.Region,
)
metadataHandler, err := metadata.NewMetadataHandler("metadata.db") if err := os.MkdirAll(config.DataPath, 0o755); err != nil {
if err != nil { logger.Error("failed_to_prepare_data_path", "path", config.DataPath, "error", err)
fmt.Printf("Error initializing metadata handler: %v\n", err)
return return
} }
objectService := service.NewObjectService(metadataHandler) dbPath := filepath.Join(config.DataPath, "metadata.db")
metadataHandler, err := metadata.NewMetadataHandler(dbPath)
if err != nil {
logger.Error("failed_to_initialize_metadata_handler", "error", err)
return
}
blobHandler, err := storage.NewBlobStore(config.DataPath, config.ChunkSize)
if err != nil {
_ = metadataHandler.Close()
logger.Error("failed_to_initialize_blob_store", "error", err)
return
}
manifest, err := objectService.PutObject("test-bucket-ferdzo/fer.jpg", "image/jpeg", imageStream) objectService := service.NewObjectService(metadataHandler, blobHandler, config.MultipartCleanupRetention)
authService, err := auth.NewService(authConfig, metadataHandler)
if err != nil { if err != nil {
fmt.Printf("Error ingesting stream: %v\n", err) _ = metadataHandler.Close()
logger.Error("failed_to_initialize_auth_service", "error", err)
return
}
if err := authService.EnsureBootstrap(); err != nil {
_ = metadataHandler.Close()
logger.Error("failed_to_ensure_bootstrap_auth_identity", "error", err)
return return
} }
fmt.Printf("Manifest: %+v\n", manifest)
objectData, manifest2, err := objectService.GetObject("test-bucket-ferdzo", "fer.jpg") handler := api.NewHandler(objectService, logger, logConfig, authService)
if err != nil { addr := config.Address + ":" + strconv.Itoa(config.Port)
fmt.Printf("Error retrieving object: %v\n", err)
return
}
fmt.Printf("Retrieved manifest: %+v\n", manifest2)
recoveredFile, err := os.Create("recovered_" + manifest2.Key)
if err != nil {
fmt.Printf("Error creating file: %v\n", err)
return
}
defer recoveredFile.Close()
bytesWritten, err := io.Copy(recoveredFile, objectData) ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
if err != nil { defer stop()
fmt.Printf("Error streaming to recovered file: %v\n", err) if config.GcEnabled {
go objectService.RunGC(ctx, config.GcInterval)
}
if err = handler.Start(ctx, addr); err != nil {
logger.Error("server_stopped_with_error", "error", err)
return return
} }
fmt.Printf("Successfully streamed %d bytes to disk!\n", bytesWritten)
} }

View File

@@ -2,37 +2,381 @@ package metadata
import ( import (
"encoding/json" "encoding/json"
"errors"
"fmt" "fmt"
"fs/metrics"
"fs/models" "fs/models"
"net"
"regexp"
"sort"
"strings"
"time"
"github.com/google/uuid"
"go.etcd.io/bbolt" "go.etcd.io/bbolt"
) )
const ManifestBucketName = "object_manifests"
type MetadataHandler struct { type MetadataHandler struct {
db *bbolt.DB db *bbolt.DB
} }
var systemIndex = []byte("__SYSTEM_BUCKETS__")
var multipartUploadIndex = []byte("__MULTIPART_UPLOADS__")
var multipartUploadPartsIndex = []byte("__MULTIPART_UPLOAD_PARTS__")
var authIdentitiesIndex = []byte("__AUTH_IDENTITIES__")
var authPoliciesIndex = []byte("__AUTH_POLICIES__")
var validBucketName = regexp.MustCompile(`^[a-z0-9.-]+$`)
var (
ErrInvalidBucketName = errors.New("invalid bucket name")
ErrBucketAlreadyExists = errors.New("bucket already exists")
ErrBucketNotFound = errors.New("bucket not found")
ErrBucketNotEmpty = errors.New("bucket not empty")
ErrObjectNotFound = errors.New("object not found")
ErrMultipartNotFound = errors.New("multipart upload not found")
ErrMultipartNotPending = errors.New("multipart upload is not pending")
ErrAuthIdentityNotFound = errors.New("auth identity not found")
ErrAuthPolicyNotFound = errors.New("auth policy not found")
)
func NewMetadataHandler(dbPath string) (*MetadataHandler, error) { func NewMetadataHandler(dbPath string) (*MetadataHandler, error) {
db, err := bbolt.Open(dbPath, 0600, nil) db, err := bbolt.Open(dbPath, 0600, &bbolt.Options{Timeout: 2 * time.Second})
if err != nil { if err != nil {
return nil, err return nil, err
} }
return &MetadataHandler{db: db}, nil h := &MetadataHandler{db: db}
err = h.update(func(tx *bbolt.Tx) error {
_, err := tx.CreateBucketIfNotExists(systemIndex)
return err
})
if err != nil {
_ = db.Close()
return nil, err
}
err = h.update(func(tx *bbolt.Tx) error {
_, err := tx.CreateBucketIfNotExists(multipartUploadIndex)
return err
})
if err != nil {
_ = db.Close()
return nil, err
}
err = h.update(func(tx *bbolt.Tx) error {
_, err := tx.CreateBucketIfNotExists(multipartUploadPartsIndex)
return err
})
if err != nil {
_ = db.Close()
return nil, err
}
err = h.update(func(tx *bbolt.Tx) error {
_, err := tx.CreateBucketIfNotExists(authIdentitiesIndex)
return err
})
if err != nil {
_ = db.Close()
return nil, err
}
err = h.update(func(tx *bbolt.Tx) error {
_, err := tx.CreateBucketIfNotExists(authPoliciesIndex)
return err
})
if err != nil {
_ = db.Close()
return nil, err
}
return h, nil
} }
func (h *MetadataHandler) PutManifest(manifest *models.ObjectManifest) error { func isValidBucketName(bucketName string) bool {
err := h.db.Update(func(tx *bbolt.Tx) error { if len(bucketName) < 3 || len(bucketName) > 63 {
metadataBucket, err := tx.CreateBucketIfNotExists([]byte(ManifestBucketName)) return false
}
if !validBucketName.MatchString(bucketName) {
return false
}
if strings.Contains(bucketName, "..") {
return false
}
if bucketName[0] == '.' || bucketName[0] == '-' || bucketName[len(bucketName)-1] == '.' || bucketName[len(bucketName)-1] == '-' {
return false
}
for _, label := range strings.Split(bucketName, ".") {
if label == "" || label[0] == '-' || label[len(label)-1] == '-' {
return false
}
}
if ip := net.ParseIP(bucketName); ip != nil && ip.To4() != nil {
return false
}
return true
}
// Close releases the underlying bbolt database handle.
func (h *MetadataHandler) Close() error {
	return h.db.Close()
}
// view runs a read-only bbolt transaction, recording its duration and
// success/failure in the metrics registry.
func (h *MetadataHandler) view(fn func(tx *bbolt.Tx) error) error {
	start := time.Now()
	err := h.db.View(fn)
	metrics.Default.ObserveMetadataTx("view", time.Since(start), err == nil)
	return err
}
// update runs a read-write bbolt transaction, recording its duration and
// success/failure in the metrics registry.
func (h *MetadataHandler) update(fn func(tx *bbolt.Tx) error) error {
	start := time.Now()
	err := h.db.Update(fn)
	metrics.Default.ObserveMetadataTx("update", time.Since(start), err == nil)
	return err
}
func (h *MetadataHandler) PutAuthIdentity(identity *models.AuthIdentity) error {
if identity == nil {
return errors.New("auth identity is required")
}
if strings.TrimSpace(identity.AccessKeyID) == "" {
return errors.New("access key id is required")
}
return h.update(func(tx *bbolt.Tx) error {
bucket := tx.Bucket(authIdentitiesIndex)
if bucket == nil {
return errors.New("auth identities index not found")
}
payload, err := json.Marshal(identity)
if err != nil { if err != nil {
return err return err
} }
key := fmt.Sprintf("%s/%s", manifest.Bucket, manifest.Key) return bucket.Put([]byte(identity.AccessKeyID), payload)
})
}
// GetAuthIdentity loads the stored identity for an access key ID. It returns
// a wrapped ErrAuthIdentityNotFound when no record exists.
func (h *MetadataHandler) GetAuthIdentity(accessKeyID string) (*models.AuthIdentity, error) {
	accessKeyID = strings.TrimSpace(accessKeyID)
	if accessKeyID == "" {
		return nil, errors.New("access key id is required")
	}
	record := models.AuthIdentity{}
	err := h.view(func(tx *bbolt.Tx) error {
		index := tx.Bucket(authIdentitiesIndex)
		if index == nil {
			return errors.New("auth identities index not found")
		}
		raw := index.Get([]byte(accessKeyID))
		if raw == nil {
			return fmt.Errorf("%w: %s", ErrAuthIdentityNotFound, accessKeyID)
		}
		return json.Unmarshal(raw, &record)
	})
	if err != nil {
		return nil, err
	}
	return &record, nil
}
// PutAuthPolicy stores (creates or replaces) a policy keyed by its
// principal. The principal is trimmed and must be non-empty; the normalized
// value is written back onto the record before storage.
func (h *MetadataHandler) PutAuthPolicy(policy *models.AuthPolicy) error {
	if policy == nil {
		return errors.New("auth policy is required")
	}
	principal := strings.TrimSpace(policy.Principal)
	if principal == "" {
		return errors.New("auth policy principal is required")
	}
	policy.Principal = principal
	return h.update(func(tx *bbolt.Tx) error {
		index := tx.Bucket(authPoliciesIndex)
		if index == nil {
			return errors.New("auth policies index not found")
		}
		encoded, err := json.Marshal(policy)
		if err != nil {
			return err
		}
		return index.Put([]byte(principal), encoded)
	})
}
// GetAuthPolicy loads the stored policy whose principal is the given access
// key ID, returning a wrapped ErrAuthPolicyNotFound when no record exists.
func (h *MetadataHandler) GetAuthPolicy(accessKeyID string) (*models.AuthPolicy, error) {
	accessKeyID = strings.TrimSpace(accessKeyID)
	if accessKeyID == "" {
		return nil, errors.New("access key id is required")
	}
	record := models.AuthPolicy{}
	err := h.view(func(tx *bbolt.Tx) error {
		index := tx.Bucket(authPoliciesIndex)
		if index == nil {
			return errors.New("auth policies index not found")
		}
		raw := index.Get([]byte(accessKeyID))
		if raw == nil {
			return fmt.Errorf("%w: %s", ErrAuthPolicyNotFound, accessKeyID)
		}
		return json.Unmarshal(raw, &record)
	})
	if err != nil {
		return nil, err
	}
	return &record, nil
}
// CreateBucket validates the bucket name, creates the per-bucket metadata
// bucket, and records its manifest in the system index. It fails with a
// wrapped ErrBucketAlreadyExists when the name is already registered.
func (h *MetadataHandler) CreateBucket(bucketName string) error {
	if !isValidBucketName(bucketName) {
		return fmt.Errorf("%w: %s", ErrInvalidBucketName, bucketName)
	}
	return h.update(func(tx *bbolt.Tx) error {
		indexBucket, err := tx.CreateBucketIfNotExists([]byte(systemIndex))
		if err != nil {
			return err
		}
		if indexBucket.Get([]byte(bucketName)) != nil {
			return fmt.Errorf("%w: %s", ErrBucketAlreadyExists, bucketName)
		}
		if _, err = tx.CreateBucketIfNotExists([]byte(bucketName)); err != nil {
			return err
		}
		manifest := models.BucketManifest{
			Name:      bucketName,
			CreatedAt: time.Now(),
		}
		// The marshal error was previously discarded with `data, _ :=`;
		// surface it so a failed encode cannot store an empty manifest.
		data, err := json.Marshal(manifest)
		if err != nil {
			return err
		}
		return indexBucket.Put([]byte(bucketName), data)
	})
}
// DeleteBucket removes the named bucket and its system-index entry. The
// bucket must contain no object manifests and no pending multipart uploads,
// otherwise a wrapped ErrBucketNotEmpty is returned.
func (h *MetadataHandler) DeleteBucket(bucketName string) error {
	if !isValidBucketName(bucketName) {
		return fmt.Errorf("%w: %s", ErrInvalidBucketName, bucketName)
	}
	err := h.update(func(tx *bbolt.Tx) error {
		// CreateBucketIfNotExists doubles as a lookup that also bootstraps
		// the system index on a fresh database.
		indexBucket, err := tx.CreateBucketIfNotExists([]byte(systemIndex))
		if err != nil {
			return err
		}
		if indexBucket.Get([]byte(bucketName)) == nil {
			return fmt.Errorf("%w: %s", ErrBucketNotFound, bucketName)
		}
		metadataBucket := tx.Bucket([]byte(bucketName))
		if metadataBucket == nil {
			return fmt.Errorf("%w: %s", ErrBucketNotFound, bucketName)
		}
		// Any key at all means the bucket still holds object manifests.
		if k, _ := metadataBucket.Cursor().First(); k != nil {
			return fmt.Errorf("%w: %s", ErrBucketNotEmpty, bucketName)
		}
		multipartUploadsBucket, err := getMultipartUploadBucket(tx)
		if err != nil {
			return err
		}
		// A pending multipart upload targeting this bucket also counts as
		// content; completed/aborted uploads do not block deletion.
		cursor := multipartUploadsBucket.Cursor()
		for _, payload := cursor.First(); payload != nil; _, payload = cursor.Next() {
			upload := models.MultipartUpload{}
			if err := json.Unmarshal(payload, &upload); err != nil {
				return err
			}
			if upload.Bucket == bucketName && upload.State == "pending" {
				return fmt.Errorf("%w: %s", ErrBucketNotEmpty, bucketName)
			}
		}
		// A missing metadata bucket is tolerated here; the system-index
		// entry removed below is the source of truth.
		if err := tx.DeleteBucket([]byte(bucketName)); err != nil && !errors.Is(err, bbolt.ErrBucketNotFound) {
			return fmt.Errorf("error deleting metadata bucket %s: %w", bucketName, err)
		}
		if err := indexBucket.Delete([]byte(bucketName)); err != nil {
			return fmt.Errorf("error deleting bucket %s from system index: %w", bucketName, err)
		}
		return nil
	})
	if err != nil {
		return err
	}
	return nil
}
// ListBuckets returns the names of all buckets registered in the system
// index, in the index's key order. The result is never nil.
func (h *MetadataHandler) ListBuckets() ([]string, error) {
	names := []string{}
	if err := h.view(func(tx *bbolt.Tx) error {
		idx := tx.Bucket([]byte(systemIndex))
		if idx == nil {
			return errors.New("system index not found")
		}
		return idx.ForEach(func(k, _ []byte) error {
			names = append(names, string(k))
			return nil
		})
	}); err != nil {
		return nil, err
	}
	return names, nil
}
// GetBucketManifest loads the bucket's manifest from the system index,
// returning a wrapped ErrBucketNotFound when the bucket is unknown.
func (h *MetadataHandler) GetBucketManifest(bucketName string) (*models.BucketManifest, error) {
	var manifest *models.BucketManifest
	if err := h.view(func(tx *bbolt.Tx) error {
		idx := tx.Bucket([]byte(systemIndex))
		if idx == nil {
			return errors.New("system index not found")
		}
		raw := idx.Get([]byte(bucketName))
		if raw == nil {
			return fmt.Errorf("%w: %s", ErrBucketNotFound, bucketName)
		}
		return json.Unmarshal(raw, &manifest)
	}); err != nil {
		return nil, err
	}
	return manifest, nil
}
func (h *MetadataHandler) PutManifest(manifest *models.ObjectManifest) error {
bucket := manifest.Bucket
key := manifest.Key
if _, err := h.GetBucketManifest(bucket); err != nil {
return err
}
err := h.update(func(tx *bbolt.Tx) error {
data, err := json.Marshal(manifest) data, err := json.Marshal(manifest)
if err != nil { if err != nil {
return err return err
} }
metadataBucket := tx.Bucket([]byte(bucket))
if metadataBucket == nil {
return fmt.Errorf("%w: %s", ErrBucketNotFound, bucket)
}
return metadataBucket.Put([]byte(key), data) return metadataBucket.Put([]byte(key), data)
}) })
if err != nil { if err != nil {
@@ -44,16 +388,15 @@ func (h *MetadataHandler) PutManifest(manifest *models.ObjectManifest) error {
func (h *MetadataHandler) GetManifest(bucket, key string) (*models.ObjectManifest, error) { func (h *MetadataHandler) GetManifest(bucket, key string) (*models.ObjectManifest, error) {
var manifest *models.ObjectManifest var manifest *models.ObjectManifest
h.db.View(func(tx *bbolt.Tx) error { err := h.view(func(tx *bbolt.Tx) error {
metadataBucket := tx.Bucket([]byte(ManifestBucketName)) metadataBucket := tx.Bucket([]byte(bucket))
if metadataBucket == nil { if metadataBucket == nil {
return fmt.Errorf("bucket %s not found", ManifestBucketName) return fmt.Errorf("%w: %s", ErrBucketNotFound, bucket)
} }
key := fmt.Sprintf("%s/%s", bucket, key)
data := metadataBucket.Get([]byte(key)) data := metadataBucket.Get([]byte(key))
if data == nil { if data == nil {
return fmt.Errorf("manifest not found for bucket %s and key %s", bucket, key) return fmt.Errorf("%w: %s/%s", ErrObjectNotFound, bucket, key)
} }
err := json.Unmarshal(data, &manifest) err := json.Unmarshal(data, &manifest)
if err != nil { if err != nil {
@@ -61,6 +404,547 @@ func (h *MetadataHandler) GetManifest(bucket, key string) (*models.ObjectManifes
} }
return nil return nil
}) })
if err != nil {
return nil, err
}
return manifest, nil return manifest, nil
} }
// ListObjects returns the manifests of all objects in bucket whose keys
// start with prefix; an empty prefix matches every key.
func (h *MetadataHandler) ListObjects(bucket, prefix string) ([]*models.ObjectManifest, error) {
	var objects []*models.ObjectManifest
	if err := h.view(func(tx *bbolt.Tx) error {
		idx := tx.Bucket([]byte(systemIndex))
		if idx == nil {
			return errors.New("system index not found")
		}
		if idx.Get([]byte(bucket)) == nil {
			return fmt.Errorf("%w: %s", ErrBucketNotFound, bucket)
		}
		metadataBucket := tx.Bucket([]byte(bucket))
		if metadataBucket == nil {
			return fmt.Errorf("%w: %s", ErrBucketNotFound, bucket)
		}
		return metadataBucket.ForEach(func(k, v []byte) error {
			if prefix != "" && !strings.HasPrefix(string(k), prefix) {
				return nil
			}
			manifest := models.ObjectManifest{}
			if err := json.Unmarshal(v, &manifest); err != nil {
				return err
			}
			objects = append(objects, &manifest)
			return nil
		})
	}); err != nil {
		return nil, err
	}
	return objects, nil
}
// ForEachObjectFrom streams the bucket's object manifests to fn in key
// order, starting at startKey (or the first key when startKey is empty).
// Iteration stops at the first error returned by fn.
func (h *MetadataHandler) ForEachObjectFrom(bucket, startKey string, fn func(*models.ObjectManifest) error) error {
	if fn == nil {
		return errors.New("object callback is required")
	}
	return h.view(func(tx *bbolt.Tx) error {
		idx := tx.Bucket([]byte(systemIndex))
		if idx == nil {
			return errors.New("system index not found")
		}
		if idx.Get([]byte(bucket)) == nil {
			return fmt.Errorf("%w: %s", ErrBucketNotFound, bucket)
		}
		metadataBucket := tx.Bucket([]byte(bucket))
		if metadataBucket == nil {
			return fmt.Errorf("%w: %s", ErrBucketNotFound, bucket)
		}
		cursor := metadataBucket.Cursor()
		k, v := cursor.First()
		if startKey != "" {
			k, v = cursor.Seek([]byte(startKey))
		}
		for ; k != nil; k, v = cursor.Next() {
			manifest := models.ObjectManifest{}
			if err := json.Unmarshal(v, &manifest); err != nil {
				return err
			}
			if err := fn(&manifest); err != nil {
				return err
			}
		}
		return nil
	})
}
// DeleteManifest removes the manifest stored for bucket/key. The existence
// check up front lets callers see ErrObjectNotFound instead of a silent
// no-op delete.
func (h *MetadataHandler) DeleteManifest(bucket, key string) error {
	if _, err := h.GetManifest(bucket, key); err != nil {
		return err
	}
	return h.update(func(tx *bbolt.Tx) error {
		metadataBucket := tx.Bucket([]byte(bucket))
		if metadataBucket == nil {
			return fmt.Errorf("%w: %s", ErrBucketNotFound, bucket)
		}
		return metadataBucket.Delete([]byte(key))
	})
}
// DeleteManifests removes the given keys from bucket in a single write
// transaction and returns the keys reported as deleted. Keys that do not
// exist are still reported (presumably to mirror S3's idempotent
// multi-delete — confirm with the handler layer); empty keys are skipped.
func (h *MetadataHandler) DeleteManifests(bucket string, keys []string) ([]string, error) {
	deleted := make([]string, 0, len(keys))
	if err := h.update(func(tx *bbolt.Tx) error {
		metadataBucket := tx.Bucket([]byte(bucket))
		if metadataBucket == nil {
			return fmt.Errorf("%w: %s", ErrBucketNotFound, bucket)
		}
		for _, key := range keys {
			if key == "" {
				continue
			}
			raw := []byte(key)
			if metadataBucket.Get(raw) != nil {
				if err := metadataBucket.Delete(raw); err != nil {
					return err
				}
			}
			deleted = append(deleted, key)
		}
		return nil
	}); err != nil {
		return nil, err
	}
	return deleted, nil
}
// CreateMultipartUpload registers a new pending multipart upload for
// bucket/key and returns its record. The bucket-existence check and the
// insert now run inside a single write transaction; previously they were
// split across a view and an update transaction, so a concurrent
// DeleteBucket could slip between them.
func (h *MetadataHandler) CreateMultipartUpload(bucket, key string) (*models.MultipartUpload, error) {
	upload := &models.MultipartUpload{
		Bucket:    bucket,
		Key:       key,
		UploadID:  uuid.New().String(),
		CreatedAt: time.Now().UTC().Format(time.RFC3339),
		State:     "pending",
	}
	err := h.update(func(tx *bbolt.Tx) error {
		systemIndexBucket := tx.Bucket([]byte(systemIndex))
		if systemIndexBucket == nil {
			return errors.New("system index not found")
		}
		if systemIndexBucket.Get([]byte(bucket)) == nil {
			return ErrBucketNotFound
		}
		multipartUploadBucket := tx.Bucket([]byte(multipartUploadIndex))
		if multipartUploadBucket == nil {
			return errors.New("multipart upload index not found")
		}
		payload, err := json.Marshal(upload)
		if err != nil {
			return err
		}
		return multipartUploadBucket.Put([]byte(upload.UploadID), payload)
	})
	if err != nil {
		return nil, err
	}
	return upload, nil
}
// getMultipartUploadBucket returns the multipart-upload index bucket, or an
// error when the index has not been created.
func getMultipartUploadBucket(tx *bbolt.Tx) (*bbolt.Bucket, error) {
	if bucket := tx.Bucket(multipartUploadIndex); bucket != nil {
		return bucket, nil
	}
	return nil, errors.New("multipart upload index not found")
}
// getMultipartPartsBucket returns the multipart-parts index bucket, or an
// error when the index has not been created.
func getMultipartPartsBucket(tx *bbolt.Tx) (*bbolt.Bucket, error) {
	if bucket := tx.Bucket(multipartUploadPartsIndex); bucket != nil {
		return bucket, nil
	}
	return nil, errors.New("multipart upload parts index not found")
}
// getMultipartUploadFromBucket decodes the upload record stored under
// uploadID, returning a wrapped ErrMultipartNotFound when absent.
func getMultipartUploadFromBucket(multipartUploadBucket *bbolt.Bucket, uploadID string) (*models.MultipartUpload, error) {
	raw := multipartUploadBucket.Get([]byte(uploadID))
	if raw == nil {
		return nil, fmt.Errorf("%w: %s", ErrMultipartNotFound, uploadID)
	}
	var upload models.MultipartUpload
	if err := json.Unmarshal(raw, &upload); err != nil {
		return nil, err
	}
	return &upload, nil
}
// getMultipartUploadFromTx resolves both the upload record for uploadID and
// the index bucket it lives in, within the given transaction.
func getMultipartUploadFromTx(tx *bbolt.Tx, uploadID string) (*models.MultipartUpload, *bbolt.Bucket, error) {
	bucket, err := getMultipartUploadBucket(tx)
	if err != nil {
		return nil, nil, err
	}
	upload, err := getMultipartUploadFromBucket(bucket, uploadID)
	if err != nil {
		return nil, nil, err
	}
	return upload, bucket, nil
}
// putMultipartUpload serializes upload and stores it under uploadID.
func putMultipartUpload(multipartUploadBucket *bbolt.Bucket, uploadID string, upload *models.MultipartUpload) error {
	raw, marshalErr := json.Marshal(upload)
	if marshalErr != nil {
		return marshalErr
	}
	return multipartUploadBucket.Put([]byte(uploadID), raw)
}
// deleteMultipartPartsByUploadID removes every stored part whose key starts
// with "<uploadID>:". Keys are collected first and deleted afterwards so
// the scan cursor is never invalidated mid-iteration.
func deleteMultipartPartsByUploadID(tx *bbolt.Tx, uploadID string) error {
	partsBucket, err := getMultipartPartsBucket(tx)
	if err != nil {
		return err
	}
	prefix := uploadID + ":"
	var doomed [][]byte
	cursor := partsBucket.Cursor()
	for k, _ := cursor.Seek([]byte(prefix)); k != nil && strings.HasPrefix(string(k), prefix); k, _ = cursor.Next() {
		// bbolt reuses key memory between cursor steps; keep a copy.
		doomed = append(doomed, append([]byte(nil), k...))
	}
	for _, k := range doomed {
		if err := partsBucket.Delete(k); err != nil {
			return err
		}
	}
	return nil
}
// GetMultipartUpload returns the multipart upload record for uploadID, or a
// wrapped ErrMultipartNotFound when it does not exist.
func (h *MetadataHandler) GetMultipartUpload(uploadID string) (*models.MultipartUpload, error) {
	var upload *models.MultipartUpload
	if err := h.view(func(tx *bbolt.Tx) error {
		found, _, err := getMultipartUploadFromTx(tx, uploadID)
		if err != nil {
			return err
		}
		upload = found
		return nil
	}); err != nil {
		return nil, err
	}
	return upload, nil
}
// PutMultipartPart stores one uploaded part for a pending upload. Part
// numbers are limited to 1..10000; the zero-padded key keeps parts in
// part-number order within the parts bucket.
func (h *MetadataHandler) PutMultipartPart(uploadID string, part models.UploadedPart) error {
	if part.PartNumber < 1 || part.PartNumber > 10000 {
		return fmt.Errorf("invalid part number: %d", part.PartNumber)
	}
	return h.update(func(tx *bbolt.Tx) error {
		upload, _, err := getMultipartUploadFromTx(tx, uploadID)
		if err != nil {
			return err
		}
		if upload.State != "pending" {
			return fmt.Errorf("%w: %s", ErrMultipartNotPending, uploadID)
		}
		partsBucket, err := getMultipartPartsBucket(tx)
		if err != nil {
			return err
		}
		payload, err := json.Marshal(part)
		if err != nil {
			return err
		}
		partKey := fmt.Sprintf("%s:%05d", uploadID, part.PartNumber)
		return partsBucket.Put([]byte(partKey), payload)
	})
}
// ListMultipartParts returns all stored parts of uploadID, sorted by part
// number. The result is never nil.
func (h *MetadataHandler) ListMultipartParts(uploadID string) ([]models.UploadedPart, error) {
	parts := make([]models.UploadedPart, 0)
	if err := h.view(func(tx *bbolt.Tx) error {
		if _, _, err := getMultipartUploadFromTx(tx, uploadID); err != nil {
			return err
		}
		partsBucket, err := getMultipartPartsBucket(tx)
		if err != nil {
			return err
		}
		prefix := uploadID + ":"
		cursor := partsBucket.Cursor()
		for k, v := cursor.Seek([]byte(prefix)); k != nil && strings.HasPrefix(string(k), prefix); k, v = cursor.Next() {
			var part models.UploadedPart
			if err := json.Unmarshal(v, &part); err != nil {
				return err
			}
			parts = append(parts, part)
		}
		return nil
	}); err != nil {
		return nil, err
	}
	sort.Slice(parts, func(i, j int) bool { return parts[i].PartNumber < parts[j].PartNumber })
	return parts, nil
}
// CompleteMultipartUpload finalizes a pending upload in one transaction:
// it writes the final object manifest into the target bucket, marks the
// upload "completed", and removes its stored parts.
func (h *MetadataHandler) CompleteMultipartUpload(uploadID string, final *models.ObjectManifest) error {
	if final == nil {
		return errors.New("final object manifest is required")
	}
	err := h.update(func(tx *bbolt.Tx) error {
		upload, multipartUploadBucket, err := getMultipartUploadFromTx(tx, uploadID)
		if err != nil {
			return err
		}
		if upload.State != "pending" {
			return fmt.Errorf("%w: %s", ErrMultipartNotPending, uploadID)
		}
		metadataBucket := tx.Bucket([]byte(upload.Bucket))
		if metadataBucket == nil {
			return fmt.Errorf("%w: %s", ErrBucketNotFound, upload.Bucket)
		}
		// The upload record, not the caller, decides where the object lands.
		final.Bucket = upload.Bucket
		final.Key = upload.Key
		finalPayload, err := json.Marshal(final)
		if err != nil {
			return err
		}
		if err := metadataBucket.Put([]byte(upload.Key), finalPayload); err != nil {
			return err
		}
		upload.State = "completed"
		if err := putMultipartUpload(multipartUploadBucket, uploadID, upload); err != nil {
			return err
		}
		// Parts are no longer needed once the final manifest is written;
		// all of this commits (or rolls back) atomically with it.
		if err := deleteMultipartPartsByUploadID(tx, uploadID); err != nil {
			return err
		}
		return nil
	})
	if err != nil {
		return err
	}
	return nil
}
// AbortMultipartUpload marks the upload as aborted and drops its stored
// parts. A completed upload cannot be aborted.
func (h *MetadataHandler) AbortMultipartUpload(uploadID string) error {
	return h.update(func(tx *bbolt.Tx) error {
		upload, uploadsBucket, err := getMultipartUploadFromTx(tx, uploadID)
		if err != nil {
			return err
		}
		if upload.State == "completed" {
			return fmt.Errorf("%w: %s", ErrMultipartNotPending, uploadID)
		}
		upload.State = "aborted"
		if err := putMultipartUpload(uploadsBucket, uploadID, upload); err != nil {
			return err
		}
		return deleteMultipartPartsByUploadID(tx, uploadID)
	})
}
// CleanupMultipartUploads deletes upload records (and their parts) that are
// no longer pending and are older than the retention window, returning the
// number removed. A non-positive retention disables cleanup.
func (h *MetadataHandler) CleanupMultipartUploads(retention time.Duration) (int, error) {
	if retention <= 0 {
		return 0, nil
	}
	cleaned := 0
	err := h.update(func(tx *bbolt.Tx) error {
		uploadsBucket, err := getMultipartUploadBucket(tx)
		if err != nil {
			return err
		}
		now := time.Now().UTC()
		keysToDelete := make([]string, 0)
		if err := uploadsBucket.ForEach(func(k, v []byte) error {
			upload := models.MultipartUpload{}
			if err := json.Unmarshal(v, &upload); err != nil {
				return err
			}
			// Only completed/aborted records age out. NOTE(review): stale
			// *pending* uploads are never reclaimed here — confirm that is
			// intentional (DeleteBucket treats pending uploads as content).
			if upload.State == "pending" {
				return nil
			}
			// An unparseable timestamp skips the record rather than failing
			// the whole run.
			createdAt, err := time.Parse(time.RFC3339, upload.CreatedAt)
			if err != nil {
				return nil
			}
			// Age is measured from creation, not completion/abort time.
			if now.Sub(createdAt) >= retention {
				keysToDelete = append(keysToDelete, string(k))
			}
			return nil
		}); err != nil {
			return err
		}
		// Deletions happen after the scan so ForEach never sees the bucket
		// mutate underneath it.
		for _, uploadID := range keysToDelete {
			if err := uploadsBucket.Delete([]byte(uploadID)); err != nil {
				return err
			}
			if err := deleteMultipartPartsByUploadID(tx, uploadID); err != nil {
				return err
			}
			cleaned++
		}
		return nil
	})
	if err != nil {
		return 0, err
	}
	return cleaned, nil
}
// GetReferencedChunkSet returns the set of chunk IDs still referenced by
// any stored object manifest or by parts of a pending multipart upload.
// It is effectively the mark phase for garbage collection: chunks absent
// from this set are candidates for deletion.
func (h *MetadataHandler) GetReferencedChunkSet() (map[string]struct{}, error) {
	chunkSet := make(map[string]struct{})
	pendingUploadSet := make(map[string]struct{})
	err := h.view(func(tx *bbolt.Tx) error {
		systemIndexBucket := tx.Bucket([]byte(systemIndex))
		if systemIndexBucket == nil {
			return errors.New("system index not found")
		}
		// Pass 1: collect the chunk IDs of every object manifest in every
		// bucket listed in the system index.
		c := systemIndexBucket.Cursor()
		for k, _ := c.First(); k != nil; k, _ = c.Next() {
			metadataBucket := tx.Bucket(k)
			// An index entry without a backing bucket is tolerated.
			if metadataBucket == nil {
				continue
			}
			err := metadataBucket.ForEach(func(k, v []byte) error {
				object := models.ObjectManifest{}
				err := json.Unmarshal(v, &object)
				if err != nil {
					return err
				}
				for _, chunkID := range object.Chunks {
					chunkSet[chunkID] = struct{}{}
				}
				return nil
			})
			if err != nil {
				return err
			}
		}
		// Pass 2: record which uploads are pending; only their parts keep
		// chunks alive.
		uploadsBucket := tx.Bucket(multipartUploadIndex)
		if uploadsBucket == nil {
			return errors.New("multipart upload index not found")
		}
		if err := uploadsBucket.ForEach(func(k, v []byte) error {
			upload := models.MultipartUpload{}
			if err := json.Unmarshal(v, &upload); err != nil {
				return err
			}
			if upload.State == "pending" {
				pendingUploadSet[string(k)] = struct{}{}
			}
			return nil
		}); err != nil {
			return err
		}
		// Pass 3: add the chunks of parts belonging to pending uploads.
		partsBucket := tx.Bucket(multipartUploadPartsIndex)
		if partsBucket == nil {
			return errors.New("multipart upload parts index not found")
		}
		if err := partsBucket.ForEach(func(k, v []byte) error {
			// Part keys are "<uploadID>:<partNumber>"; malformed keys are
			// skipped.
			uploadID, _, ok := strings.Cut(string(k), ":")
			if !ok {
				return nil
			}
			if _, pending := pendingUploadSet[uploadID]; !pending {
				return nil
			}
			part := models.UploadedPart{}
			if err := json.Unmarshal(v, &part); err != nil {
				return err
			}
			for _, chunkID := range part.Chunks {
				chunkSet[chunkID] = struct{}{}
			}
			return nil
		}); err != nil {
			return err
		}
		return nil
	})
	if err != nil {
		return nil, err
	}
	return chunkSet, nil
}
// GetReferencedChunks returns the referenced chunk IDs as a slice, in no
// particular order (map iteration order).
func (h *MetadataHandler) GetReferencedChunks() ([]string, error) {
	chunkSet, err := h.GetReferencedChunkSet()
	if err != nil {
		return nil, err
	}
	out := make([]string, 0, len(chunkSet))
	for id := range chunkSet {
		out = append(out, id)
	}
	return out, nil
}

795
metrics/metrics.go Normal file
View File

@@ -0,0 +1,795 @@
package metrics
import (
"fmt"
"os"
"runtime"
"sort"
"strconv"
"strings"
"sync"
"sync/atomic"
"syscall"
"time"
)
// defaultBuckets are latency histogram bounds in seconds (0.5 ms .. 10 s),
// used for HTTP, service, metadata-tx, blob, and GC durations.
var defaultBuckets = []float64{
	0.0005, 0.001, 0.0025, 0.005, 0.01,
	0.025, 0.05, 0.1, 0.25, 0.5,
	1, 2.5, 5, 10,
}
// lockBuckets are finer-grained bounds (1 µs .. 1 s) for lock wait/hold
// times, which are expected to be far shorter than request latencies.
var lockBuckets = []float64{
	0.000001, 0.000005, 0.00001, 0.00005,
	0.0001, 0.0005, 0.001, 0.005, 0.01,
	0.025, 0.05, 0.1, 0.25, 0.5, 1,
}
// batchBuckets are bounds for batch-size observations (counts, not seconds).
var batchBuckets = []float64{1, 2, 4, 8, 16, 32, 64, 100, 128, 256, 512, 1000, 5000}
// Default is the shared process-wide registry used by package-level callers.
var Default = NewRegistry()
// histogram is a minimal fixed-bucket histogram. counts carries one extra
// slot at the end for observations above the largest bound (the +Inf
// bucket). It is not goroutine-safe; callers synchronize via Registry.mu.
type histogram struct {
	bounds []float64
	counts []uint64
	sum float64
	count uint64
}
// newHistogram builds a histogram with a private copy of bounds plus one
// overflow bucket.
func newHistogram(bounds []float64) *histogram {
	owned := append([]float64(nil), bounds...)
	return &histogram{
		bounds: owned,
		counts: make([]uint64, len(owned)+1),
	}
}
// observe records one sample: it credits the first bucket whose upper bound
// is >= v, or the trailing overflow bucket when v exceeds every bound.
func (h *histogram) observe(v float64) {
	h.count++
	h.sum += v
	idx := len(h.counts) - 1
	for i, bound := range h.bounds {
		if v <= bound {
			idx = i
			break
		}
	}
	h.counts[idx]++
}
// snapshot returns copies of the bucket bounds and counts plus the running
// sum and count, so rendering can proceed after the registry lock is
// released.
func (h *histogram) snapshot() (bounds []float64, counts []uint64, sum float64, count uint64) {
	bounds = append([]float64(nil), h.bounds...)
	counts = append([]uint64(nil), h.counts...)
	return bounds, counts, h.sum, h.count
}
// Registry accumulates all process metrics. The atomic fields are safe to
// touch without locking; every map and histogram below mu is guarded by mu.
type Registry struct {
	startedAt time.Time
	// Lock-free gauges/counters.
	httpInFlight atomic.Int64
	connectionPoolActive atomic.Int64
	connectionPoolMax atomic.Int64
	connectionPoolWaits atomic.Uint64
	requestQueueLength atomic.Int64
	// mu guards every field below it.
	mu sync.Mutex
	// HTTP series: keys are "method|route|status" (route maps) and
	// "op|result" (op maps); see ObserveHTTPRequestDetailed.
	httpRequestsRoute map[string]uint64
	httpResponseBytesRoute map[string]uint64
	httpDurationRoute map[string]*histogram
	httpRequestsOp map[string]uint64
	httpDurationOp map[string]*histogram
	httpInFlightOp map[string]int64
	// Auth attempts keyed "result|auth_type|reason".
	authRequests map[string]uint64
	// Service-level operations keyed "operation|result".
	serviceOps map[string]uint64
	serviceDuration map[string]*histogram
	// Metadata (bbolt) transactions keyed "type|result".
	dbTxTotal map[string]uint64
	dbTxDuration map[string]*histogram
	// Blob backend operations keyed "op|backend|result"; bytes keyed by op.
	blobOps map[string]uint64
	blobBytes map[string]uint64
	blobDuration map[string]*histogram
	// Lock wait/hold histograms keyed by lock name.
	lockWait map[string]*histogram
	lockHold map[string]*histogram
	// Cache hit/miss counters keyed by cache name.
	cacheHits map[string]uint64
	cacheMisses map[string]uint64
	batchSize *histogram
	// Retries and errors keyed "op|reason".
	retries map[string]uint64
	errors map[string]uint64
	// Garbage-collection stats; gcRuns keyed by "ok"/"error".
	gcRuns map[string]uint64
	gcDuration *histogram
	gcDeletedChunks uint64
	gcDeleteErrors uint64
	gcCleanedUpload uint64
}
// NewRegistry creates an empty Registry with all label maps initialized and
// the start time set to now (used for the uptime gauge).
func NewRegistry() *Registry {
	return &Registry{
		startedAt: time.Now(),
		httpRequestsRoute: make(map[string]uint64),
		httpResponseBytesRoute: make(map[string]uint64),
		httpDurationRoute: make(map[string]*histogram),
		httpRequestsOp: make(map[string]uint64),
		httpDurationOp: make(map[string]*histogram),
		httpInFlightOp: make(map[string]int64),
		authRequests: make(map[string]uint64),
		serviceOps: make(map[string]uint64),
		serviceDuration: make(map[string]*histogram),
		dbTxTotal: make(map[string]uint64),
		dbTxDuration: make(map[string]*histogram),
		blobOps: make(map[string]uint64),
		blobBytes: make(map[string]uint64),
		blobDuration: make(map[string]*histogram),
		lockWait: make(map[string]*histogram),
		lockHold: make(map[string]*histogram),
		cacheHits: make(map[string]uint64),
		cacheMisses: make(map[string]uint64),
		batchSize: newHistogram(batchBuckets),
		retries: make(map[string]uint64),
		errors: make(map[string]uint64),
		gcRuns: make(map[string]uint64),
		gcDuration: newHistogram(defaultBuckets),
	}
}
// NormalizeHTTPOperation maps an HTTP method onto the coarse operation
// labels used by the metrics: "get", "put", "delete", "head", or "other".
// POST maps to "put", except multi-object-delete POSTs (isDeletePost),
// which count as "delete".
func NormalizeHTTPOperation(method string, isDeletePost bool) string {
	normalized := strings.ToUpper(strings.TrimSpace(method))
	if normalized == "POST" {
		if isDeletePost {
			return "delete"
		}
		return "put"
	}
	switch normalized {
	case "GET", "PUT", "DELETE", "HEAD":
		return strings.ToLower(normalized)
	}
	return "other"
}
// statusResult collapses an HTTP status code into "ok" (2xx/3xx) or "error".
func statusResult(status int) string {
	switch {
	case status < 200, status >= 400:
		return "error"
	default:
		return "ok"
	}
}
// normalizeRoute trims the route, substituting "/unknown" for blanks.
func normalizeRoute(route string) string {
	if trimmed := strings.TrimSpace(route); trimmed != "" {
		return trimmed
	}
	return "/unknown"
}
// normalizeOp lowercases and trims an operation label, defaulting blanks to
// "other".
func normalizeOp(op string) string {
	if trimmed := strings.ToLower(strings.TrimSpace(op)); trimmed != "" {
		return trimmed
	}
	return "other"
}
// IncHTTPInFlight increments the global in-flight HTTP request gauge.
func (r *Registry) IncHTTPInFlight() {
	r.httpInFlight.Add(1)
}
// DecHTTPInFlight decrements the global in-flight HTTP request gauge.
func (r *Registry) DecHTTPInFlight() {
	r.httpInFlight.Add(-1)
}
// IncHTTPInFlightOp bumps both the global in-flight gauge and the per-op
// in-flight gauge for op.
func (r *Registry) IncHTTPInFlightOp(op string) {
	r.httpInFlight.Add(1)
	key := normalizeOp(op)
	r.mu.Lock()
	defer r.mu.Unlock()
	r.httpInFlightOp[key]++
}
// DecHTTPInFlightOp decrements both gauges, clamping the per-op gauge at
// zero so unmatched Dec calls cannot drive it negative.
func (r *Registry) DecHTTPInFlightOp(op string) {
	r.httpInFlight.Add(-1)
	key := normalizeOp(op)
	r.mu.Lock()
	defer r.mu.Unlock()
	r.httpInFlightOp[key]--
	if r.httpInFlightOp[key] < 0 {
		r.httpInFlightOp[key] = 0
	}
}
// ObserveHTTPRequest records one request whose operation label is derived
// from the method alone (no multi-object-delete POST detection).
func (r *Registry) ObserveHTTPRequest(method, route string, status int, d time.Duration, responseBytes int) {
	op := NormalizeHTTPOperation(method, false)
	r.ObserveHTTPRequestDetailed(method, route, op, status, d, responseBytes)
}
// ObserveHTTPRequestDetailed records one HTTP request under both the
// per-route (method|route|status) and per-operation (op|result) series,
// updating counters, response-byte totals, and latency histograms.
func (r *Registry) ObserveHTTPRequestDetailed(method, route, op string, status int, d time.Duration, responseBytes int) {
	route = normalizeRoute(route)
	op = normalizeOp(op)
	routeKey := method + "|" + route + "|" + strconv.Itoa(status)
	routeDurKey := method + "|" + route
	opKey := op + "|" + statusResult(status)
	seconds := d.Seconds()
	r.mu.Lock()
	defer r.mu.Unlock()
	r.httpRequestsRoute[routeKey]++
	if responseBytes > 0 {
		r.httpResponseBytesRoute[routeKey] += uint64(responseBytes)
	}
	routeHist := r.httpDurationRoute[routeDurKey]
	if routeHist == nil {
		routeHist = newHistogram(defaultBuckets)
		r.httpDurationRoute[routeDurKey] = routeHist
	}
	routeHist.observe(seconds)
	r.httpRequestsOp[opKey]++
	opHist := r.httpDurationOp[opKey]
	if opHist == nil {
		opHist = newHistogram(defaultBuckets)
		r.httpDurationOp[opKey] = opHist
	}
	opHist.observe(seconds)
}
// ObserveAuth counts one authentication attempt, labelled by result, auth
// type, and reason; blank labels become "unknown" / "none".
func (r *Registry) ObserveAuth(result, authType, reason string) {
	at := strings.TrimSpace(authType)
	if at == "" {
		at = "unknown"
	}
	why := strings.TrimSpace(reason)
	if why == "" {
		why = "none"
	}
	r.mu.Lock()
	defer r.mu.Unlock()
	r.authRequests[result+"|"+at+"|"+why]++
}
// ObserveService records one service-level operation call: a counter keyed
// "operation|result" and a latency histogram keyed by operation.
func (r *Registry) ObserveService(operation string, d time.Duration, ok bool) {
	outcome := "error"
	if ok {
		outcome = "ok"
	}
	r.mu.Lock()
	defer r.mu.Unlock()
	r.serviceOps[operation+"|"+outcome]++
	hist := r.serviceDuration[operation]
	if hist == nil {
		hist = newHistogram(defaultBuckets)
		r.serviceDuration[operation] = hist
	}
	hist.observe(d.Seconds())
}
// ObserveMetadataTx records one metadata transaction: a counter keyed
// "type|result" and a latency histogram keyed by transaction type.
func (r *Registry) ObserveMetadataTx(txType string, d time.Duration, ok bool) {
	outcome := "error"
	if ok {
		outcome = "ok"
	}
	r.mu.Lock()
	defer r.mu.Unlock()
	r.dbTxTotal[txType+"|"+outcome]++
	hist := r.dbTxDuration[txType]
	if hist == nil {
		hist = newHistogram(defaultBuckets)
		r.dbTxDuration[txType] = hist
	}
	hist.observe(d.Seconds())
}
// ObserveBlob records one blob-backend operation: a counter and a latency
// histogram keyed "op|backend|result", plus a per-op byte counter. The
// optional trailing backend argument defaults to "disk".
func (r *Registry) ObserveBlob(operation string, d time.Duration, bytes int64, ok bool, backend ...string) {
	be := "disk"
	if len(backend) > 0 {
		if candidate := strings.ToLower(strings.TrimSpace(backend[0])); candidate != "" {
			be = candidate
		}
	}
	outcome := "error"
	if ok {
		outcome = "ok"
	}
	op := strings.ToLower(strings.TrimSpace(operation))
	if op == "" {
		op = "unknown"
	}
	key := op + "|" + be + "|" + outcome
	r.mu.Lock()
	defer r.mu.Unlock()
	r.blobOps[key]++
	hist := r.blobDuration[key]
	if hist == nil {
		hist = newHistogram(defaultBuckets)
		r.blobDuration[key] = hist
	}
	hist.observe(d.Seconds())
	if bytes > 0 {
		r.blobBytes[op] += uint64(bytes)
	}
}
// SetConnectionPoolMax records the configured pool capacity; negative
// values are clamped to zero.
func (r *Registry) SetConnectionPoolMax(max int) {
	limit := max
	if limit < 0 {
		limit = 0
	}
	r.connectionPoolMax.Store(int64(limit))
}
// IncConnectionPoolActive increments the active pooled-connections gauge.
func (r *Registry) IncConnectionPoolActive() {
	r.connectionPoolActive.Add(1)
}
// DecConnectionPoolActive decrements the active pooled-connections gauge.
func (r *Registry) DecConnectionPoolActive() {
	r.connectionPoolActive.Add(-1)
}
// IncConnectionPoolWait counts one wait caused by pool saturation.
func (r *Registry) IncConnectionPoolWait() {
	r.connectionPoolWaits.Add(1)
}
// IncRequestQueueLength increments the queued-requests gauge.
func (r *Registry) IncRequestQueueLength() {
	r.requestQueueLength.Add(1)
}
// DecRequestQueueLength decrements the queued-requests gauge.
func (r *Registry) DecRequestQueueLength() {
	r.requestQueueLength.Add(-1)
}
// ObserveLockWait records time spent waiting to acquire the named lock
// ("unknown" when the name is blank).
func (r *Registry) ObserveLockWait(lockName string, d time.Duration) {
	name := strings.TrimSpace(lockName)
	if name == "" {
		name = "unknown"
	}
	r.mu.Lock()
	defer r.mu.Unlock()
	hist := r.lockWait[name]
	if hist == nil {
		hist = newHistogram(lockBuckets)
		r.lockWait[name] = hist
	}
	hist.observe(d.Seconds())
}
// ObserveLockHold records how long the named lock was held ("unknown" when
// the name is blank).
func (r *Registry) ObserveLockHold(lockName string, d time.Duration) {
	name := strings.TrimSpace(lockName)
	if name == "" {
		name = "unknown"
	}
	r.mu.Lock()
	defer r.mu.Unlock()
	hist := r.lockHold[name]
	if hist == nil {
		hist = newHistogram(lockBuckets)
		r.lockHold[name] = hist
	}
	hist.observe(d.Seconds())
}
// ObserveCacheHit counts one hit for the named cache ("unknown" if blank).
func (r *Registry) ObserveCacheHit(cache string) {
	name := strings.TrimSpace(cache)
	if name == "" {
		name = "unknown"
	}
	r.mu.Lock()
	defer r.mu.Unlock()
	r.cacheHits[name]++
}
// ObserveCacheMiss counts one miss for the named cache ("unknown" if blank).
func (r *Registry) ObserveCacheMiss(cache string) {
	name := strings.TrimSpace(cache)
	if name == "" {
		name = "unknown"
	}
	r.mu.Lock()
	defer r.mu.Unlock()
	r.cacheMisses[name]++
}
// ObserveBatchSize records an observed batch size, clamping negatives to 0.
func (r *Registry) ObserveBatchSize(size int) {
	n := size
	if n < 0 {
		n = 0
	}
	r.mu.Lock()
	defer r.mu.Unlock()
	r.batchSize.observe(float64(n))
}
// ObserveRetry counts one retry, keyed by normalized operation and reason
// ("unknown" when reason is blank).
func (r *Registry) ObserveRetry(op, reason string) {
	why := strings.TrimSpace(reason)
	if why == "" {
		why = "unknown"
	}
	key := normalizeOp(op) + "|" + why
	r.mu.Lock()
	defer r.mu.Unlock()
	r.retries[key]++
}
// ObserveError counts one error, keyed by normalized operation and reason
// ("unknown" when reason is blank).
func (r *Registry) ObserveError(op, reason string) {
	why := strings.TrimSpace(reason)
	if why == "" {
		why = "unknown"
	}
	key := normalizeOp(op) + "|" + why
	r.mu.Lock()
	defer r.mu.Unlock()
	r.errors[key]++
}
// ObserveGC records one garbage-collection run: a result counter, the run
// duration, and cumulative deleted-chunk / delete-error / cleaned-upload
// tallies (negative deltas are ignored).
func (r *Registry) ObserveGC(d time.Duration, deletedChunks, deleteErrors, cleanedUploads int, ok bool) {
	outcome := "error"
	if ok {
		outcome = "ok"
	}
	r.mu.Lock()
	defer r.mu.Unlock()
	r.gcRuns[outcome]++
	r.gcDuration.observe(d.Seconds())
	if deletedChunks > 0 {
		r.gcDeletedChunks += uint64(deletedChunks)
	}
	if deleteErrors > 0 {
		r.gcDeleteErrors += uint64(deleteErrors)
	}
	if cleanedUploads > 0 {
		r.gcCleanedUpload += uint64(cleanedUploads)
	}
}
// RenderPrometheus renders every metric in the Prometheus text exposition
// format. All mutable state is snapshotted under mu first, so the string
// building below happens without holding the lock.
func (r *Registry) RenderPrometheus() string {
	now := time.Now()
	var mem runtime.MemStats
	runtime.ReadMemStats(&mem)
	r.mu.Lock()
	httpReqRoute := copyCounterMap(r.httpRequestsRoute)
	httpRespRoute := copyCounterMap(r.httpResponseBytesRoute)
	httpDurRoute := copyHistogramMap(r.httpDurationRoute)
	httpReqOp := copyCounterMap(r.httpRequestsOp)
	httpDurOp := copyHistogramMap(r.httpDurationOp)
	httpInFlightOp := copyIntGaugeMap(r.httpInFlightOp)
	authReq := copyCounterMap(r.authRequests)
	serviceOps := copyCounterMap(r.serviceOps)
	serviceDur := copyHistogramMap(r.serviceDuration)
	dbTx := copyCounterMap(r.dbTxTotal)
	dbTxDur := copyHistogramMap(r.dbTxDuration)
	blobOps := copyCounterMap(r.blobOps)
	blobBytes := copyCounterMap(r.blobBytes)
	blobDur := copyHistogramMap(r.blobDuration)
	lockWait := copyHistogramMap(r.lockWait)
	lockHold := copyHistogramMap(r.lockHold)
	cacheHits := copyCounterMap(r.cacheHits)
	cacheMisses := copyCounterMap(r.cacheMisses)
	batchBounds, batchCounts, batchSum, batchCount := r.batchSize.snapshot()
	retries := copyCounterMap(r.retries)
	errorsTotal := copyCounterMap(r.errors)
	gcRuns := copyCounterMap(r.gcRuns)
	gcDurBounds, gcDurCounts, gcDurSum, gcDurCount := r.gcDuration.snapshot()
	gcDeletedChunks := r.gcDeletedChunks
	gcDeleteErrors := r.gcDeleteErrors
	gcCleanedUploads := r.gcCleanedUpload
	r.mu.Unlock()
	// Atomic gauges need no lock.
	connectionActive := float64(r.connectionPoolActive.Load())
	connectionMax := float64(r.connectionPoolMax.Load())
	connectionWaits := r.connectionPoolWaits.Load()
	queueLength := float64(r.requestQueueLength.Load())
	resident, hasResident := readResidentMemoryBytes()
	cpuSeconds, hasCPU := readProcessCPUSeconds()
	var b strings.Builder
	// Safe to mutate: httpInFlightOp is a private copy, not the live map.
	httpInFlightOp["all"] = r.httpInFlight.Load()
	writeGaugeVecFromInt64(&b, "fs_http_inflight_requests", "Current in-flight HTTP requests by operation.", httpInFlightOp, "op")
	writeCounterVecKV(&b, "fs_http_requests_total", "Total HTTP requests by operation and result.", httpReqOp, []string{"op", "result"})
	writeHistogramVecKV(&b, "fs_http_request_duration_seconds", "HTTP request latency by operation and result.", httpDurOp, []string{"op", "result"})
	writeCounterVecKV(&b, "fs_http_requests_by_route_total", "Total HTTP requests by method/route/status.", httpReqRoute, []string{"method", "route", "status"})
	writeCounterVecKV(&b, "fs_http_response_bytes_total", "Total HTTP response bytes written.", httpRespRoute, []string{"method", "route", "status"})
	writeHistogramVecKV(&b, "fs_http_request_duration_by_route_seconds", "HTTP request latency by method/route.", httpDurRoute, []string{"method", "route"})
	writeCounterVecKV(&b, "fs_auth_requests_total", "Authentication attempts by result.", authReq, []string{"result", "auth_type", "reason"})
	writeCounterVecKV(&b, "fs_service_operations_total", "Service-level operation calls.", serviceOps, []string{"operation", "result"})
	writeHistogramVecKV(&b, "fs_service_operation_duration_seconds", "Service-level operation latency.", serviceDur, []string{"operation"})
	writeCounterVecKV(&b, "fs_metadata_tx_total", "Metadata transaction calls.", dbTx, []string{"type", "result"})
	writeHistogramVecKV(&b, "fs_metadata_tx_duration_seconds", "Metadata transaction latency.", dbTxDur, []string{"type"})
	writeHistogramVecKV(&b, "fs_blob_operation_duration_seconds", "Blob backend operation latency.", blobDur, []string{"op", "backend", "result"})
	writeCounterVecKV(&b, "fs_blob_operations_total", "Blob store operations.", blobOps, []string{"op", "backend", "result"})
	writeCounterVecKV(&b, "fs_blob_bytes_total", "Blob bytes processed by operation.", blobBytes, []string{"op"})
	writeGauge(&b, "fs_connection_pool_active", "Active pooled connections.", connectionActive)
	writeGauge(&b, "fs_connection_pool_max", "Maximum pooled connections.", connectionMax)
	writeCounter(&b, "fs_connection_pool_waits_total", "Number of waits due to pool saturation.", connectionWaits)
	writeGauge(&b, "fs_request_queue_length", "Requests waiting for an execution slot.", queueLength)
	writeHistogramVecKV(&b, "fs_lock_wait_seconds", "Time spent waiting for locks.", lockWait, []string{"lock_name"})
	writeHistogramVecKV(&b, "fs_lock_hold_seconds", "Time locks were held.", lockHold, []string{"lock_name"})
	writeCounterVecKV(&b, "fs_cache_hits_total", "Cache hits by cache name.", cacheHits, []string{"cache"})
	writeCounterVecKV(&b, "fs_cache_misses_total", "Cache misses by cache name.", cacheMisses, []string{"cache"})
	writeHistogram(&b, "fs_batch_size_histogram", "Observed batch sizes.", nil, batchBounds, batchCounts, batchSum, batchCount)
	writeCounterVecKV(&b, "fs_retries_total", "Retries by operation and reason.", retries, []string{"op", "reason"})
	writeCounterVecKV(&b, "fs_errors_total", "Errors by operation and reason.", errorsTotal, []string{"op", "reason"})
	writeCounterVecKV(&b, "fs_gc_runs_total", "Garbage collection runs.", gcRuns, []string{"result"})
	writeHistogram(&b, "fs_gc_duration_seconds", "Garbage collection runtime.", nil, gcDurBounds, gcDurCounts, gcDurSum, gcDurCount)
	writeCounter(&b, "fs_gc_deleted_chunks_total", "Deleted chunks during GC.", gcDeletedChunks)
	writeCounter(&b, "fs_gc_delete_errors_total", "Chunk delete errors during GC.", gcDeleteErrors)
	writeCounter(&b, "fs_gc_cleaned_uploads_total", "Cleaned multipart uploads during GC.", gcCleanedUploads)
	// Process/runtime metrics.
	writeGauge(&b, "fs_uptime_seconds", "Process uptime in seconds.", now.Sub(r.startedAt).Seconds())
	writeGauge(&b, "fs_runtime_goroutines", "Number of goroutines.", float64(runtime.NumGoroutine()))
	writeGaugeVec(&b, "fs_runtime_memory_bytes", "Runtime memory in bytes.", map[string]float64{
		"alloc": float64(mem.Alloc),
		"total": float64(mem.TotalAlloc),
		"sys": float64(mem.Sys),
		"heap_alloc": float64(mem.HeapAlloc),
		"heap_sys": float64(mem.HeapSys),
		"stack_sys": float64(mem.StackSys),
	}, "type")
	writeCounter(&b, "fs_runtime_gc_cycles_total", "Completed GC cycles.", uint64(mem.NumGC))
	writeCounterFloat(&b, "fs_runtime_gc_pause_seconds_total", "Total GC pause time in seconds.", float64(mem.PauseTotalNs)/1e9)
	// OS-level process metrics are only emitted when the platform readers
	// succeed.
	if hasCPU {
		writeCounterFloat(&b, "process_cpu_seconds_total", "Total user and system CPU time spent in seconds.", cpuSeconds)
	}
	if hasResident {
		writeGauge(&b, "process_resident_memory_bytes", "Resident memory size in bytes.", resident)
	}
	return b.String()
}
// histogramSnapshot is an immutable point-in-time copy of a histogram's
// state, taken under the histogram's own lock so it can be rendered
// without further synchronization.
type histogramSnapshot struct {
bounds []float64 // upper bucket bounds; counts appears to carry one extra +Inf bucket — see writeHistogramWithLabelsMap
counts []uint64 // per-bucket observation counts (not cumulative; rendering accumulates them)
sum float64 // sum of all observed values
count uint64 // total number of observations
}
// copyCounterMap returns an independent shallow copy of a counter map
// so the snapshot can be read after the registry lock is released.
func copyCounterMap(src map[string]uint64) map[string]uint64 {
	dst := make(map[string]uint64, len(src))
	for key, val := range src {
		dst[key] = val
	}
	return dst
}
// copyIntGaugeMap returns an independent shallow copy of a gauge map
// so callers can read it without holding the registry lock.
func copyIntGaugeMap(src map[string]int64) map[string]int64 {
	dst := make(map[string]int64, len(src))
	for key, val := range src {
		dst[key] = val
	}
	return dst
}
// copyHistogramMap snapshots every live histogram in src into plain
// value structs, so rendering can proceed without touching the source
// histograms' locks again.
func copyHistogramMap(src map[string]*histogram) map[string]histogramSnapshot {
	dst := make(map[string]histogramSnapshot, len(src))
	for name, h := range src {
		b, c, s, n := h.snapshot()
		dst[name] = histogramSnapshot{bounds: b, counts: c, sum: s, count: n}
	}
	return dst
}
func writeCounter(b *strings.Builder, name, help string, value uint64) {
fmt.Fprintf(b, "# HELP %s %s\n", name, help)
fmt.Fprintf(b, "# TYPE %s counter\n", name)
fmt.Fprintf(b, "%s %d\n", name, value)
}
func writeCounterFloat(b *strings.Builder, name, help string, value float64) {
fmt.Fprintf(b, "# HELP %s %s\n", name, help)
fmt.Fprintf(b, "# TYPE %s counter\n", name)
fmt.Fprintf(b, "%s %.9f\n", name, value)
}
func writeGauge(b *strings.Builder, name, help string, value float64) {
fmt.Fprintf(b, "# HELP %s %s\n", name, help)
fmt.Fprintf(b, "# TYPE %s gauge\n", name)
fmt.Fprintf(b, "%s %.9f\n", name, value)
}
// writeGaugeVec emits a gauge metric with one label dimension: one
// sample per map entry, keyed by label value. Label values are sorted
// so scrape output is deterministic, and escaped per the exposition
// format.
func writeGaugeVec(b *strings.Builder, name, help string, values map[string]float64, labelName string) {
	fmt.Fprintf(b, "# HELP %s %s\n", name, help)
	fmt.Fprintf(b, "# TYPE %s gauge\n", name)
	ordered := make([]string, 0, len(values))
	for labelValue := range values {
		ordered = append(ordered, labelValue)
	}
	sort.Strings(ordered)
	for _, labelValue := range ordered {
		fmt.Fprintf(b, "%s{%s=\"%s\"} %.9f\n",
			name, labelName, escapeLabelValue(labelValue), values[labelValue])
	}
}
// writeGaugeVecFromInt64 emits a single-label gauge metric from int64
// samples, converting each to float64 for the exposition format. Label
// values are sorted for deterministic output.
func writeGaugeVecFromInt64(b *strings.Builder, name, help string, values map[string]int64, labelName string) {
	fmt.Fprintf(b, "# HELP %s %s\n", name, help)
	fmt.Fprintf(b, "# TYPE %s gauge\n", name)
	ordered := make([]string, 0, len(values))
	for labelValue := range values {
		ordered = append(ordered, labelValue)
	}
	sort.Strings(ordered)
	for _, labelValue := range ordered {
		fmt.Fprintf(b, "%s{%s=\"%s\"} %.9f\n",
			name, labelName, escapeLabelValue(labelValue), float64(values[labelValue]))
	}
}
// writeCounterVecKV emits a multi-label counter metric. Each map key
// encodes the label values joined by "|", matched positionally against
// the labels slice; keys are sorted so scrape output is deterministic.
func writeCounterVecKV(b *strings.Builder, name, help string, values map[string]uint64, labels []string) {
	fmt.Fprintf(b, "# HELP %s %s\n", name, help)
	fmt.Fprintf(b, "# TYPE %s counter\n", name)
	ordered := make([]string, 0, len(values))
	for key := range values {
		ordered = append(ordered, key)
	}
	sort.Strings(ordered)
	for _, key := range ordered {
		labelValues := strings.Split(key, "|")
		fmt.Fprintf(b, "%s{%s} %d\n", name, formatLabels(labels, labelValues), values[key])
	}
}
// writeHistogramVecKV emits one histogram series per map entry. Each
// key encodes the label values joined by "|", matched positionally to
// the labels slice; positions missing from the key become empty label
// values. Keys are sorted for deterministic output.
func writeHistogramVecKV(b *strings.Builder, name, help string, values map[string]histogramSnapshot, labels []string) {
	fmt.Fprintf(b, "# HELP %s %s\n", name, help)
	fmt.Fprintf(b, "# TYPE %s histogram\n", name)
	ordered := make([]string, 0, len(values))
	for key := range values {
		ordered = append(ordered, key)
	}
	sort.Strings(ordered)
	for _, key := range ordered {
		parts := strings.Split(key, "|")
		labelSet := make(map[string]string, len(labels))
		for i, labelName := range labels {
			value := ""
			if i < len(parts) {
				value = parts[i]
			}
			labelSet[labelName] = value
		}
		writeHistogramWithLabelsMap(b, name, labelSet, values[key])
	}
}
func writeHistogram(b *strings.Builder, name, help string, labels map[string]string, bounds []float64, counts []uint64, sum float64, count uint64) {
fmt.Fprintf(b, "# HELP %s %s\n", name, help)
fmt.Fprintf(b, "# TYPE %s histogram\n", name)
writeHistogramWithLabelsMap(b, name, labels, histogramSnapshot{bounds: bounds, counts: counts, sum: sum, count: count})
}
// writeHistogramWithLabelsMap renders the _bucket, _sum, and _count
// series for one histogram. Bucket counts are summed as we go so each
// emitted bucket is cumulative, as the exposition format requires; the
// bucket past the last bound is labeled le="+Inf".
func writeHistogramWithLabelsMap(b *strings.Builder, name string, labels map[string]string, s histogramSnapshot) {
	var running uint64
	for i, n := range s.counts {
		running += n
		withLE := cloneLabels(labels)
		le := "+Inf"
		if i < len(s.bounds) {
			le = trimFloat(s.bounds[i])
		}
		withLE["le"] = le
		fmt.Fprintf(b, "%s_bucket{%s} %d\n", name, labelsToString(withLE), running)
	}
	// _sum/_count carry the caller's labels only; an empty label map
	// yields no "{}" suffix at all.
	suffix := formatLabelsSuffix(labels)
	fmt.Fprintf(b, "%s_sum%s %.9f\n", name, suffix, s.sum)
	fmt.Fprintf(b, "%s_count%s %d\n", name, suffix, s.count)
}
// formatLabelsSuffix renders labels as a "{...}" suffix for _sum/_count
// sample lines, or the empty string when there are no labels (so we
// never emit an empty "{}" label set).
func formatLabelsSuffix(labels map[string]string) string {
	var sb strings.Builder
	if len(labels) > 0 {
		sb.WriteByte('{')
		sb.WriteString(labelsToString(labels))
		sb.WriteByte('}')
	}
	return sb.String()
}
func formatLabels(keys, values []string) string {
parts := make([]string, 0, len(keys))
for i, key := range keys {
value := ""
if i < len(values) {
value = values[i]
}
parts = append(parts, fmt.Sprintf("%s=\"%s\"", key, escapeLabelValue(value)))
}
return strings.Join(parts, ",")
}
func labelsToString(labels map[string]string) string {
if len(labels) == 0 {
return ""
}
keys := make([]string, 0, len(labels))
for k := range labels {
keys = append(keys, k)
}
sort.Strings(keys)
parts := make([]string, 0, len(keys))
for _, key := range keys {
parts = append(parts, fmt.Sprintf("%s=\"%s\"", key, escapeLabelValue(labels[key])))
}
return strings.Join(parts, ",")
}
// cloneLabels returns a mutable copy of in, sized with room for one
// extra entry (the "le" label that histogram rendering adds). A nil or
// empty input yields an empty, non-nil map.
func cloneLabels(in map[string]string) map[string]string {
	out := make(map[string]string, len(in)+1)
	for key, val := range in {
		out[key] = val
	}
	return out
}
// trimFloat formats v with the fewest digits that round-trip exactly
// (strconv's -1 precision), used for histogram "le" bucket bounds so
// e.g. 0.5 renders as "0.5" rather than "0.500000".
func trimFloat(v float64) string {
	return strconv.FormatFloat(v, 'f', -1, 64)
}
// escapeLabelValue escapes a label value for the Prometheus text
// exposition format. Per the format, backslash, newline, and
// double-quote inside a quoted label value must be rendered as the
// two-character sequences \\, \n, and \" respectively. Backslashes are
// replaced first so the escapes introduced for newline and quote are
// not themselves re-escaped.
//
// The previous replacements used `\\n` and `\\"`, which double-escaped
// the backslash: a quote came out as an escaped backslash followed by
// an UNESCAPED quote, terminating the label value early and corrupting
// the sample line.
func escapeLabelValue(value string) string {
	value = strings.ReplaceAll(value, `\`, `\\`)
	value = strings.ReplaceAll(value, "\n", `\n`)
	value = strings.ReplaceAll(value, `"`, `\"`)
	return value
}
// readResidentMemoryBytes reports this process's resident set size in
// bytes, derived from the second field (RSS pages) of
// /proc/self/statm multiplied by the system page size. The bool is
// false on non-Linux systems or when the file is missing/malformed,
// letting the caller simply omit the metric.
func readResidentMemoryBytes() (float64, bool) {
	raw, err := os.ReadFile("/proc/self/statm")
	if err != nil {
		return 0, false
	}
	cols := strings.Fields(string(raw))
	if len(cols) < 2 {
		return 0, false
	}
	rssPages, err := strconv.ParseInt(cols[1], 10, 64)
	if err != nil || rssPages < 0 {
		return 0, false
	}
	return float64(rssPages * int64(os.Getpagesize())), true
}
// readProcessCPUSeconds returns the total user+system CPU time this
// process has consumed, via getrusage(2). The bool is false if the
// syscall fails, letting the caller omit the metric.
func readProcessCPUSeconds() (float64, bool) {
	var ru syscall.Rusage
	if err := syscall.Getrusage(syscall.RUSAGE_SELF, &ru); err != nil {
		return 0, false
	}
	// Convert a Timeval (whole seconds + microseconds) to float seconds.
	toSeconds := func(tv syscall.Timeval) float64 {
		return float64(tv.Sec) + float64(tv.Usec)/1e6
	}
	return toSeconds(ru.Utime) + toSeconds(ru.Stime), true
}

34
metrics/metrics_test.go Normal file
View File

@@ -0,0 +1,34 @@
package metrics
import (
"strings"
"testing"
)
// TestRenderPrometheusHistogramNoEmptyLabelSet verifies that unlabeled
// histogram _sum/_count series are rendered without a trailing empty
// "{}" label set, which is invalid in the Prometheus text format.
func TestRenderPrometheusHistogramNoEmptyLabelSet(t *testing.T) {
	reg := NewRegistry()
	// Record one batch-size observation and one GC run so both
	// unlabeled histograms have data to render.
	reg.ObserveBatchSize(3)
	reg.ObserveGC(0, 0, 0, 0, true)
	out := reg.RenderPrometheus()
	if strings.Contains(out, "fs_batch_size_histogram_sum{}") {
		t.Fatalf("unexpected empty label set for batch sum metric")
	}
	if strings.Contains(out, "fs_batch_size_histogram_count{}") {
		t.Fatalf("unexpected empty label set for batch count metric")
	}
	if strings.Contains(out, "fs_gc_duration_seconds_sum{}") {
		t.Fatalf("unexpected empty label set for gc sum metric")
	}
	if strings.Contains(out, "fs_gc_duration_seconds_count{}") {
		t.Fatalf("unexpected empty label set for gc count metric")
	}
}
// TestEscapeLabelValueEscapesSingleBackslash pins the escaping of a
// lone backslash in a label value to the two-character sequence `\\`,
// as required by the Prometheus text exposition format.
func TestEscapeLabelValueEscapesSingleBackslash(t *testing.T) {
	got := escapeLabelValue(`a\b`)
	want := `a\\b`
	if got != want {
		t.Fatalf("escapeLabelValue returned %q, want %q", got, want)
	}
}

View File

@@ -1,5 +1,10 @@
package models package models
import (
"encoding/xml"
"time"
)
type ObjectManifest struct { type ObjectManifest struct {
Bucket string `json:"bucket"` Bucket string `json:"bucket"`
Key string `json:"key"` Key string `json:"key"`
@@ -9,3 +14,212 @@ type ObjectManifest struct {
Chunks []string `json:"chunks"` Chunks []string `json:"chunks"`
CreatedAt int64 `json:"created_at"` CreatedAt int64 `json:"created_at"`
} }
type BucketManifest struct {
Name string `json:"name"`
CreatedAt time.Time `json:"created_at"`
OwnerID string `json:"owner_id"`
OwnerDisplayName string `json:"owner_display_name"`
Region string `json:"region"`
VersioningStatus string `json:"versioning_status"`
PublicAccessBlock bool `json:"public_access_block"`
}
type ListAllMyBucketsResult struct {
XMLName xml.Name `xml:"ListAllMyBucketsResult"`
Xmlns string `xml:"xmlns,attr"`
Owner BucketsOwner `xml:"Owner"`
Buckets BucketsElement `xml:"Buckets"`
}
type BucketsOwner struct {
ID string `xml:"ID"`
DisplayName string `xml:"DisplayName,omitempty"`
}
type BucketsElement struct {
Items []BucketItem `xml:"Bucket"`
}
type BucketItem struct {
Name string `xml:"Name"`
CreationDate string `xml:"CreationDate"`
}
type S3ErrorResponse struct {
XMLName xml.Name `xml:"Error"`
Code string `xml:"Code"`
Message string `xml:"Message"`
Resource string `xml:"Resource,omitempty"`
RequestID string `xml:"RequestId,omitempty"`
HostID string `xml:"HostId,omitempty"`
}
type ListBucketResult struct {
XMLName xml.Name `xml:"ListBucketResult"`
Xmlns string `xml:"xmlns,attr"`
Name string `xml:"Name"`
Prefix string `xml:"Prefix"`
KeyCount int `xml:"KeyCount"`
MaxKeys int `xml:"MaxKeys"`
IsTruncated bool `xml:"IsTruncated"`
Contents []Contents `xml:"Contents"`
CommonPrefixes []CommonPrefixes `xml:"CommonPrefixes,omitempty"`
}
type ListBucketResultV1 struct {
XMLName xml.Name `xml:"ListBucketResult"`
Xmlns string `xml:"xmlns,attr"`
Name string `xml:"Name"`
Prefix string `xml:"Prefix"`
Marker string `xml:"Marker,omitempty"`
NextMarker string `xml:"NextMarker,omitempty"`
Delimiter string `xml:"Delimiter,omitempty"`
MaxKeys int `xml:"MaxKeys"`
IsTruncated bool `xml:"IsTruncated"`
EncodingType string `xml:"EncodingType,omitempty"`
Contents []Contents `xml:"Contents,omitempty"`
CommonPrefixes []CommonPrefixes `xml:"CommonPrefixes,omitempty"`
}
type ListBucketResultV2 struct {
XMLName xml.Name `xml:"ListBucketResult"`
Xmlns string `xml:"xmlns,attr"`
Name string `xml:"Name"`
Prefix string `xml:"Prefix"`
Delimiter string `xml:"Delimiter,omitempty"`
MaxKeys int `xml:"MaxKeys"`
KeyCount int `xml:"KeyCount"`
IsTruncated bool `xml:"IsTruncated"`
ContinuationToken string `xml:"ContinuationToken,omitempty"`
NextContinuationToken string `xml:"NextContinuationToken,omitempty"`
StartAfter string `xml:"StartAfter,omitempty"`
EncodingType string `xml:"EncodingType,omitempty"`
Contents []Contents `xml:"Contents,omitempty"`
CommonPrefixes []CommonPrefixes `xml:"CommonPrefixes,omitempty"`
}
type Contents struct {
Key string `xml:"Key"`
LastModified string `xml:"LastModified"`
ETag string `xml:"ETag"`
Size int64 `xml:"Size"`
StorageClass string `xml:"StorageClass"`
}
type CommonPrefixes struct {
Prefix string `xml:"Prefix"`
}
type MultipartUpload struct {
UploadID string `json:"upload_id" xml:"UploadId"`
Bucket string `json:"bucket" xml:"Bucket"`
Key string `json:"key" xml:"Key"`
CreatedAt string `json:"created_at" xml:"CreatedAt"`
State string `json:"state" xml:"State"`
}
type InitiateMultipartUploadResult struct {
XMLName xml.Name `xml:"InitiateMultipartUploadResult"`
Xmlns string `xml:"xmlns,attr"`
Bucket string `xml:"Bucket"`
Key string `xml:"Key"`
UploadID string `xml:"UploadId"`
}
type UploadedPart struct {
PartNumber int `json:"part_number" xml:"PartNumber"`
ETag string `json:"etag" xml:"ETag"`
Size int64 `json:"size" xml:"Size"`
Chunks []string `json:"chunks"`
CreatedAt int64 `json:"created_at"`
}
type CompletedPart struct {
PartNumber int `xml:"PartNumber"`
ETag string `xml:"ETag"`
}
type CompleteMultipartUploadRequest struct {
XMLName xml.Name `xml:"CompleteMultipartUpload"`
Parts []CompletedPart `xml:"Part"`
}
type CompleteMultipartUploadResult struct {
XMLName xml.Name `xml:"CompleteMultipartUploadResult"`
Xmlns string `xml:"xmlns,attr"`
Bucket string `xml:"Bucket"`
Key string `xml:"Key"`
ETag string `xml:"ETag"`
Location string `xml:"Location,omitempty"`
}
type ListPartsResult struct {
XMLName xml.Name `xml:"ListPartsResult"`
Xmlns string `xml:"xmlns,attr"`
Bucket string `xml:"Bucket"`
Key string `xml:"Key"`
UploadID string `xml:"UploadId"`
Parts []PartItem `xml:"Part"`
}
type PartItem struct {
PartNumber int `xml:"PartNumber"`
LastModified string `xml:"LastModified"`
ETag string `xml:"ETag"`
Size int64 `xml:"Size"`
}
type DeleteObjectsRequest struct {
XMLName xml.Name `xml:"Delete"`
Objects []DeleteObjectIdentity `xml:"Object"`
Quiet bool `xml:"Quiet"`
}
type DeleteObjectIdentity struct {
Key string `xml:"Key"`
}
type DeleteObjectsResult struct {
XMLName xml.Name `xml:"DeleteResult"`
Xmlns string `xml:"xmlns,attr"`
Deleted []DeletedEntry `xml:"Deleted,omitempty"`
Errors []DeleteError `xml:"Error,omitempty"`
}
type DeletedEntry struct {
Key string `xml:"Key"`
}
type DeleteError struct {
Key string `xml:"Key"`
Code string `xml:"Code"`
Message string `xml:"Message"`
}
type AuthIdentity struct {
AccessKeyID string `json:"access_key_id"`
SecretEnc string `json:"secret_enc"`
SecretNonce string `json:"secret_nonce"`
EncAlg string `json:"enc_alg"`
KeyVersion string `json:"key_version"`
Status string `json:"status"`
CreatedAt int64 `json:"created_at"`
UpdatedAt int64 `json:"updated_at"`
}
type AuthPolicy struct {
Principal string `json:"principal"`
Statements []AuthPolicyStatement `json:"statements"`
}
type AuthPolicyStatement struct {
Effect string `json:"effect"`
Actions []string `json:"actions"`
Bucket string `json:"bucket"`
Prefix string `json:"prefix"`
}

View File

@@ -1,29 +1,80 @@
package service package service
import ( import (
"context"
"crypto/md5"
"encoding/hex"
"errors"
"fmt" "fmt"
"fs/metadata" "fs/metadata"
"fs/metrics"
"fs/models" "fs/models"
"fs/storage" "fs/storage"
"io" "io"
"log/slog"
"strings" "strings"
"sync"
"time" "time"
) )
type ObjectService struct { type ObjectService struct {
metadataHandler *metadata.MetadataHandler metadata *metadata.MetadataHandler
blob *storage.BlobStore
multipartRetention time.Duration
gcMu sync.RWMutex
} }
func NewObjectService(metadataHandler *metadata.MetadataHandler) *ObjectService { var (
return &ObjectService{metadataHandler: metadataHandler} ErrInvalidPart = errors.New("invalid multipart part")
ErrInvalidPartOrder = errors.New("invalid multipart part order")
ErrInvalidCompleteRequest = errors.New("invalid complete multipart request")
ErrEntityTooSmall = errors.New("multipart entity too small")
)
func NewObjectService(metadataHandler *metadata.MetadataHandler, blobHandler *storage.BlobStore, multipartRetention time.Duration) *ObjectService {
if multipartRetention <= 0 {
multipartRetention = 24 * time.Hour
}
return &ObjectService{
metadata: metadataHandler,
blob: blobHandler,
multipartRetention: multipartRetention,
}
} }
func (s *ObjectService) PutObject(uri string, contentType string, input io.Reader) (*models.ObjectManifest, error) { func (s *ObjectService) acquireGCRLock() func() {
waitStart := time.Now()
s.gcMu.RLock()
metrics.Default.ObserveLockWait("gc_mu_read", time.Since(waitStart))
holdStart := time.Now()
return func() {
metrics.Default.ObserveLockHold("gc_mu_read", time.Since(holdStart))
s.gcMu.RUnlock()
}
}
bucket := strings.Split(uri, "/")[0] func (s *ObjectService) acquireGCLock() func() {
key := strings.Join(strings.Split(uri, "/")[1:], "/") waitStart := time.Now()
s.gcMu.Lock()
metrics.Default.ObserveLockWait("gc_mu_write", time.Since(waitStart))
holdStart := time.Now()
return func() {
metrics.Default.ObserveLockHold("gc_mu_write", time.Since(holdStart))
s.gcMu.Unlock()
}
}
chunks, size, etag, err := storage.IngestStream(input) func (s *ObjectService) PutObject(bucket, key, contentType string, input io.Reader) (*models.ObjectManifest, error) {
start := time.Now()
success := false
defer func() {
metrics.Default.ObserveService("put_object", time.Since(start), success)
}()
unlock := s.acquireGCRLock()
defer unlock()
chunks, size, etag, err := s.blob.IngestStream(input)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -38,28 +89,408 @@ func (s *ObjectService) PutObject(uri string, contentType string, input io.Reade
Chunks: chunks, Chunks: chunks,
CreatedAt: timestamp, CreatedAt: timestamp,
} }
fmt.Println(manifest) slog.Debug("object_written_manifest",
if err = s.metadataHandler.PutManifest(manifest); err != nil { "bucket", manifest.Bucket,
"key", manifest.Key,
"size", manifest.Size,
"chunk_count", len(manifest.Chunks),
"etag", manifest.ETag,
)
if err = s.metadata.PutManifest(manifest); err != nil {
return nil, err return nil, err
} }
success = true
return manifest, nil return manifest, nil
} }
func (s *ObjectService) GetObject(bucket, key string) (io.ReadCloser, *models.ObjectManifest, error) { func (s *ObjectService) GetObject(bucket, key string) (io.ReadCloser, *models.ObjectManifest, error) {
manifest, err := s.metadataHandler.GetManifest(bucket, key) start := time.Now()
waitStart := time.Now()
s.gcMu.RLock()
metrics.Default.ObserveLockWait("gc_mu_read", time.Since(waitStart))
holdStart := time.Now()
manifest, err := s.metadata.GetManifest(bucket, key)
if err != nil { if err != nil {
metrics.Default.ObserveLockHold("gc_mu_read", time.Since(holdStart))
s.gcMu.RUnlock()
metrics.Default.ObserveService("get_object", time.Since(start), false)
return nil, nil, err return nil, nil, err
} }
pr, pw := io.Pipe() pr, pw := io.Pipe()
go func() { go func() {
defer pw.Close() streamOK := false
defer func() {
err := storage.AssembleStream(manifest.Chunks, pw) metrics.Default.ObserveService("get_object", time.Since(start), streamOK)
if err != nil { }()
defer metrics.Default.ObserveLockHold("gc_mu_read", time.Since(holdStart))
defer s.gcMu.RUnlock()
if err := s.blob.AssembleStream(manifest.Chunks, pw); err != nil {
_ = pw.CloseWithError(err)
return return
} }
if err := pw.Close(); err != nil {
return
}
streamOK = true
}() }()
return pr, manifest, nil return pr, manifest, nil
} }
func (s *ObjectService) HeadObject(bucket, key string) (models.ObjectManifest, error) {
start := time.Now()
success := false
defer func() {
metrics.Default.ObserveService("head_object", time.Since(start), success)
}()
unlock := s.acquireGCRLock()
defer unlock()
manifest, err := s.metadata.GetManifest(bucket, key)
if err != nil {
return models.ObjectManifest{}, err
}
success = true
return *manifest, nil
}
func (s *ObjectService) DeleteObject(bucket, key string) error {
start := time.Now()
success := false
defer func() {
metrics.Default.ObserveService("delete_object", time.Since(start), success)
}()
unlock := s.acquireGCRLock()
defer unlock()
err := s.metadata.DeleteManifest(bucket, key)
success = err == nil
return err
}
func (s *ObjectService) ListObjects(bucket, prefix string) ([]*models.ObjectManifest, error) {
unlock := s.acquireGCRLock()
defer unlock()
return s.metadata.ListObjects(bucket, prefix)
}
func (s *ObjectService) ForEachObjectFrom(bucket, startKey string, fn func(*models.ObjectManifest) error) error {
start := time.Now()
success := false
defer func() {
metrics.Default.ObserveService("for_each_object_from", time.Since(start), success)
}()
unlock := s.acquireGCRLock()
defer unlock()
err := s.metadata.ForEachObjectFrom(bucket, startKey, fn)
success = err == nil
return err
}
func (s *ObjectService) CreateBucket(bucket string) error {
start := time.Now()
success := false
defer func() {
metrics.Default.ObserveService("create_bucket", time.Since(start), success)
}()
unlock := s.acquireGCRLock()
defer unlock()
err := s.metadata.CreateBucket(bucket)
success = err == nil
return err
}
func (s *ObjectService) HeadBucket(bucket string) error {
unlock := s.acquireGCRLock()
defer unlock()
_, err := s.metadata.GetBucketManifest(bucket)
return err
}
func (s *ObjectService) GetBucketManifest(bucket string) (*models.BucketManifest, error) {
unlock := s.acquireGCRLock()
defer unlock()
return s.metadata.GetBucketManifest(bucket)
}
func (s *ObjectService) DeleteBucket(bucket string) error {
unlock := s.acquireGCRLock()
defer unlock()
return s.metadata.DeleteBucket(bucket)
}
func (s *ObjectService) ListBuckets() ([]string, error) {
start := time.Now()
success := false
defer func() {
metrics.Default.ObserveService("list_buckets", time.Since(start), success)
}()
unlock := s.acquireGCRLock()
defer unlock()
buckets, err := s.metadata.ListBuckets()
success = err == nil
return buckets, err
}
func (s *ObjectService) DeleteObjects(bucket string, keys []string) ([]string, error) {
unlock := s.acquireGCRLock()
defer unlock()
return s.metadata.DeleteManifests(bucket, keys)
}
func (s *ObjectService) CreateMultipartUpload(bucket, key string) (*models.MultipartUpload, error) {
unlock := s.acquireGCRLock()
defer unlock()
return s.metadata.CreateMultipartUpload(bucket, key)
}
func (s *ObjectService) UploadPart(bucket, key, uploadId string, partNumber int, input io.Reader) (string, error) {
start := time.Now()
success := false
defer func() {
metrics.Default.ObserveService("upload_part", time.Since(start), success)
}()
unlock := s.acquireGCRLock()
defer unlock()
if partNumber < 1 || partNumber > 10000 {
return "", ErrInvalidPart
}
upload, err := s.metadata.GetMultipartUpload(uploadId)
if err != nil {
return "", err
}
if upload.Bucket != bucket || upload.Key != key {
return "", metadata.ErrMultipartNotFound
}
var uploadedPart models.UploadedPart
chunkIds, totalSize, etag, err := s.blob.IngestStream(input)
if err != nil {
return "", err
}
uploadedPart = models.UploadedPart{
PartNumber: partNumber,
ETag: etag,
Size: totalSize,
Chunks: chunkIds,
CreatedAt: time.Now().Unix(),
}
err = s.metadata.PutMultipartPart(uploadId, uploadedPart)
if err != nil {
return "", err
}
success = true
return etag, nil
}
func (s *ObjectService) ListMultipartParts(bucket, key, uploadID string) ([]models.UploadedPart, error) {
unlock := s.acquireGCRLock()
defer unlock()
upload, err := s.metadata.GetMultipartUpload(uploadID)
if err != nil {
return nil, err
}
if upload.Bucket != bucket || upload.Key != key {
return nil, metadata.ErrMultipartNotFound
}
return s.metadata.ListMultipartParts(uploadID)
}
func (s *ObjectService) CompleteMultipartUpload(bucket, key, uploadID string, completed []models.CompletedPart) (*models.ObjectManifest, error) {
start := time.Now()
success := false
defer func() {
metrics.Default.ObserveService("complete_multipart_upload", time.Since(start), success)
}()
unlock := s.acquireGCRLock()
defer unlock()
if len(completed) == 0 {
return nil, ErrInvalidCompleteRequest
}
upload, err := s.metadata.GetMultipartUpload(uploadID)
if err != nil {
return nil, err
}
if upload.Bucket != bucket || upload.Key != key {
return nil, metadata.ErrMultipartNotFound
}
storedParts, err := s.metadata.ListMultipartParts(uploadID)
if err != nil {
return nil, err
}
partsByNumber := make(map[int]models.UploadedPart, len(storedParts))
for _, part := range storedParts {
partsByNumber[part.PartNumber] = part
}
lastPartNumber := 0
orderedParts := make([]models.UploadedPart, 0, len(completed))
chunks := make([]string, 0)
var totalSize int64
for i, part := range completed {
if part.PartNumber <= lastPartNumber {
return nil, ErrInvalidPartOrder
}
lastPartNumber = part.PartNumber
storedPart, ok := partsByNumber[part.PartNumber]
if !ok {
return nil, ErrInvalidPart
}
if normalizeETag(part.ETag) != normalizeETag(storedPart.ETag) {
return nil, ErrInvalidPart
}
if i < len(completed)-1 && storedPart.Size < 5*1024*1024 {
return nil, ErrEntityTooSmall
}
orderedParts = append(orderedParts, storedPart)
chunks = append(chunks, storedPart.Chunks...)
totalSize += storedPart.Size
}
finalETag := buildMultipartETag(orderedParts)
manifest := &models.ObjectManifest{
Bucket: bucket,
Key: key,
Size: totalSize,
ContentType: "application/octet-stream",
ETag: finalETag,
Chunks: chunks,
CreatedAt: time.Now().Unix(),
}
if err := s.metadata.CompleteMultipartUpload(uploadID, manifest); err != nil {
return nil, err
}
success = true
return manifest, nil
}
func (s *ObjectService) AbortMultipartUpload(bucket, key, uploadID string) error {
unlock := s.acquireGCRLock()
defer unlock()
upload, err := s.metadata.GetMultipartUpload(uploadID)
if err != nil {
return err
}
if upload.Bucket != bucket || upload.Key != key {
return metadata.ErrMultipartNotFound
}
return s.metadata.AbortMultipartUpload(uploadID)
}
func normalizeETag(etag string) string {
return strings.Trim(etag, "\"")
}
func buildMultipartETag(parts []models.UploadedPart) string {
hasher := md5.New()
for _, part := range parts {
etagBytes, err := hex.DecodeString(normalizeETag(part.ETag))
if err == nil {
_, _ = hasher.Write(etagBytes)
continue
}
_, _ = hasher.Write([]byte(normalizeETag(part.ETag)))
}
return fmt.Sprintf("%x-%d", hasher.Sum(nil), len(parts))
}
func (s *ObjectService) Close() error {
return s.metadata.Close()
}
func (s *ObjectService) GarbageCollect() error {
start := time.Now()
success := false
deletedChunks := 0
deleteErrors := 0
cleanedUploads := 0
defer func() {
metrics.Default.ObserveGC(time.Since(start), deletedChunks, deleteErrors, cleanedUploads, success)
}()
unlock := s.acquireGCLock()
defer unlock()
referencedChunkSet, err := s.metadata.GetReferencedChunkSet()
if err != nil {
return err
}
totalChunks := 0
if err := s.blob.ForEachChunk(func(chunkID string) error {
totalChunks++
if _, found := referencedChunkSet[chunkID]; found {
return nil
}
if err := s.blob.DeleteBlob(chunkID); err != nil {
deleteErrors++
slog.Warn("garbage_collect_delete_failed", "chunk_id", chunkID, "error", err)
return nil
}
deletedChunks++
return nil
}); err != nil {
return err
}
cleanedUploads, err = s.metadata.CleanupMultipartUploads(s.multipartRetention)
if err != nil {
return err
}
slog.Info("garbage_collect_completed",
"referenced_chunks", len(referencedChunkSet),
"total_chunks", totalChunks,
"deleted_chunks", deletedChunks,
"delete_errors", deleteErrors,
"cleaned_uploads", cleanedUploads,
)
success = true
return nil
}
func (s *ObjectService) RunGC(ctx context.Context, interval time.Duration) {
if interval <= 0 {
slog.Warn("garbage_collect_disabled_invalid_interval", "interval", interval.String())
return
}
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
_ = s.GarbageCollect()
}
}
}

View File

@@ -4,24 +4,55 @@ import (
"crypto/md5" "crypto/md5"
"crypto/sha256" "crypto/sha256"
"encoding/hex" "encoding/hex"
"errors"
"fmt"
"fs/metrics"
"io" "io"
"os" "os"
"path/filepath" "path/filepath"
"strings"
"time"
) )
const chunkSize = 64 * 1024 const blobRoot = "blobs"
const blobRoot = "blobs/" const maxChunkSize = 64 * 1024 * 1024
func IngestStream(stream io.Reader) ([]string, int64, string, error) { type BlobStore struct {
dataRoot string
chunkSize int
}
func NewBlobStore(root string, chunkSize int) (*BlobStore, error) {
root = strings.TrimSpace(root)
if root == "" {
return nil, errors.New("blob root is required")
}
if chunkSize <= 0 || chunkSize > maxChunkSize {
return nil, fmt.Errorf("chunk size must be between 1 and %d bytes", maxChunkSize)
}
cleanRoot := filepath.Clean(root)
if err := os.MkdirAll(filepath.Join(cleanRoot, blobRoot), 0o755); err != nil {
return nil, err
}
return &BlobStore{chunkSize: chunkSize, dataRoot: cleanRoot}, nil
}
func (bs *BlobStore) IngestStream(stream io.Reader) ([]string, int64, string, error) {
start := time.Now()
fullFileHasher := md5.New() fullFileHasher := md5.New()
buffer := make([]byte, chunkSize) buffer := make([]byte, bs.chunkSize)
var totalSize int64 var totalSize int64
var chunkIDs []string var chunkIDs []string
success := false
defer func() {
metrics.Default.ObserveBlob("ingest_stream", time.Since(start), 0, success)
}()
for { for {
bytesRead, err := io.ReadFull(stream, buffer) bytesRead, err := io.ReadFull(stream, buffer)
if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF { if err != nil && err != io.EOF && !errors.Is(err, io.ErrUnexpectedEOF) {
return nil, 0, "", err return nil, 0, "", err
} }
@@ -34,13 +65,13 @@ func IngestStream(stream io.Reader) ([]string, int64, string, error) {
chunkHash := sha256.Sum256(chunkData) chunkHash := sha256.Sum256(chunkData)
chunkID := hex.EncodeToString(chunkHash[:]) chunkID := hex.EncodeToString(chunkHash[:])
err := saveBlob(chunkID, chunkData) err := bs.saveBlob(chunkID, chunkData)
if err != nil { if err != nil {
return nil, 0, "", err return nil, 0, "", err
} }
chunkIDs = append(chunkIDs, chunkID) chunkIDs = append(chunkIDs, chunkID)
} }
if err == io.EOF || err == io.ErrUnexpectedEOF { if err == io.EOF || errors.Is(err, io.ErrUnexpectedEOF) {
break break
} }
if err != nil { if err != nil {
@@ -50,27 +81,84 @@ func IngestStream(stream io.Reader) ([]string, int64, string, error) {
} }
etag := hex.EncodeToString(fullFileHasher.Sum(nil)) etag := hex.EncodeToString(fullFileHasher.Sum(nil))
success = true
return chunkIDs, totalSize, etag, nil return chunkIDs, totalSize, etag, nil
} }
func saveBlob(chunkID string, data []byte) error { func (bs *BlobStore) saveBlob(chunkID string, data []byte) error {
dir := filepath.Join(blobRoot, chunkID[:2], chunkID[2:4]) start := time.Now()
success := false
writtenBytes := int64(0)
defer func() {
metrics.Default.ObserveBlob("write_chunk", time.Since(start), writtenBytes, success)
}()
if !isValidChunkID(chunkID) {
return fmt.Errorf("invalid chunk id: %q", chunkID)
}
dir := filepath.Join(bs.dataRoot, blobRoot, chunkID[:2], chunkID[2:4])
if err := os.MkdirAll(dir, 0755); err != nil { if err := os.MkdirAll(dir, 0755); err != nil {
return err return err
} }
fullPath := filepath.Join(dir, chunkID) fullPath := filepath.Join(dir, chunkID)
if _, err := os.Stat(fullPath); os.IsNotExist(err) { if _, err := os.Stat(fullPath); err == nil {
if err := os.WriteFile(fullPath, data, 0644); err != nil { success = true
return err return nil
} } else if !os.IsNotExist(err) {
return err
} }
tmpFile, err := os.CreateTemp(dir, chunkID+".tmp-*")
if err != nil {
return err
}
tmpPath := tmpFile.Name()
cleanup := true
defer func() {
if cleanup {
_ = os.Remove(tmpPath)
}
}()
if _, err := tmpFile.Write(data); err != nil {
_ = tmpFile.Close()
return err
}
if err := tmpFile.Sync(); err != nil {
_ = tmpFile.Close()
return err
}
if err := tmpFile.Close(); err != nil {
return err
}
if err := os.Rename(tmpPath, fullPath); err != nil {
if _, statErr := os.Stat(fullPath); statErr == nil {
success = true
return nil
}
return err
}
cleanup = false
if err := syncDir(dir); err != nil {
return err
}
writtenBytes = int64(len(data))
success = true
return nil return nil
} }
func AssembleStream(chunkIDs []string, w *io.PipeWriter) error { func (bs *BlobStore) AssembleStream(chunkIDs []string, w *io.PipeWriter) error {
start := time.Now()
success := false
defer func() {
metrics.Default.ObserveBlob("assemble_stream", time.Since(start), 0, success)
}()
for _, chunkID := range chunkIDs { for _, chunkID := range chunkIDs {
chunkData, err := GetBlob(chunkID) chunkData, err := bs.GetBlob(chunkID)
if err != nil { if err != nil {
return err return err
} }
@@ -78,10 +166,86 @@ func AssembleStream(chunkIDs []string, w *io.PipeWriter) error {
return err return err
} }
} }
success = true
return nil return nil
} }
func GetBlob(chunkID string) ([]byte, error) { func (bs *BlobStore) GetBlob(chunkID string) ([]byte, error) {
start := time.Now()
success := false
var size int64
defer func() {
metrics.Default.ObserveBlob("read_chunk", time.Since(start), size, success)
}()
return os.ReadFile(filepath.Join(blobRoot, chunkID[:2], chunkID[2:4], chunkID)) if !isValidChunkID(chunkID) {
return nil, fmt.Errorf("invalid chunk id: %q", chunkID)
}
data, err := os.ReadFile(filepath.Join(bs.dataRoot, blobRoot, chunkID[:2], chunkID[2:4], chunkID))
if err != nil {
return nil, err
}
size = int64(len(data))
success = true
return data, nil
}
func (bs *BlobStore) DeleteBlob(chunkID string) error {
if !isValidChunkID(chunkID) {
return fmt.Errorf("invalid chunk id: %q", chunkID)
}
err := os.Remove(filepath.Join(bs.dataRoot, blobRoot, chunkID[:2], chunkID[2:4], chunkID))
if err != nil && os.IsNotExist(err) {
return nil
}
return err
}
func (bs *BlobStore) ListChunks() ([]string, error) {
var chunkIDs []string
err := bs.ForEachChunk(func(chunkID string) error {
chunkIDs = append(chunkIDs, chunkID)
return nil
})
return chunkIDs, err
}
func (bs *BlobStore) ForEachChunk(fn func(chunkID string) error) error {
if fn == nil {
return errors.New("chunk callback is required")
}
return filepath.Walk(filepath.Join(bs.dataRoot, blobRoot), func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if !info.IsDir() {
chunkID := info.Name()
if isValidChunkID(chunkID) {
return fn(chunkID)
}
}
return nil
})
}
// isValidChunkID reports whether chunkID is a lowercase hex-encoded
// SHA-256 digest: exactly 64 characters, each in [0-9a-f].
func isValidChunkID(chunkID string) bool {
	if len(chunkID) != sha256.Size*2 {
		return false
	}
	for i := 0; i < len(chunkID); i++ {
		switch c := chunkID[i]; {
		case c >= '0' && c <= '9':
		case c >= 'a' && c <= 'f':
		default:
			return false
		}
	}
	return true
}
// syncDir fsyncs the directory at dirPath so that recently created or
// removed entries inside it are durable on disk. The Close error is
// intentionally ignored: the directory is opened read-only.
func syncDir(dirPath string) error {
	handle, err := os.Open(dirPath)
	if err != nil {
		return err
	}
	defer handle.Close()
	return handle.Sync()
}

114
utils/config.go Normal file
View File

@@ -0,0 +1,114 @@
package utils
import (
"os"
"path/filepath"
"strconv"
"strings"
"time"
"github.com/joho/godotenv"
)
// Config holds all runtime settings for the server. It is populated once
// at startup by NewConfig from environment variables (optionally loaded
// from a .env file); defaults noted below come from NewConfig.
type Config struct {
	DataPath                  string        // absolute root directory for stored data (default: cwd)
	Address                   string        // listen address (default "0.0.0.0")
	Port                      int           // TCP listen port, 1-65535 (default 3000)
	ChunkSize                 int           // chunk size in bytes, up to 64 MiB (default 8192000)
	LogLevel                  string        // lowercased log level (default "info")
	LogFormat                 string        // "json" or "text" (default "text")
	AuditLog                  bool          // enable audit logging (default true)
	GcInterval                time.Duration // interval between GC runs, 1-60 minutes (default 10m)
	GcEnabled                 bool          // run the garbage collector (default true)
	MultipartCleanupRetention time.Duration // retention for stale multipart uploads, 1h-30d (default 24h)
	AuthEnabled               bool          // enforce request authentication (default true)
	AuthRegion                string        // region for signature validation (default "us-east-1")
	AuthSkew                  time.Duration // allowed clock skew for signed requests (default 5m)
	AuthMaxPresign            time.Duration // max lifetime of presigned requests (default 24h)
	AuthMasterKey             string        // NOTE(review): presumably protects stored credentials — confirm against auth package
	AuthBootstrapAccessKey    string        // initial access key — presumably seeded on first start; verify in auth setup
	AuthBootstrapSecretKey    string        // secret paired with the bootstrap access key
	AuthBootstrapPolicy       string        // policy attached to the bootstrap credentials — TODO confirm format
}
func NewConfig() *Config {
_ = godotenv.Load()
config := &Config{
DataPath: sanitizeDataPath(os.Getenv("DATA_PATH")),
Address: firstNonEmpty(strings.TrimSpace(os.Getenv("ADDRESS")), "0.0.0.0"),
Port: envIntRange("PORT", 3000, 1, 65535),
ChunkSize: envIntRange("CHUNK_SIZE", 8192000, 1, 64*1024*1024),
LogLevel: strings.ToLower(firstNonEmpty(strings.TrimSpace(os.Getenv("LOG_LEVEL")), "info")),
LogFormat: strings.ToLower(firstNonEmpty(strings.TrimSpace(os.Getenv("LOG_FORMAT")), strings.TrimSpace(os.Getenv("LOG_TYPE")), "text")),
AuditLog: envBool("AUDIT_LOG", true),
GcInterval: time.Duration(envIntRange("GC_INTERVAL", 10, 1, 60)) * time.Minute,
GcEnabled: envBool("GC_ENABLED", true),
MultipartCleanupRetention: time.Duration(
envIntRange("MULTIPART_RETENTION_HOURS", 24, 1, 24*30),
) * time.Hour,
AuthEnabled: envBool("AUTH_ENABLED", true),
AuthRegion: firstNonEmpty(strings.TrimSpace(os.Getenv("AUTH_REGION")), "us-east-1"),
AuthSkew: time.Duration(envIntRange("AUTH_SKEW_SECONDS", 300, 30, 3600)) * time.Second,
AuthMaxPresign: time.Duration(envIntRange("AUTH_MAX_PRESIGN_SECONDS", 86400, 60, 86400)) * time.Second,
AuthMasterKey: strings.TrimSpace(os.Getenv("AUTH_MASTER_KEY")),
AuthBootstrapAccessKey: strings.TrimSpace(os.Getenv("AUTH_BOOTSTRAP_ACCESS_KEY")),
AuthBootstrapSecretKey: strings.TrimSpace(os.Getenv("AUTH_BOOTSTRAP_SECRET_KEY")),
AuthBootstrapPolicy: strings.TrimSpace(os.Getenv("AUTH_BOOTSTRAP_POLICY")),
}
if config.LogFormat != "json" && config.LogFormat != "text" {
config.LogFormat = "text"
}
return config
}
// envIntRange reads an integer from the environment variable key. It
// returns defaultValue when the variable is unset, blank, unparsable, or
// outside the inclusive range [minValue, maxValue].
func envIntRange(key string, defaultValue, minValue, maxValue int) int {
	trimmed := strings.TrimSpace(os.Getenv(key))
	if trimmed == "" {
		return defaultValue
	}
	parsed, err := strconv.Atoi(trimmed)
	switch {
	case err != nil, parsed < minValue, parsed > maxValue:
		return defaultValue
	}
	return parsed
}
// envBool reads a boolean from the environment variable key using
// strconv.ParseBool semantics ("1", "t", "true", "0", "f", "false", ...).
// Unset, blank, or unparsable values yield defaultValue.
func envBool(key string, defaultValue bool) bool {
	trimmed := strings.TrimSpace(os.Getenv(key))
	if trimmed == "" {
		return defaultValue
	}
	if parsed, err := strconv.ParseBool(trimmed); err == nil {
		return parsed
	}
	return defaultValue
}
// firstNonEmpty returns the first argument that is not the empty string,
// or "" when every argument is empty (or none are given).
func firstNonEmpty(values ...string) string {
	for _, candidate := range values {
		if candidate == "" {
			continue
		}
		return candidate
	}
	return ""
}
// sanitizeDataPath normalizes a user-supplied data directory path:
// surrounding whitespace is trimmed, an empty value means the current
// directory, and the result is cleaned and made absolute when possible.
func sanitizeDataPath(raw string) string {
	trimmed := strings.TrimSpace(raw)
	if trimmed == "" {
		trimmed = "."
	}
	normalized := filepath.Clean(trimmed)
	abs, err := filepath.Abs(normalized)
	if err != nil {
		// Abs only fails when the working directory is unavailable;
		// fall back to the cleaned (possibly relative) path.
		return normalized
	}
	return abs
}

54
utils/utils.go Normal file
View File

@@ -0,0 +1,54 @@
package utils
import (
"encoding/xml"
"fs/models"
"sort"
"strings"
"time"
)
// ConstructXMLResponseForObjectList renders an S3-style ListBucketResult
// XML document for the given bucket and object manifests. Each object
// becomes a Contents entry; the top-level "directory" portion of any key
// containing '/' is additionally reported as a CommonPrefixes entry, in
// sorted order. The result always claims MaxKeys=1000 and IsTruncated=false.
func ConstructXMLResponseForObjectList(bucket string, objects []*models.ObjectManifest) (string, error) {
	listing := models.ListBucketResult{
		Xmlns:       "http://s3.amazonaws.com/doc/2006-03-01/",
		Name:        bucket,
		Prefix:      "",
		KeyCount:    len(objects),
		MaxKeys:     1000,
		IsTruncated: false,
	}

	seenPrefixes := make(map[string]struct{})
	for _, manifest := range objects {
		entry := models.Contents{
			Key:          manifest.Key,
			LastModified: time.Unix(manifest.CreatedAt, 0).UTC().Format("2006-01-02T15:04:05.000Z"),
			ETag:         "\"" + manifest.ETag + "\"",
			Size:         manifest.Size,
			StorageClass: "STANDARD",
		}
		listing.Contents = append(listing.Contents, entry)

		// Record the segment up to and including the first slash as a
		// common prefix, e.g. "photos/2024/a.jpg" -> "photos/".
		if i := strings.Index(manifest.Key, "/"); i >= 0 {
			seenPrefixes[manifest.Key[:i+1]] = struct{}{}
		}
	}

	prefixes := make([]string, 0, len(seenPrefixes))
	for p := range seenPrefixes {
		prefixes = append(prefixes, p)
	}
	sort.Strings(prefixes)
	for _, p := range prefixes {
		listing.CommonPrefixes = append(listing.CommonPrefixes, models.CommonPrefixes{Prefix: p})
	}

	body, err := xml.MarshalIndent(listing, "", " ")
	if err != nil {
		return "", err
	}
	return xml.Header + string(body), nil
}