Merge pull request #6 from ferdzo/feature/metrics

Metrics endpoint
This commit is contained in:
2026-03-02 23:28:05 +01:00
committed by GitHub
11 changed files with 1218 additions and 77 deletions

View File

@@ -45,6 +45,8 @@ Reference: `auth/README.md`
Health:
- `GET /healthz`
- `HEAD /healthz`
- `GET /metrics` (Prometheus exposition format)
- `HEAD /metrics`
## Limitations

View File

@@ -10,6 +10,7 @@ import (
"fs/auth"
"fs/logging"
"fs/metadata"
"fs/metrics"
"fs/models"
"fs/service"
"io"
@@ -70,6 +71,8 @@ func (h *Handler) setupRoutes() {
h.router.Get("/healthz", h.handleHealth)
h.router.Head("/healthz", h.handleHealth)
h.router.Get("/metrics", h.handleMetrics)
h.router.Head("/metrics", h.handleMetrics)
h.router.Get("/", h.handleGetBuckets)
h.router.Get("/{bucket}/", h.handleGetBucket)
@@ -106,6 +109,18 @@ func (h *Handler) handleHealth(w http.ResponseWriter, r *http.Request) {
}
}
func (h *Handler) handleMetrics(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "text/plain; version=0.0.4; charset=utf-8")
if r.Method == http.MethodHead {
w.WriteHeader(http.StatusOK)
return
}
payload := metrics.Default.RenderPrometheus()
w.Header().Set("Content-Length", strconv.Itoa(len(payload)))
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte(payload))
}
func validateObjectKey(key string) *s3APIError {
if key == "" {
err := s3ErrInvalidObjectKey
@@ -222,6 +237,7 @@ func (h *Handler) handlePostObject(w http.ResponseWriter, r *http.Request) {
writeS3Error(w, r, s3ErrMalformedXML, r.URL.Path)
return
}
metrics.Default.ObserveBatchSize(len(req.Parts))
manifest, err := h.svc.CompleteMultipartUpload(bucket, key, uploadID, req.Parts)
if err != nil {
@@ -297,6 +313,7 @@ func (h *Handler) handlePutObject(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
return
}
metrics.Default.ObserveBatchSize(1)
if ifNoneMatch := strings.TrimSpace(r.Header.Get("If-None-Match")); ifNoneMatch != "" {
manifest, err := h.svc.HeadObject(bucket, key)
@@ -509,6 +526,7 @@ func (h *Handler) handlePostBucket(w http.ResponseWriter, r *http.Request) {
writeS3Error(w, r, s3ErrTooManyDeleteObjects, r.URL.Path)
return
}
metrics.Default.ObserveBatchSize(len(req.Objects))
keys := make([]string, 0, len(req.Objects))
response := models.DeleteObjectsResult{
@@ -627,6 +645,7 @@ func newLimitedListener(inner net.Listener, maxConns int) net.Listener {
if maxConns <= 0 {
return inner
}
metrics.Default.SetConnectionPoolMax(maxConns)
return &limitedListener{
Listener: inner,
slots: make(chan struct{}, maxConns),
@@ -634,15 +653,26 @@ func newLimitedListener(inner net.Listener, maxConns int) net.Listener {
}
func (l *limitedListener) Accept() (net.Conn, error) {
l.slots <- struct{}{}
select {
case l.slots <- struct{}{}:
default:
metrics.Default.IncConnectionPoolWait()
metrics.Default.IncRequestQueueLength()
l.slots <- struct{}{}
metrics.Default.DecRequestQueueLength()
}
conn, err := l.Listener.Accept()
if err != nil {
<-l.slots
return nil, err
}
metrics.Default.IncConnectionPoolActive()
return &limitedConn{
Conn: conn,
done: func() { <-l.slots },
done: func() {
<-l.slots
metrics.Default.DecConnectionPoolActive()
},
}, nil
}

View File

@@ -5,6 +5,7 @@ import (
"errors"
"fs/auth"
"fs/metadata"
"fs/metrics"
"fs/models"
"fs/service"
"net/http"
@@ -200,12 +201,19 @@ func mapToS3Error(err error) s3APIError {
func writeS3Error(w http.ResponseWriter, r *http.Request, apiErr s3APIError, resource string) {
requestID := ""
op := "other"
if r != nil {
requestID = middleware.GetReqID(r.Context())
isDeletePost := false
if r.Method == http.MethodPost {
_, isDeletePost = r.URL.Query()["delete"]
}
op = metrics.NormalizeHTTPOperation(r.Method, isDeletePost)
if requestID != "" {
w.Header().Set("x-amz-request-id", requestID)
}
}
metrics.Default.ObserveError(op, apiErr.Code)
w.Header().Set("Content-Type", "application/xml; charset=utf-8")
w.WriteHeader(apiErr.Status)

View File

@@ -1,6 +1,8 @@
package auth
import (
"errors"
"fs/metrics"
"log/slog"
"net"
"net/http"
@@ -18,17 +20,20 @@ func Middleware(
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
authCtx := RequestContext{Authenticated: false, AuthType: "none"}
if svc == nil || !svc.Config().Enabled {
metrics.Default.ObserveAuth("bypass", "disabled", "auth_disabled")
next.ServeHTTP(w, r.WithContext(WithRequestContext(r.Context(), authCtx)))
return
}
if r.URL.Path == "/healthz" {
metrics.Default.ObserveAuth("bypass", "none", "public_endpoint")
next.ServeHTTP(w, r.WithContext(WithRequestContext(r.Context(), authCtx)))
return
}
resolvedCtx, err := svc.AuthenticateRequest(r)
if err != nil {
metrics.Default.ObserveAuth("error", "sigv4", authErrorClass(err))
if auditEnabled && logger != nil {
requestID := middleware.GetReqID(r.Context())
attrs := []any{
@@ -50,6 +55,7 @@ func Middleware(
return
}
metrics.Default.ObserveAuth("ok", resolvedCtx.AuthType, "none")
if auditEnabled && logger != nil {
requestID := middleware.GetReqID(r.Context())
attrs := []any{
@@ -69,6 +75,33 @@ func Middleware(
}
}
func authErrorClass(err error) string {
switch {
case errors.Is(err, ErrInvalidAccessKeyID):
return "invalid_access_key"
case errors.Is(err, ErrSignatureDoesNotMatch):
return "signature_mismatch"
case errors.Is(err, ErrAuthorizationHeaderMalformed):
return "auth_header_malformed"
case errors.Is(err, ErrRequestTimeTooSkewed):
return "time_skew"
case errors.Is(err, ErrExpiredToken):
return "expired_token"
case errors.Is(err, ErrNoAuthCredentials):
return "missing_credentials"
case errors.Is(err, ErrUnsupportedAuthScheme):
return "unsupported_auth_scheme"
case errors.Is(err, ErrInvalidPresign):
return "invalid_presign"
case errors.Is(err, ErrCredentialDisabled):
return "credential_disabled"
case errors.Is(err, ErrAccessDenied):
return "access_denied"
default:
return "other"
}
}
func clientIP(remoteAddr string) string {
host, _, err := net.SplitHostPort(remoteAddr)
if err == nil && host != "" {

View File

@@ -1,6 +1,7 @@
package logging
import (
"fs/metrics"
"log/slog"
"net/http"
"os"
@@ -9,6 +10,7 @@ import (
"strings"
"time"
"github.com/go-chi/chi/v5"
"github.com/go-chi/chi/v5/middleware"
)
@@ -86,6 +88,11 @@ func HTTPMiddleware(logger *slog.Logger, cfg Config) func(http.Handler) http.Han
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
start := time.Now()
ww := middleware.NewWrapResponseWriter(w, r.ProtoMajor)
op := metricOperationLabel(r)
metrics.Default.IncHTTPInFlightOp(op)
defer func() {
metrics.Default.DecHTTPInFlightOp(op)
}()
requestID := middleware.GetReqID(r.Context())
if requestID != "" {
ww.Header().Set("x-amz-request-id", requestID)
@@ -93,15 +100,18 @@ func HTTPMiddleware(logger *slog.Logger, cfg Config) func(http.Handler) http.Han
next.ServeHTTP(ww, r)
if !cfg.Audit && !cfg.DebugMode {
return
}
elapsed := time.Since(start)
status := ww.Status()
if status == 0 {
status = http.StatusOK
}
route := metricRouteLabel(r)
metrics.Default.ObserveHTTPRequestDetailed(r.Method, route, op, status, elapsed, ww.BytesWritten())
if !cfg.Audit && !cfg.DebugMode {
return
}
attrs := []any{
"method", r.Method,
"path", r.URL.Path,
@@ -131,6 +141,46 @@ func HTTPMiddleware(logger *slog.Logger, cfg Config) func(http.Handler) http.Han
}
}
func metricRouteLabel(r *http.Request) string {
if r == nil || r.URL == nil {
return "/unknown"
}
if routeCtx := chi.RouteContext(r.Context()); routeCtx != nil {
if pattern := strings.TrimSpace(routeCtx.RoutePattern()); pattern != "" {
return pattern
}
}
path := strings.TrimSpace(r.URL.Path)
if path == "" || path == "/" {
return "/"
}
if path == "/healthz" || path == "/metrics" {
return path
}
trimmed := strings.Trim(path, "/")
if trimmed == "" {
return "/"
}
if !strings.Contains(trimmed, "/") {
return "/{bucket}"
}
return "/{bucket}/*"
}
func metricOperationLabel(r *http.Request) string {
if r == nil {
return "other"
}
isDeletePost := false
if r.Method == http.MethodPost && r.URL != nil {
_, isDeletePost = r.URL.Query()["delete"]
}
return metrics.NormalizeHTTPOperation(r.Method, isDeletePost)
}
func envBool(key string, defaultValue bool) bool {
raw := os.Getenv(key)
if raw == "" {

View File

@@ -0,0 +1,30 @@
package logging
import (
"net/http/httptest"
"testing"
)
func TestMetricRouteLabelFallbacks(t *testing.T) {
testCases := []struct {
name string
path string
want string
}{
{name: "root", path: "/", want: "/"},
{name: "health", path: "/healthz", want: "/healthz"},
{name: "metrics", path: "/metrics", want: "/metrics"},
{name: "bucket", path: "/some-bucket", want: "/{bucket}"},
{name: "object", path: "/some-bucket/private/path/file.jpg", want: "/{bucket}/*"},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
req := httptest.NewRequest("GET", tc.path, nil)
got := metricRouteLabel(req)
if got != tc.want {
t.Fatalf("metricRouteLabel(%q) = %q, want %q", tc.path, got, tc.want)
}
})
}
}

View File

@@ -4,6 +4,7 @@ import (
"encoding/json"
"errors"
"fmt"
"fs/metrics"
"fs/models"
"net"
"regexp"
@@ -47,7 +48,7 @@ func NewMetadataHandler(dbPath string) (*MetadataHandler, error) {
}
h := &MetadataHandler{db: db}
err = h.db.Update(func(tx *bbolt.Tx) error {
err = h.update(func(tx *bbolt.Tx) error {
_, err := tx.CreateBucketIfNotExists(systemIndex)
return err
})
@@ -55,7 +56,7 @@ func NewMetadataHandler(dbPath string) (*MetadataHandler, error) {
_ = db.Close()
return nil, err
}
err = h.db.Update(func(tx *bbolt.Tx) error {
err = h.update(func(tx *bbolt.Tx) error {
_, err := tx.CreateBucketIfNotExists(multipartUploadIndex)
return err
})
@@ -63,7 +64,7 @@ func NewMetadataHandler(dbPath string) (*MetadataHandler, error) {
_ = db.Close()
return nil, err
}
err = h.db.Update(func(tx *bbolt.Tx) error {
err = h.update(func(tx *bbolt.Tx) error {
_, err := tx.CreateBucketIfNotExists(multipartUploadPartsIndex)
return err
})
@@ -71,7 +72,7 @@ func NewMetadataHandler(dbPath string) (*MetadataHandler, error) {
_ = db.Close()
return nil, err
}
err = h.db.Update(func(tx *bbolt.Tx) error {
err = h.update(func(tx *bbolt.Tx) error {
_, err := tx.CreateBucketIfNotExists(authIdentitiesIndex)
return err
})
@@ -79,7 +80,7 @@ func NewMetadataHandler(dbPath string) (*MetadataHandler, error) {
_ = db.Close()
return nil, err
}
err = h.db.Update(func(tx *bbolt.Tx) error {
err = h.update(func(tx *bbolt.Tx) error {
_, err := tx.CreateBucketIfNotExists(authPoliciesIndex)
return err
})
@@ -119,6 +120,20 @@ func (h *MetadataHandler) Close() error {
return h.db.Close()
}
func (h *MetadataHandler) view(fn func(tx *bbolt.Tx) error) error {
start := time.Now()
err := h.db.View(fn)
metrics.Default.ObserveMetadataTx("view", time.Since(start), err == nil)
return err
}
func (h *MetadataHandler) update(fn func(tx *bbolt.Tx) error) error {
start := time.Now()
err := h.db.Update(fn)
metrics.Default.ObserveMetadataTx("update", time.Since(start), err == nil)
return err
}
func (h *MetadataHandler) PutAuthIdentity(identity *models.AuthIdentity) error {
if identity == nil {
return errors.New("auth identity is required")
@@ -126,7 +141,7 @@ func (h *MetadataHandler) PutAuthIdentity(identity *models.AuthIdentity) error {
if strings.TrimSpace(identity.AccessKeyID) == "" {
return errors.New("access key id is required")
}
return h.db.Update(func(tx *bbolt.Tx) error {
return h.update(func(tx *bbolt.Tx) error {
bucket := tx.Bucket(authIdentitiesIndex)
if bucket == nil {
return errors.New("auth identities index not found")
@@ -146,7 +161,7 @@ func (h *MetadataHandler) GetAuthIdentity(accessKeyID string) (*models.AuthIdent
}
var identity *models.AuthIdentity
err := h.db.View(func(tx *bbolt.Tx) error {
err := h.view(func(tx *bbolt.Tx) error {
bucket := tx.Bucket(authIdentitiesIndex)
if bucket == nil {
return errors.New("auth identities index not found")
@@ -177,7 +192,7 @@ func (h *MetadataHandler) PutAuthPolicy(policy *models.AuthPolicy) error {
return errors.New("auth policy principal is required")
}
policy.Principal = principal
return h.db.Update(func(tx *bbolt.Tx) error {
return h.update(func(tx *bbolt.Tx) error {
bucket := tx.Bucket(authPoliciesIndex)
if bucket == nil {
return errors.New("auth policies index not found")
@@ -197,7 +212,7 @@ func (h *MetadataHandler) GetAuthPolicy(accessKeyID string) (*models.AuthPolicy,
}
var policy *models.AuthPolicy
err := h.db.View(func(tx *bbolt.Tx) error {
err := h.view(func(tx *bbolt.Tx) error {
bucket := tx.Bucket(authPoliciesIndex)
if bucket == nil {
return errors.New("auth policies index not found")
@@ -224,7 +239,7 @@ func (h *MetadataHandler) CreateBucket(bucketName string) error {
return fmt.Errorf("%w: %s", ErrInvalidBucketName, bucketName)
}
err := h.db.Update(func(tx *bbolt.Tx) error {
err := h.update(func(tx *bbolt.Tx) error {
indexBucket, err := tx.CreateBucketIfNotExists([]byte(systemIndex))
if err != nil {
return err
@@ -256,7 +271,7 @@ func (h *MetadataHandler) DeleteBucket(bucketName string) error {
return fmt.Errorf("%w: %s", ErrInvalidBucketName, bucketName)
}
err := h.db.Update(func(tx *bbolt.Tx) error {
err := h.update(func(tx *bbolt.Tx) error {
indexBucket, err := tx.CreateBucketIfNotExists([]byte(systemIndex))
if err != nil {
return err
@@ -303,7 +318,7 @@ func (h *MetadataHandler) DeleteBucket(bucketName string) error {
func (h *MetadataHandler) ListBuckets() ([]string, error) {
buckets := []string{}
err := h.db.View(func(tx *bbolt.Tx) error {
err := h.view(func(tx *bbolt.Tx) error {
systemIndexBucket := tx.Bucket([]byte(systemIndex))
if systemIndexBucket == nil {
return errors.New("system index not found")
@@ -323,7 +338,7 @@ func (h *MetadataHandler) ListBuckets() ([]string, error) {
func (h *MetadataHandler) GetBucketManifest(bucketName string) (*models.BucketManifest, error) {
var manifest *models.BucketManifest
err := h.db.View(func(tx *bbolt.Tx) error {
err := h.view(func(tx *bbolt.Tx) error {
systemIndexBucket := tx.Bucket([]byte(systemIndex))
if systemIndexBucket == nil {
return errors.New("system index not found")
@@ -353,7 +368,7 @@ func (h *MetadataHandler) PutManifest(manifest *models.ObjectManifest) error {
return err
}
err := h.db.Update(func(tx *bbolt.Tx) error {
err := h.update(func(tx *bbolt.Tx) error {
data, err := json.Marshal(manifest)
if err != nil {
return err
@@ -373,7 +388,7 @@ func (h *MetadataHandler) PutManifest(manifest *models.ObjectManifest) error {
func (h *MetadataHandler) GetManifest(bucket, key string) (*models.ObjectManifest, error) {
var manifest *models.ObjectManifest
err := h.db.View(func(tx *bbolt.Tx) error {
err := h.view(func(tx *bbolt.Tx) error {
metadataBucket := tx.Bucket([]byte(bucket))
if metadataBucket == nil {
return fmt.Errorf("%w: %s", ErrBucketNotFound, bucket)
@@ -400,7 +415,7 @@ func (h *MetadataHandler) ListObjects(bucket, prefix string) ([]*models.ObjectMa
var objects []*models.ObjectManifest
err := h.db.View(func(tx *bbolt.Tx) error {
err := h.view(func(tx *bbolt.Tx) error {
systemIndexBucket := tx.Bucket([]byte(systemIndex))
if systemIndexBucket == nil {
return errors.New("system index not found")
@@ -440,7 +455,7 @@ func (h *MetadataHandler) ForEachObjectFrom(bucket, startKey string, fn func(*mo
return errors.New("object callback is required")
}
return h.db.View(func(tx *bbolt.Tx) error {
return h.view(func(tx *bbolt.Tx) error {
systemIndexBucket := tx.Bucket([]byte(systemIndex))
if systemIndexBucket == nil {
return errors.New("system index not found")
@@ -480,7 +495,7 @@ func (h *MetadataHandler) DeleteManifest(bucket, key string) error {
return err
}
err := h.db.Update(func(tx *bbolt.Tx) error {
err := h.update(func(tx *bbolt.Tx) error {
metadataBucket := tx.Bucket([]byte(bucket))
if metadataBucket == nil {
return fmt.Errorf("%w: %s", ErrBucketNotFound, bucket)
@@ -497,7 +512,7 @@ func (h *MetadataHandler) DeleteManifest(bucket, key string) error {
func (h *MetadataHandler) DeleteManifests(bucket string, keys []string) ([]string, error) {
deleted := make([]string, 0, len(keys))
err := h.db.Update(func(tx *bbolt.Tx) error {
err := h.update(func(tx *bbolt.Tx) error {
metadataBucket := tx.Bucket([]byte(bucket))
if metadataBucket == nil {
return fmt.Errorf("%w: %s", ErrBucketNotFound, bucket)
@@ -525,7 +540,7 @@ func (h *MetadataHandler) DeleteManifests(bucket string, keys []string) ([]strin
func (h *MetadataHandler) CreateMultipartUpload(bucket, key string) (*models.MultipartUpload, error) {
var upload *models.MultipartUpload
err := h.db.View(func(tx *bbolt.Tx) error {
err := h.view(func(tx *bbolt.Tx) error {
systemIndexBucket := tx.Bucket([]byte(systemIndex))
if systemIndexBucket == nil {
return errors.New("system index not found")
@@ -548,7 +563,7 @@ func (h *MetadataHandler) CreateMultipartUpload(bucket, key string) (*models.Mul
State: "pending",
}
err = h.db.Update(func(tx *bbolt.Tx) error {
err = h.update(func(tx *bbolt.Tx) error {
multipartUploadBucket := tx.Bucket([]byte(multipartUploadIndex))
if multipartUploadBucket == nil {
return errors.New("multipart upload index not found")
@@ -643,7 +658,7 @@ func deleteMultipartPartsByUploadID(tx *bbolt.Tx, uploadID string) error {
func (h *MetadataHandler) GetMultipartUpload(uploadID string) (*models.MultipartUpload, error) {
var upload *models.MultipartUpload
err := h.db.View(func(tx *bbolt.Tx) error {
err := h.view(func(tx *bbolt.Tx) error {
var err error
upload, _, err = getMultipartUploadFromTx(tx, uploadID)
if err != nil {
@@ -661,7 +676,7 @@ func (h *MetadataHandler) PutMultipartPart(uploadID string, part models.Uploaded
return fmt.Errorf("invalid part number: %d", part.PartNumber)
}
err := h.db.Update(func(tx *bbolt.Tx) error {
err := h.update(func(tx *bbolt.Tx) error {
upload, _, err := getMultipartUploadFromTx(tx, uploadID)
if err != nil {
return err
@@ -690,7 +705,7 @@ func (h *MetadataHandler) PutMultipartPart(uploadID string, part models.Uploaded
func (h *MetadataHandler) ListMultipartParts(uploadID string) ([]models.UploadedPart, error) {
parts := make([]models.UploadedPart, 0)
err := h.db.View(func(tx *bbolt.Tx) error {
err := h.view(func(tx *bbolt.Tx) error {
if _, _, err := getMultipartUploadFromTx(tx, uploadID); err != nil {
return err
}
@@ -724,7 +739,7 @@ func (h *MetadataHandler) CompleteMultipartUpload(uploadID string, final *models
return errors.New("final object manifest is required")
}
err := h.db.Update(func(tx *bbolt.Tx) error {
err := h.update(func(tx *bbolt.Tx) error {
upload, multipartUploadBucket, err := getMultipartUploadFromTx(tx, uploadID)
if err != nil {
return err
@@ -763,7 +778,7 @@ func (h *MetadataHandler) CompleteMultipartUpload(uploadID string, final *models
return nil
}
func (h *MetadataHandler) AbortMultipartUpload(uploadID string) error {
err := h.db.Update(func(tx *bbolt.Tx) error {
err := h.update(func(tx *bbolt.Tx) error {
upload, multipartUploadBucket, err := getMultipartUploadFromTx(tx, uploadID)
if err != nil {
return err
@@ -793,7 +808,7 @@ func (h *MetadataHandler) CleanupMultipartUploads(retention time.Duration) (int,
}
cleaned := 0
err := h.db.Update(func(tx *bbolt.Tx) error {
err := h.update(func(tx *bbolt.Tx) error {
uploadsBucket, err := getMultipartUploadBucket(tx)
if err != nil {
return err
@@ -843,7 +858,7 @@ func (h *MetadataHandler) GetReferencedChunkSet() (map[string]struct{}, error) {
chunkSet := make(map[string]struct{})
pendingUploadSet := make(map[string]struct{})
err := h.db.View(func(tx *bbolt.Tx) error {
err := h.view(func(tx *bbolt.Tx) error {
systemIndexBucket := tx.Bucket([]byte(systemIndex))
if systemIndexBucket == nil {
return errors.New("system index not found")

795
metrics/metrics.go Normal file
View File

@@ -0,0 +1,795 @@
package metrics
import (
"fmt"
"os"
"runtime"
"sort"
"strconv"
"strings"
"sync"
"sync/atomic"
"syscall"
"time"
)
var defaultBuckets = []float64{
0.0005, 0.001, 0.0025, 0.005, 0.01,
0.025, 0.05, 0.1, 0.25, 0.5,
1, 2.5, 5, 10,
}
var lockBuckets = []float64{
0.000001, 0.000005, 0.00001, 0.00005,
0.0001, 0.0005, 0.001, 0.005, 0.01,
0.025, 0.05, 0.1, 0.25, 0.5, 1,
}
var batchBuckets = []float64{1, 2, 4, 8, 16, 32, 64, 100, 128, 256, 512, 1000, 5000}
var Default = NewRegistry()
type histogram struct {
bounds []float64
counts []uint64
sum float64
count uint64
}
func newHistogram(bounds []float64) *histogram {
cloned := make([]float64, len(bounds))
copy(cloned, bounds)
return &histogram{
bounds: cloned,
counts: make([]uint64, len(bounds)+1),
}
}
func (h *histogram) observe(v float64) {
h.count++
h.sum += v
for i, bound := range h.bounds {
if v <= bound {
h.counts[i]++
return
}
}
h.counts[len(h.counts)-1]++
}
func (h *histogram) snapshot() (bounds []float64, counts []uint64, sum float64, count uint64) {
bounds = make([]float64, len(h.bounds))
copy(bounds, h.bounds)
counts = make([]uint64, len(h.counts))
copy(counts, h.counts)
return bounds, counts, h.sum, h.count
}
type Registry struct {
startedAt time.Time
httpInFlight atomic.Int64
connectionPoolActive atomic.Int64
connectionPoolMax atomic.Int64
connectionPoolWaits atomic.Uint64
requestQueueLength atomic.Int64
mu sync.Mutex
httpRequestsRoute map[string]uint64
httpResponseBytesRoute map[string]uint64
httpDurationRoute map[string]*histogram
httpRequestsOp map[string]uint64
httpDurationOp map[string]*histogram
httpInFlightOp map[string]int64
authRequests map[string]uint64
serviceOps map[string]uint64
serviceDuration map[string]*histogram
dbTxTotal map[string]uint64
dbTxDuration map[string]*histogram
blobOps map[string]uint64
blobBytes map[string]uint64
blobDuration map[string]*histogram
lockWait map[string]*histogram
lockHold map[string]*histogram
cacheHits map[string]uint64
cacheMisses map[string]uint64
batchSize *histogram
retries map[string]uint64
errors map[string]uint64
gcRuns map[string]uint64
gcDuration *histogram
gcDeletedChunks uint64
gcDeleteErrors uint64
gcCleanedUpload uint64
}
func NewRegistry() *Registry {
return &Registry{
startedAt: time.Now(),
httpRequestsRoute: make(map[string]uint64),
httpResponseBytesRoute: make(map[string]uint64),
httpDurationRoute: make(map[string]*histogram),
httpRequestsOp: make(map[string]uint64),
httpDurationOp: make(map[string]*histogram),
httpInFlightOp: make(map[string]int64),
authRequests: make(map[string]uint64),
serviceOps: make(map[string]uint64),
serviceDuration: make(map[string]*histogram),
dbTxTotal: make(map[string]uint64),
dbTxDuration: make(map[string]*histogram),
blobOps: make(map[string]uint64),
blobBytes: make(map[string]uint64),
blobDuration: make(map[string]*histogram),
lockWait: make(map[string]*histogram),
lockHold: make(map[string]*histogram),
cacheHits: make(map[string]uint64),
cacheMisses: make(map[string]uint64),
batchSize: newHistogram(batchBuckets),
retries: make(map[string]uint64),
errors: make(map[string]uint64),
gcRuns: make(map[string]uint64),
gcDuration: newHistogram(defaultBuckets),
}
}
func NormalizeHTTPOperation(method string, isDeletePost bool) string {
switch strings.ToUpper(strings.TrimSpace(method)) {
case "GET":
return "get"
case "PUT":
return "put"
case "DELETE":
return "delete"
case "HEAD":
return "head"
case "POST":
if isDeletePost {
return "delete"
}
return "put"
default:
return "other"
}
}
func statusResult(status int) string {
if status >= 200 && status < 400 {
return "ok"
}
return "error"
}
func normalizeRoute(route string) string {
route = strings.TrimSpace(route)
if route == "" {
return "/unknown"
}
return route
}
func normalizeOp(op string) string {
op = strings.ToLower(strings.TrimSpace(op))
if op == "" {
return "other"
}
return op
}
func (r *Registry) IncHTTPInFlight() {
r.httpInFlight.Add(1)
}
func (r *Registry) DecHTTPInFlight() {
r.httpInFlight.Add(-1)
}
func (r *Registry) IncHTTPInFlightOp(op string) {
r.httpInFlight.Add(1)
op = normalizeOp(op)
r.mu.Lock()
r.httpInFlightOp[op]++
r.mu.Unlock()
}
func (r *Registry) DecHTTPInFlightOp(op string) {
r.httpInFlight.Add(-1)
op = normalizeOp(op)
r.mu.Lock()
r.httpInFlightOp[op]--
if r.httpInFlightOp[op] < 0 {
r.httpInFlightOp[op] = 0
}
r.mu.Unlock()
}
func (r *Registry) ObserveHTTPRequest(method, route string, status int, d time.Duration, responseBytes int) {
op := NormalizeHTTPOperation(method, false)
r.ObserveHTTPRequestDetailed(method, route, op, status, d, responseBytes)
}
func (r *Registry) ObserveHTTPRequestDetailed(method, route, op string, status int, d time.Duration, responseBytes int) {
route = normalizeRoute(route)
op = normalizeOp(op)
result := statusResult(status)
routeKey := method + "|" + route + "|" + strconv.Itoa(status)
routeDurKey := method + "|" + route
opKey := op + "|" + result
r.mu.Lock()
r.httpRequestsRoute[routeKey]++
if responseBytes > 0 {
r.httpResponseBytesRoute[routeKey] += uint64(responseBytes)
}
hRoute := r.httpDurationRoute[routeDurKey]
if hRoute == nil {
hRoute = newHistogram(defaultBuckets)
r.httpDurationRoute[routeDurKey] = hRoute
}
hRoute.observe(d.Seconds())
r.httpRequestsOp[opKey]++
hOp := r.httpDurationOp[opKey]
if hOp == nil {
hOp = newHistogram(defaultBuckets)
r.httpDurationOp[opKey] = hOp
}
hOp.observe(d.Seconds())
r.mu.Unlock()
}
func (r *Registry) ObserveAuth(result, authType, reason string) {
authType = strings.TrimSpace(authType)
if authType == "" {
authType = "unknown"
}
reason = strings.TrimSpace(reason)
if reason == "" {
reason = "none"
}
key := result + "|" + authType + "|" + reason
r.mu.Lock()
r.authRequests[key]++
r.mu.Unlock()
}
func (r *Registry) ObserveService(operation string, d time.Duration, ok bool) {
result := "error"
if ok {
result = "ok"
}
key := operation + "|" + result
r.mu.Lock()
r.serviceOps[key]++
h := r.serviceDuration[operation]
if h == nil {
h = newHistogram(defaultBuckets)
r.serviceDuration[operation] = h
}
h.observe(d.Seconds())
r.mu.Unlock()
}
func (r *Registry) ObserveMetadataTx(txType string, d time.Duration, ok bool) {
result := "error"
if ok {
result = "ok"
}
key := txType + "|" + result
r.mu.Lock()
r.dbTxTotal[key]++
h := r.dbTxDuration[txType]
if h == nil {
h = newHistogram(defaultBuckets)
r.dbTxDuration[txType] = h
}
h.observe(d.Seconds())
r.mu.Unlock()
}
func (r *Registry) ObserveBlob(operation string, d time.Duration, bytes int64, ok bool, backend ...string) {
be := "disk"
if len(backend) > 0 {
candidate := strings.TrimSpace(backend[0])
if candidate != "" {
be = strings.ToLower(candidate)
}
}
result := "error"
if ok {
result = "ok"
}
op := strings.ToLower(strings.TrimSpace(operation))
if op == "" {
op = "unknown"
}
histKey := op + "|" + be + "|" + result
opsKey := histKey
r.mu.Lock()
r.blobOps[opsKey]++
h := r.blobDuration[histKey]
if h == nil {
h = newHistogram(defaultBuckets)
r.blobDuration[histKey] = h
}
h.observe(d.Seconds())
if bytes > 0 {
r.blobBytes[op] += uint64(bytes)
}
r.mu.Unlock()
}
func (r *Registry) SetConnectionPoolMax(max int) {
if max < 0 {
max = 0
}
r.connectionPoolMax.Store(int64(max))
}
func (r *Registry) IncConnectionPoolActive() {
r.connectionPoolActive.Add(1)
}
func (r *Registry) DecConnectionPoolActive() {
r.connectionPoolActive.Add(-1)
}
func (r *Registry) IncConnectionPoolWait() {
r.connectionPoolWaits.Add(1)
}
func (r *Registry) IncRequestQueueLength() {
r.requestQueueLength.Add(1)
}
func (r *Registry) DecRequestQueueLength() {
r.requestQueueLength.Add(-1)
}
func (r *Registry) ObserveLockWait(lockName string, d time.Duration) {
lockName = strings.TrimSpace(lockName)
if lockName == "" {
lockName = "unknown"
}
r.mu.Lock()
h := r.lockWait[lockName]
if h == nil {
h = newHistogram(lockBuckets)
r.lockWait[lockName] = h
}
h.observe(d.Seconds())
r.mu.Unlock()
}
func (r *Registry) ObserveLockHold(lockName string, d time.Duration) {
lockName = strings.TrimSpace(lockName)
if lockName == "" {
lockName = "unknown"
}
r.mu.Lock()
h := r.lockHold[lockName]
if h == nil {
h = newHistogram(lockBuckets)
r.lockHold[lockName] = h
}
h.observe(d.Seconds())
r.mu.Unlock()
}
func (r *Registry) ObserveCacheHit(cache string) {
cache = strings.TrimSpace(cache)
if cache == "" {
cache = "unknown"
}
r.mu.Lock()
r.cacheHits[cache]++
r.mu.Unlock()
}
func (r *Registry) ObserveCacheMiss(cache string) {
cache = strings.TrimSpace(cache)
if cache == "" {
cache = "unknown"
}
r.mu.Lock()
r.cacheMisses[cache]++
r.mu.Unlock()
}
func (r *Registry) ObserveBatchSize(size int) {
if size < 0 {
size = 0
}
r.mu.Lock()
r.batchSize.observe(float64(size))
r.mu.Unlock()
}
func (r *Registry) ObserveRetry(op, reason string) {
op = normalizeOp(op)
reason = strings.TrimSpace(reason)
if reason == "" {
reason = "unknown"
}
key := op + "|" + reason
r.mu.Lock()
r.retries[key]++
r.mu.Unlock()
}
func (r *Registry) ObserveError(op, reason string) {
op = normalizeOp(op)
reason = strings.TrimSpace(reason)
if reason == "" {
reason = "unknown"
}
key := op + "|" + reason
r.mu.Lock()
r.errors[key]++
r.mu.Unlock()
}
func (r *Registry) ObserveGC(d time.Duration, deletedChunks, deleteErrors, cleanedUploads int, ok bool) {
result := "error"
if ok {
result = "ok"
}
r.mu.Lock()
r.gcRuns[result]++
r.gcDuration.observe(d.Seconds())
if deletedChunks > 0 {
r.gcDeletedChunks += uint64(deletedChunks)
}
if deleteErrors > 0 {
r.gcDeleteErrors += uint64(deleteErrors)
}
if cleanedUploads > 0 {
r.gcCleanedUpload += uint64(cleanedUploads)
}
r.mu.Unlock()
}
func (r *Registry) RenderPrometheus() string {
now := time.Now()
var mem runtime.MemStats
runtime.ReadMemStats(&mem)
r.mu.Lock()
httpReqRoute := copyCounterMap(r.httpRequestsRoute)
httpRespRoute := copyCounterMap(r.httpResponseBytesRoute)
httpDurRoute := copyHistogramMap(r.httpDurationRoute)
httpReqOp := copyCounterMap(r.httpRequestsOp)
httpDurOp := copyHistogramMap(r.httpDurationOp)
httpInFlightOp := copyIntGaugeMap(r.httpInFlightOp)
authReq := copyCounterMap(r.authRequests)
serviceOps := copyCounterMap(r.serviceOps)
serviceDur := copyHistogramMap(r.serviceDuration)
dbTx := copyCounterMap(r.dbTxTotal)
dbTxDur := copyHistogramMap(r.dbTxDuration)
blobOps := copyCounterMap(r.blobOps)
blobBytes := copyCounterMap(r.blobBytes)
blobDur := copyHistogramMap(r.blobDuration)
lockWait := copyHistogramMap(r.lockWait)
lockHold := copyHistogramMap(r.lockHold)
cacheHits := copyCounterMap(r.cacheHits)
cacheMisses := copyCounterMap(r.cacheMisses)
batchBounds, batchCounts, batchSum, batchCount := r.batchSize.snapshot()
retries := copyCounterMap(r.retries)
errorsTotal := copyCounterMap(r.errors)
gcRuns := copyCounterMap(r.gcRuns)
gcDurBounds, gcDurCounts, gcDurSum, gcDurCount := r.gcDuration.snapshot()
gcDeletedChunks := r.gcDeletedChunks
gcDeleteErrors := r.gcDeleteErrors
gcCleanedUploads := r.gcCleanedUpload
r.mu.Unlock()
connectionActive := float64(r.connectionPoolActive.Load())
connectionMax := float64(r.connectionPoolMax.Load())
connectionWaits := r.connectionPoolWaits.Load()
queueLength := float64(r.requestQueueLength.Load())
resident, hasResident := readResidentMemoryBytes()
cpuSeconds, hasCPU := readProcessCPUSeconds()
var b strings.Builder
httpInFlightOp["all"] = r.httpInFlight.Load()
writeGaugeVecFromInt64(&b, "fs_http_inflight_requests", "Current in-flight HTTP requests by operation.", httpInFlightOp, "op")
writeCounterVecKV(&b, "fs_http_requests_total", "Total HTTP requests by operation and result.", httpReqOp, []string{"op", "result"})
writeHistogramVecKV(&b, "fs_http_request_duration_seconds", "HTTP request latency by operation and result.", httpDurOp, []string{"op", "result"})
writeCounterVecKV(&b, "fs_http_requests_by_route_total", "Total HTTP requests by method/route/status.", httpReqRoute, []string{"method", "route", "status"})
writeCounterVecKV(&b, "fs_http_response_bytes_total", "Total HTTP response bytes written.", httpRespRoute, []string{"method", "route", "status"})
writeHistogramVecKV(&b, "fs_http_request_duration_by_route_seconds", "HTTP request latency by method/route.", httpDurRoute, []string{"method", "route"})
writeCounterVecKV(&b, "fs_auth_requests_total", "Authentication attempts by result.", authReq, []string{"result", "auth_type", "reason"})
writeCounterVecKV(&b, "fs_service_operations_total", "Service-level operation calls.", serviceOps, []string{"operation", "result"})
writeHistogramVecKV(&b, "fs_service_operation_duration_seconds", "Service-level operation latency.", serviceDur, []string{"operation"})
writeCounterVecKV(&b, "fs_metadata_tx_total", "Metadata transaction calls.", dbTx, []string{"type", "result"})
writeHistogramVecKV(&b, "fs_metadata_tx_duration_seconds", "Metadata transaction latency.", dbTxDur, []string{"type"})
writeHistogramVecKV(&b, "fs_blob_operation_duration_seconds", "Blob backend operation latency.", blobDur, []string{"op", "backend", "result"})
writeCounterVecKV(&b, "fs_blob_operations_total", "Blob store operations.", blobOps, []string{"op", "backend", "result"})
writeCounterVecKV(&b, "fs_blob_bytes_total", "Blob bytes processed by operation.", blobBytes, []string{"op"})
writeGauge(&b, "fs_connection_pool_active", "Active pooled connections.", connectionActive)
writeGauge(&b, "fs_connection_pool_max", "Maximum pooled connections.", connectionMax)
writeCounter(&b, "fs_connection_pool_waits_total", "Number of waits due to pool saturation.", connectionWaits)
writeGauge(&b, "fs_request_queue_length", "Requests waiting for an execution slot.", queueLength)
writeHistogramVecKV(&b, "fs_lock_wait_seconds", "Time spent waiting for locks.", lockWait, []string{"lock_name"})
writeHistogramVecKV(&b, "fs_lock_hold_seconds", "Time locks were held.", lockHold, []string{"lock_name"})
writeCounterVecKV(&b, "fs_cache_hits_total", "Cache hits by cache name.", cacheHits, []string{"cache"})
writeCounterVecKV(&b, "fs_cache_misses_total", "Cache misses by cache name.", cacheMisses, []string{"cache"})
writeHistogram(&b, "fs_batch_size_histogram", "Observed batch sizes.", nil, batchBounds, batchCounts, batchSum, batchCount)
writeCounterVecKV(&b, "fs_retries_total", "Retries by operation and reason.", retries, []string{"op", "reason"})
writeCounterVecKV(&b, "fs_errors_total", "Errors by operation and reason.", errorsTotal, []string{"op", "reason"})
writeCounterVecKV(&b, "fs_gc_runs_total", "Garbage collection runs.", gcRuns, []string{"result"})
writeHistogram(&b, "fs_gc_duration_seconds", "Garbage collection runtime.", nil, gcDurBounds, gcDurCounts, gcDurSum, gcDurCount)
writeCounter(&b, "fs_gc_deleted_chunks_total", "Deleted chunks during GC.", gcDeletedChunks)
writeCounter(&b, "fs_gc_delete_errors_total", "Chunk delete errors during GC.", gcDeleteErrors)
writeCounter(&b, "fs_gc_cleaned_uploads_total", "Cleaned multipart uploads during GC.", gcCleanedUploads)
writeGauge(&b, "fs_uptime_seconds", "Process uptime in seconds.", now.Sub(r.startedAt).Seconds())
writeGauge(&b, "fs_runtime_goroutines", "Number of goroutines.", float64(runtime.NumGoroutine()))
writeGaugeVec(&b, "fs_runtime_memory_bytes", "Runtime memory in bytes.", map[string]float64{
"alloc": float64(mem.Alloc),
"total": float64(mem.TotalAlloc),
"sys": float64(mem.Sys),
"heap_alloc": float64(mem.HeapAlloc),
"heap_sys": float64(mem.HeapSys),
"stack_sys": float64(mem.StackSys),
}, "type")
writeCounter(&b, "fs_runtime_gc_cycles_total", "Completed GC cycles.", uint64(mem.NumGC))
writeCounterFloat(&b, "fs_runtime_gc_pause_seconds_total", "Total GC pause time in seconds.", float64(mem.PauseTotalNs)/1e9)
if hasCPU {
writeCounterFloat(&b, "process_cpu_seconds_total", "Total user and system CPU time spent in seconds.", cpuSeconds)
}
if hasResident {
writeGauge(&b, "process_resident_memory_bytes", "Resident memory size in bytes.", resident)
}
return b.String()
}
type histogramSnapshot struct {
bounds []float64
counts []uint64
sum float64
count uint64
}
func copyCounterMap(src map[string]uint64) map[string]uint64 {
out := make(map[string]uint64, len(src))
for k, v := range src {
out[k] = v
}
return out
}
func copyIntGaugeMap(src map[string]int64) map[string]int64 {
out := make(map[string]int64, len(src))
for k, v := range src {
out[k] = v
}
return out
}
func copyHistogramMap(src map[string]*histogram) map[string]histogramSnapshot {
out := make(map[string]histogramSnapshot, len(src))
for k, h := range src {
bounds, counts, sum, count := h.snapshot()
out[k] = histogramSnapshot{bounds: bounds, counts: counts, sum: sum, count: count}
}
return out
}
func writeCounter(b *strings.Builder, name, help string, value uint64) {
fmt.Fprintf(b, "# HELP %s %s\n", name, help)
fmt.Fprintf(b, "# TYPE %s counter\n", name)
fmt.Fprintf(b, "%s %d\n", name, value)
}
func writeCounterFloat(b *strings.Builder, name, help string, value float64) {
fmt.Fprintf(b, "# HELP %s %s\n", name, help)
fmt.Fprintf(b, "# TYPE %s counter\n", name)
fmt.Fprintf(b, "%s %.9f\n", name, value)
}
func writeGauge(b *strings.Builder, name, help string, value float64) {
fmt.Fprintf(b, "# HELP %s %s\n", name, help)
fmt.Fprintf(b, "# TYPE %s gauge\n", name)
fmt.Fprintf(b, "%s %.9f\n", name, value)
}
func writeGaugeVec(b *strings.Builder, name, help string, values map[string]float64, labelName string) {
fmt.Fprintf(b, "# HELP %s %s\n", name, help)
fmt.Fprintf(b, "# TYPE %s gauge\n", name)
keys := make([]string, 0, len(values))
for k := range values {
keys = append(keys, k)
}
sort.Strings(keys)
for _, key := range keys {
fmt.Fprintf(b, "%s{%s=\"%s\"} %.9f\n", name, labelName, escapeLabelValue(key), values[key])
}
}
func writeGaugeVecFromInt64(b *strings.Builder, name, help string, values map[string]int64, labelName string) {
fmt.Fprintf(b, "# HELP %s %s\n", name, help)
fmt.Fprintf(b, "# TYPE %s gauge\n", name)
keys := make([]string, 0, len(values))
for k := range values {
keys = append(keys, k)
}
sort.Strings(keys)
for _, key := range keys {
fmt.Fprintf(b, "%s{%s=\"%s\"} %.9f\n", name, labelName, escapeLabelValue(key), float64(values[key]))
}
}
func writeCounterVecKV(b *strings.Builder, name, help string, values map[string]uint64, labels []string) {
fmt.Fprintf(b, "# HELP %s %s\n", name, help)
fmt.Fprintf(b, "# TYPE %s counter\n", name)
keys := make([]string, 0, len(values))
for k := range values {
keys = append(keys, k)
}
sort.Strings(keys)
for _, key := range keys {
parts := strings.Split(key, "|")
fmt.Fprintf(b, "%s{%s} %d\n", name, formatLabels(labels, parts), values[key])
}
}
func writeHistogramVecKV(b *strings.Builder, name, help string, values map[string]histogramSnapshot, labels []string) {
fmt.Fprintf(b, "# HELP %s %s\n", name, help)
fmt.Fprintf(b, "# TYPE %s histogram\n", name)
keys := make([]string, 0, len(values))
for k := range values {
keys = append(keys, k)
}
sort.Strings(keys)
for _, key := range keys {
parts := strings.Split(key, "|")
labelsMap := make(map[string]string, len(labels))
for i, label := range labels {
if i < len(parts) {
labelsMap[label] = parts[i]
} else {
labelsMap[label] = ""
}
}
writeHistogramWithLabelsMap(b, name, labelsMap, values[key])
}
}
func writeHistogram(b *strings.Builder, name, help string, labels map[string]string, bounds []float64, counts []uint64, sum float64, count uint64) {
fmt.Fprintf(b, "# HELP %s %s\n", name, help)
fmt.Fprintf(b, "# TYPE %s histogram\n", name)
writeHistogramWithLabelsMap(b, name, labels, histogramSnapshot{bounds: bounds, counts: counts, sum: sum, count: count})
}
func writeHistogramWithLabelsMap(b *strings.Builder, name string, labels map[string]string, s histogramSnapshot) {
var cumulative uint64
for i, bucketCount := range s.counts {
cumulative += bucketCount
bucketLabels := cloneLabels(labels)
if i < len(s.bounds) {
bucketLabels["le"] = trimFloat(s.bounds[i])
} else {
bucketLabels["le"] = "+Inf"
}
fmt.Fprintf(b, "%s_bucket{%s} %d\n", name, labelsToString(bucketLabels), cumulative)
}
labelsSuffix := formatLabelsSuffix(labels)
fmt.Fprintf(b, "%s_sum%s %.9f\n", name, labelsSuffix, s.sum)
fmt.Fprintf(b, "%s_count%s %d\n", name, labelsSuffix, s.count)
}
func formatLabelsSuffix(labels map[string]string) string {
if len(labels) == 0 {
return ""
}
return "{" + labelsToString(labels) + "}"
}
func formatLabels(keys, values []string) string {
parts := make([]string, 0, len(keys))
for i, key := range keys {
value := ""
if i < len(values) {
value = values[i]
}
parts = append(parts, fmt.Sprintf("%s=\"%s\"", key, escapeLabelValue(value)))
}
return strings.Join(parts, ",")
}
func labelsToString(labels map[string]string) string {
if len(labels) == 0 {
return ""
}
keys := make([]string, 0, len(labels))
for k := range labels {
keys = append(keys, k)
}
sort.Strings(keys)
parts := make([]string, 0, len(keys))
for _, key := range keys {
parts = append(parts, fmt.Sprintf("%s=\"%s\"", key, escapeLabelValue(labels[key])))
}
return strings.Join(parts, ",")
}
func cloneLabels(in map[string]string) map[string]string {
if len(in) == 0 {
return map[string]string{}
}
out := make(map[string]string, len(in)+1)
for k, v := range in {
out[k] = v
}
return out
}
func trimFloat(v float64) string {
return strconv.FormatFloat(v, 'f', -1, 64)
}
func escapeLabelValue(value string) string {
value = strings.ReplaceAll(value, `\`, `\\`)
value = strings.ReplaceAll(value, "\n", `\\n`)
value = strings.ReplaceAll(value, `"`, `\\"`)
return value
}
func readResidentMemoryBytes() (float64, bool) {
data, err := os.ReadFile("/proc/self/statm")
if err != nil {
return 0, false
}
fields := strings.Fields(string(data))
if len(fields) < 2 {
return 0, false
}
rssPages, err := strconv.ParseInt(fields[1], 10, 64)
if err != nil || rssPages < 0 {
return 0, false
}
return float64(rssPages * int64(os.Getpagesize())), true
}
func readProcessCPUSeconds() (float64, bool) {
var usage syscall.Rusage
if err := syscall.Getrusage(syscall.RUSAGE_SELF, &usage); err != nil {
return 0, false
}
user := float64(usage.Utime.Sec) + float64(usage.Utime.Usec)/1e6
sys := float64(usage.Stime.Sec) + float64(usage.Stime.Usec)/1e6
return user + sys, true
}

34
metrics/metrics_test.go Normal file
View File

@@ -0,0 +1,34 @@
package metrics
import (
"strings"
"testing"
)
func TestRenderPrometheusHistogramNoEmptyLabelSet(t *testing.T) {
reg := NewRegistry()
reg.ObserveBatchSize(3)
reg.ObserveGC(0, 0, 0, 0, true)
out := reg.RenderPrometheus()
if strings.Contains(out, "fs_batch_size_histogram_sum{}") {
t.Fatalf("unexpected empty label set for batch sum metric")
}
if strings.Contains(out, "fs_batch_size_histogram_count{}") {
t.Fatalf("unexpected empty label set for batch count metric")
}
if strings.Contains(out, "fs_gc_duration_seconds_sum{}") {
t.Fatalf("unexpected empty label set for gc sum metric")
}
if strings.Contains(out, "fs_gc_duration_seconds_count{}") {
t.Fatalf("unexpected empty label set for gc count metric")
}
}
func TestEscapeLabelValueEscapesSingleBackslash(t *testing.T) {
got := escapeLabelValue(`a\b`)
want := `a\\b`
if got != want {
t.Fatalf("escapeLabelValue returned %q, want %q", got, want)
}
}

View File

@@ -7,6 +7,7 @@ import (
"errors"
"fmt"
"fs/metadata"
"fs/metrics"
"fs/models"
"fs/storage"
"io"
@@ -41,9 +42,37 @@ func NewObjectService(metadataHandler *metadata.MetadataHandler, blobHandler *st
}
}
func (s *ObjectService) PutObject(bucket, key, contentType string, input io.Reader) (*models.ObjectManifest, error) {
func (s *ObjectService) acquireGCRLock() func() {
waitStart := time.Now()
s.gcMu.RLock()
defer s.gcMu.RUnlock()
metrics.Default.ObserveLockWait("gc_mu_read", time.Since(waitStart))
holdStart := time.Now()
return func() {
metrics.Default.ObserveLockHold("gc_mu_read", time.Since(holdStart))
s.gcMu.RUnlock()
}
}
func (s *ObjectService) acquireGCLock() func() {
waitStart := time.Now()
s.gcMu.Lock()
metrics.Default.ObserveLockWait("gc_mu_write", time.Since(waitStart))
holdStart := time.Now()
return func() {
metrics.Default.ObserveLockHold("gc_mu_write", time.Since(holdStart))
s.gcMu.Unlock()
}
}
func (s *ObjectService) PutObject(bucket, key, contentType string, input io.Reader) (*models.ObjectManifest, error) {
start := time.Now()
success := false
defer func() {
metrics.Default.ObserveService("put_object", time.Since(start), success)
}()
unlock := s.acquireGCRLock()
defer unlock()
chunks, size, etag, err := s.blob.IngestStream(input)
if err != nil {
@@ -71,110 +100,171 @@ func (s *ObjectService) PutObject(bucket, key, contentType string, input io.Read
return nil, err
}
success = true
return manifest, nil
}
func (s *ObjectService) GetObject(bucket, key string) (io.ReadCloser, *models.ObjectManifest, error) {
start := time.Now()
waitStart := time.Now()
s.gcMu.RLock()
metrics.Default.ObserveLockWait("gc_mu_read", time.Since(waitStart))
holdStart := time.Now()
manifest, err := s.metadata.GetManifest(bucket, key)
if err != nil {
metrics.Default.ObserveLockHold("gc_mu_read", time.Since(holdStart))
s.gcMu.RUnlock()
metrics.Default.ObserveService("get_object", time.Since(start), false)
return nil, nil, err
}
pr, pw := io.Pipe()
go func() {
streamOK := false
defer func() {
metrics.Default.ObserveService("get_object", time.Since(start), streamOK)
}()
defer metrics.Default.ObserveLockHold("gc_mu_read", time.Since(holdStart))
defer s.gcMu.RUnlock()
if err := s.blob.AssembleStream(manifest.Chunks, pw); err != nil {
_ = pw.CloseWithError(err)
return
}
_ = pw.Close()
if err := pw.Close(); err != nil {
return
}
streamOK = true
}()
return pr, manifest, nil
}
func (s *ObjectService) HeadObject(bucket, key string) (models.ObjectManifest, error) {
s.gcMu.RLock()
defer s.gcMu.RUnlock()
start := time.Now()
success := false
defer func() {
metrics.Default.ObserveService("head_object", time.Since(start), success)
}()
unlock := s.acquireGCRLock()
defer unlock()
manifest, err := s.metadata.GetManifest(bucket, key)
if err != nil {
return models.ObjectManifest{}, err
}
success = true
return *manifest, nil
}
func (s *ObjectService) DeleteObject(bucket, key string) error {
s.gcMu.RLock()
defer s.gcMu.RUnlock()
return s.metadata.DeleteManifest(bucket, key)
start := time.Now()
success := false
defer func() {
metrics.Default.ObserveService("delete_object", time.Since(start), success)
}()
unlock := s.acquireGCRLock()
defer unlock()
err := s.metadata.DeleteManifest(bucket, key)
success = err == nil
return err
}
func (s *ObjectService) ListObjects(bucket, prefix string) ([]*models.ObjectManifest, error) {
s.gcMu.RLock()
defer s.gcMu.RUnlock()
unlock := s.acquireGCRLock()
defer unlock()
return s.metadata.ListObjects(bucket, prefix)
}
func (s *ObjectService) ForEachObjectFrom(bucket, startKey string, fn func(*models.ObjectManifest) error) error {
s.gcMu.RLock()
defer s.gcMu.RUnlock()
start := time.Now()
success := false
defer func() {
metrics.Default.ObserveService("for_each_object_from", time.Since(start), success)
}()
return s.metadata.ForEachObjectFrom(bucket, startKey, fn)
unlock := s.acquireGCRLock()
defer unlock()
err := s.metadata.ForEachObjectFrom(bucket, startKey, fn)
success = err == nil
return err
}
func (s *ObjectService) CreateBucket(bucket string) error {
s.gcMu.RLock()
defer s.gcMu.RUnlock()
return s.metadata.CreateBucket(bucket)
start := time.Now()
success := false
defer func() {
metrics.Default.ObserveService("create_bucket", time.Since(start), success)
}()
unlock := s.acquireGCRLock()
defer unlock()
err := s.metadata.CreateBucket(bucket)
success = err == nil
return err
}
func (s *ObjectService) HeadBucket(bucket string) error {
s.gcMu.RLock()
defer s.gcMu.RUnlock()
unlock := s.acquireGCRLock()
defer unlock()
_, err := s.metadata.GetBucketManifest(bucket)
return err
}
func (s *ObjectService) GetBucketManifest(bucket string) (*models.BucketManifest, error) {
s.gcMu.RLock()
defer s.gcMu.RUnlock()
unlock := s.acquireGCRLock()
defer unlock()
return s.metadata.GetBucketManifest(bucket)
}
func (s *ObjectService) DeleteBucket(bucket string) error {
s.gcMu.RLock()
defer s.gcMu.RUnlock()
unlock := s.acquireGCRLock()
defer unlock()
return s.metadata.DeleteBucket(bucket)
}
func (s *ObjectService) ListBuckets() ([]string, error) {
s.gcMu.RLock()
defer s.gcMu.RUnlock()
start := time.Now()
success := false
defer func() {
metrics.Default.ObserveService("list_buckets", time.Since(start), success)
}()
return s.metadata.ListBuckets()
unlock := s.acquireGCRLock()
defer unlock()
buckets, err := s.metadata.ListBuckets()
success = err == nil
return buckets, err
}
func (s *ObjectService) DeleteObjects(bucket string, keys []string) ([]string, error) {
s.gcMu.RLock()
defer s.gcMu.RUnlock()
unlock := s.acquireGCRLock()
defer unlock()
return s.metadata.DeleteManifests(bucket, keys)
}
func (s *ObjectService) CreateMultipartUpload(bucket, key string) (*models.MultipartUpload, error) {
s.gcMu.RLock()
defer s.gcMu.RUnlock()
unlock := s.acquireGCRLock()
defer unlock()
return s.metadata.CreateMultipartUpload(bucket, key)
}
func (s *ObjectService) UploadPart(bucket, key, uploadId string, partNumber int, input io.Reader) (string, error) {
s.gcMu.RLock()
defer s.gcMu.RUnlock()
start := time.Now()
success := false
defer func() {
metrics.Default.ObserveService("upload_part", time.Since(start), success)
}()
unlock := s.acquireGCRLock()
defer unlock()
if partNumber < 1 || partNumber > 10000 {
return "", ErrInvalidPart
@@ -204,12 +294,13 @@ func (s *ObjectService) UploadPart(bucket, key, uploadId string, partNumber int,
if err != nil {
return "", err
}
success = true
return etag, nil
}
func (s *ObjectService) ListMultipartParts(bucket, key, uploadID string) ([]models.UploadedPart, error) {
s.gcMu.RLock()
defer s.gcMu.RUnlock()
unlock := s.acquireGCRLock()
defer unlock()
upload, err := s.metadata.GetMultipartUpload(uploadID)
if err != nil {
@@ -222,8 +313,14 @@ func (s *ObjectService) ListMultipartParts(bucket, key, uploadID string) ([]mode
}
func (s *ObjectService) CompleteMultipartUpload(bucket, key, uploadID string, completed []models.CompletedPart) (*models.ObjectManifest, error) {
s.gcMu.RLock()
defer s.gcMu.RUnlock()
start := time.Now()
success := false
defer func() {
metrics.Default.ObserveService("complete_multipart_upload", time.Since(start), success)
}()
unlock := s.acquireGCRLock()
defer unlock()
if len(completed) == 0 {
return nil, ErrInvalidCompleteRequest
@@ -288,12 +385,13 @@ func (s *ObjectService) CompleteMultipartUpload(bucket, key, uploadID string, co
return nil, err
}
success = true
return manifest, nil
}
func (s *ObjectService) AbortMultipartUpload(bucket, key, uploadID string) error {
s.gcMu.RLock()
defer s.gcMu.RUnlock()
unlock := s.acquireGCRLock()
defer unlock()
upload, err := s.metadata.GetMultipartUpload(uploadID)
if err != nil {
@@ -327,8 +425,17 @@ func (s *ObjectService) Close() error {
}
func (s *ObjectService) GarbageCollect() error {
s.gcMu.Lock()
defer s.gcMu.Unlock()
start := time.Now()
success := false
deletedChunks := 0
deleteErrors := 0
cleanedUploads := 0
defer func() {
metrics.Default.ObserveGC(time.Since(start), deletedChunks, deleteErrors, cleanedUploads, success)
}()
unlock := s.acquireGCLock()
defer unlock()
referencedChunkSet, err := s.metadata.GetReferencedChunkSet()
if err != nil {
@@ -336,9 +443,6 @@ func (s *ObjectService) GarbageCollect() error {
}
totalChunks := 0
deletedChunks := 0
deleteErrors := 0
cleanedUploads := 0
if err := s.blob.ForEachChunk(func(chunkID string) error {
totalChunks++
@@ -368,6 +472,7 @@ func (s *ObjectService) GarbageCollect() error {
"delete_errors", deleteErrors,
"cleaned_uploads", cleanedUploads,
)
success = true
return nil
}

View File

@@ -6,10 +6,12 @@ import (
"encoding/hex"
"errors"
"fmt"
"fs/metrics"
"io"
"os"
"path/filepath"
"strings"
"time"
)
const blobRoot = "blobs"
@@ -37,11 +39,16 @@ func NewBlobStore(root string, chunkSize int) (*BlobStore, error) {
}
func (bs *BlobStore) IngestStream(stream io.Reader) ([]string, int64, string, error) {
start := time.Now()
fullFileHasher := md5.New()
buffer := make([]byte, bs.chunkSize)
var totalSize int64
var chunkIDs []string
success := false
defer func() {
metrics.Default.ObserveBlob("ingest_stream", time.Since(start), 0, success)
}()
for {
bytesRead, err := io.ReadFull(stream, buffer)
@@ -74,10 +81,18 @@ func (bs *BlobStore) IngestStream(stream io.Reader) ([]string, int64, string, er
}
etag := hex.EncodeToString(fullFileHasher.Sum(nil))
success = true
return chunkIDs, totalSize, etag, nil
}
func (bs *BlobStore) saveBlob(chunkID string, data []byte) error {
start := time.Now()
success := false
writtenBytes := int64(0)
defer func() {
metrics.Default.ObserveBlob("write_chunk", time.Since(start), writtenBytes, success)
}()
if !isValidChunkID(chunkID) {
return fmt.Errorf("invalid chunk id: %q", chunkID)
}
@@ -88,6 +103,7 @@ func (bs *BlobStore) saveBlob(chunkID string, data []byte) error {
fullPath := filepath.Join(dir, chunkID)
if _, err := os.Stat(fullPath); err == nil {
success = true
return nil
} else if !os.IsNotExist(err) {
return err
@@ -119,6 +135,7 @@ func (bs *BlobStore) saveBlob(chunkID string, data []byte) error {
if err := os.Rename(tmpPath, fullPath); err != nil {
if _, statErr := os.Stat(fullPath); statErr == nil {
success = true
return nil
}
return err
@@ -128,10 +145,18 @@ func (bs *BlobStore) saveBlob(chunkID string, data []byte) error {
if err := syncDir(dir); err != nil {
return err
}
writtenBytes = int64(len(data))
success = true
return nil
}
func (bs *BlobStore) AssembleStream(chunkIDs []string, w *io.PipeWriter) error {
start := time.Now()
success := false
defer func() {
metrics.Default.ObserveBlob("assemble_stream", time.Since(start), 0, success)
}()
for _, chunkID := range chunkIDs {
chunkData, err := bs.GetBlob(chunkID)
if err != nil {
@@ -141,14 +166,28 @@ func (bs *BlobStore) AssembleStream(chunkIDs []string, w *io.PipeWriter) error {
return err
}
}
success = true
return nil
}
func (bs *BlobStore) GetBlob(chunkID string) ([]byte, error) {
start := time.Now()
success := false
var size int64
defer func() {
metrics.Default.ObserveBlob("read_chunk", time.Since(start), size, success)
}()
if !isValidChunkID(chunkID) {
return nil, fmt.Errorf("invalid chunk id: %q", chunkID)
}
return os.ReadFile(filepath.Join(bs.dataRoot, blobRoot, chunkID[:2], chunkID[2:4], chunkID))
data, err := os.ReadFile(filepath.Join(bs.dataRoot, blobRoot, chunkID[:2], chunkID[2:4], chunkID))
if err != nil {
return nil, err
}
size = int64(len(data))
success = true
return data, nil
}
func (bs *BlobStore) DeleteBlob(chunkID string) error {