From f04f7601c08d577fca693e69ae7d57bb01a59b63 Mon Sep 17 00:00:00 2001 From: Andrej Mickov Date: Fri, 27 Feb 2026 14:59:23 +0100 Subject: [PATCH 1/4] Initial metrics endpoint added in Prometheus style --- README.md | 2 + api/api.go | 14 + auth/middleware.go | 35 ++- logging/logging.go | 44 ++- logging/logging_metrics_test.go | 30 ++ metadata/metadata.go | 71 +++-- metrics/metrics.go | 471 ++++++++++++++++++++++++++++++++ service/service.go | 89 +++++- storage/blob.go | 39 ++- 9 files changed, 754 insertions(+), 41 deletions(-) create mode 100644 logging/logging_metrics_test.go create mode 100644 metrics/metrics.go diff --git a/README.md b/README.md index f9fee41..1e57191 100644 --- a/README.md +++ b/README.md @@ -45,6 +45,8 @@ Reference: `auth/README.md` Health: - `GET /healthz` - `HEAD /healthz` +- `GET /metrics` (Prometheus exposition format) +- `HEAD /metrics` ## Limitations diff --git a/api/api.go b/api/api.go index 9b28d21..90d2589 100644 --- a/api/api.go +++ b/api/api.go @@ -10,6 +10,7 @@ import ( "fs/auth" "fs/logging" "fs/metadata" + "fs/metrics" "fs/models" "fs/service" "io" @@ -70,6 +71,8 @@ func (h *Handler) setupRoutes() { h.router.Get("/healthz", h.handleHealth) h.router.Head("/healthz", h.handleHealth) + h.router.Get("/metrics", h.handleMetrics) + h.router.Head("/metrics", h.handleMetrics) h.router.Get("/", h.handleGetBuckets) h.router.Get("/{bucket}/", h.handleGetBucket) @@ -106,6 +109,17 @@ func (h *Handler) handleHealth(w http.ResponseWriter, r *http.Request) { } } +func (h *Handler) handleMetrics(w http.ResponseWriter, r *http.Request) { + payload := metrics.Default.RenderPrometheus() + w.Header().Set("Content-Type", "text/plain; version=0.0.4; charset=utf-8") + w.Header().Set("Content-Length", strconv.Itoa(len(payload))) + w.WriteHeader(http.StatusOK) + if r.Method == http.MethodHead { + return + } + _, _ = w.Write([]byte(payload)) +} + func validateObjectKey(key string) *s3APIError { if key == "" { err := s3ErrInvalidObjectKey diff --git 
a/auth/middleware.go b/auth/middleware.go index eb0b902..63cebbd 100644 --- a/auth/middleware.go +++ b/auth/middleware.go @@ -1,6 +1,8 @@ package auth import ( + "errors" + "fs/metrics" "log/slog" "net" "net/http" @@ -18,17 +20,20 @@ func Middleware( return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { authCtx := RequestContext{Authenticated: false, AuthType: "none"} if svc == nil || !svc.Config().Enabled { + metrics.Default.ObserveAuth("bypass", "disabled", "auth_disabled") next.ServeHTTP(w, r.WithContext(WithRequestContext(r.Context(), authCtx))) return } - if r.URL.Path == "/healthz" { + if r.URL.Path == "/healthz" || r.URL.Path == "/metrics" { + metrics.Default.ObserveAuth("bypass", "none", "public_endpoint") next.ServeHTTP(w, r.WithContext(WithRequestContext(r.Context(), authCtx))) return } resolvedCtx, err := svc.AuthenticateRequest(r) if err != nil { + metrics.Default.ObserveAuth("error", "sigv4", authErrorClass(err)) if auditEnabled && logger != nil { requestID := middleware.GetReqID(r.Context()) attrs := []any{ @@ -50,6 +55,7 @@ func Middleware( return } + metrics.Default.ObserveAuth("ok", resolvedCtx.AuthType, "none") if auditEnabled && logger != nil { requestID := middleware.GetReqID(r.Context()) attrs := []any{ @@ -69,6 +75,33 @@ func Middleware( } } +func authErrorClass(err error) string { + switch { + case errors.Is(err, ErrInvalidAccessKeyID): + return "invalid_access_key" + case errors.Is(err, ErrSignatureDoesNotMatch): + return "signature_mismatch" + case errors.Is(err, ErrAuthorizationHeaderMalformed): + return "auth_header_malformed" + case errors.Is(err, ErrRequestTimeTooSkewed): + return "time_skew" + case errors.Is(err, ErrExpiredToken): + return "expired_token" + case errors.Is(err, ErrNoAuthCredentials): + return "missing_credentials" + case errors.Is(err, ErrUnsupportedAuthScheme): + return "unsupported_auth_scheme" + case errors.Is(err, ErrInvalidPresign): + return "invalid_presign" + case errors.Is(err, 
ErrCredentialDisabled): + return "credential_disabled" + case errors.Is(err, ErrAccessDenied): + return "access_denied" + default: + return "other" + } +} + func clientIP(remoteAddr string) string { host, _, err := net.SplitHostPort(remoteAddr) if err == nil && host != "" { diff --git a/logging/logging.go b/logging/logging.go index 28e1985..186607f 100644 --- a/logging/logging.go +++ b/logging/logging.go @@ -1,6 +1,7 @@ package logging import ( + "fs/metrics" "log/slog" "net/http" "os" @@ -9,6 +10,7 @@ import ( "strings" "time" + "github.com/go-chi/chi/v5" "github.com/go-chi/chi/v5/middleware" ) @@ -86,6 +88,8 @@ func HTTPMiddleware(logger *slog.Logger, cfg Config) func(http.Handler) http.Han return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { start := time.Now() ww := middleware.NewWrapResponseWriter(w, r.ProtoMajor) + metrics.Default.IncHTTPInFlight() + defer metrics.Default.DecHTTPInFlight() requestID := middleware.GetReqID(r.Context()) if requestID != "" { ww.Header().Set("x-amz-request-id", requestID) @@ -93,15 +97,18 @@ func HTTPMiddleware(logger *slog.Logger, cfg Config) func(http.Handler) http.Han next.ServeHTTP(ww, r) - if !cfg.Audit && !cfg.DebugMode { - return - } - elapsed := time.Since(start) status := ww.Status() if status == 0 { status = http.StatusOK } + route := metricRouteLabel(r) + metrics.Default.ObserveHTTPRequest(r.Method, route, status, elapsed, ww.BytesWritten()) + + if !cfg.Audit && !cfg.DebugMode { + return + } + attrs := []any{ "method", r.Method, "path", r.URL.Path, @@ -131,6 +138,35 @@ func HTTPMiddleware(logger *slog.Logger, cfg Config) func(http.Handler) http.Han } } +func metricRouteLabel(r *http.Request) string { + if r == nil || r.URL == nil { + return "/unknown" + } + + if routeCtx := chi.RouteContext(r.Context()); routeCtx != nil { + if pattern := strings.TrimSpace(routeCtx.RoutePattern()); pattern != "" { + return pattern + } + } + + path := strings.TrimSpace(r.URL.Path) + if path == "" || path == "/" { + 
return "/" + } + if path == "/healthz" || path == "/metrics" { + return path + } + + trimmed := strings.Trim(path, "/") + if trimmed == "" { + return "/" + } + if !strings.Contains(trimmed, "/") { + return "/{bucket}" + } + return "/{bucket}/*" +} + func envBool(key string, defaultValue bool) bool { raw := os.Getenv(key) if raw == "" { diff --git a/logging/logging_metrics_test.go b/logging/logging_metrics_test.go new file mode 100644 index 0000000..2a45b4b --- /dev/null +++ b/logging/logging_metrics_test.go @@ -0,0 +1,30 @@ +package logging + +import ( + "net/http/httptest" + "testing" +) + +func TestMetricRouteLabelFallbacks(t *testing.T) { + testCases := []struct { + name string + path string + want string + }{ + {name: "root", path: "/", want: "/"}, + {name: "health", path: "/healthz", want: "/healthz"}, + {name: "metrics", path: "/metrics", want: "/metrics"}, + {name: "bucket", path: "/some-bucket", want: "/{bucket}"}, + {name: "object", path: "/some-bucket/private/path/file.jpg", want: "/{bucket}/*"}, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + req := httptest.NewRequest("GET", tc.path, nil) + got := metricRouteLabel(req) + if got != tc.want { + t.Fatalf("metricRouteLabel(%q) = %q, want %q", tc.path, got, tc.want) + } + }) + } +} diff --git a/metadata/metadata.go b/metadata/metadata.go index bf3a753..ff0c804 100644 --- a/metadata/metadata.go +++ b/metadata/metadata.go @@ -4,6 +4,7 @@ import ( "encoding/json" "errors" "fmt" + "fs/metrics" "fs/models" "net" "regexp" @@ -47,7 +48,7 @@ func NewMetadataHandler(dbPath string) (*MetadataHandler, error) { } h := &MetadataHandler{db: db} - err = h.db.Update(func(tx *bbolt.Tx) error { + err = h.update(func(tx *bbolt.Tx) error { _, err := tx.CreateBucketIfNotExists(systemIndex) return err }) @@ -55,7 +56,7 @@ func NewMetadataHandler(dbPath string) (*MetadataHandler, error) { _ = db.Close() return nil, err } - err = h.db.Update(func(tx *bbolt.Tx) error { + err = h.update(func(tx 
*bbolt.Tx) error { _, err := tx.CreateBucketIfNotExists(multipartUploadIndex) return err }) @@ -63,7 +64,7 @@ func NewMetadataHandler(dbPath string) (*MetadataHandler, error) { _ = db.Close() return nil, err } - err = h.db.Update(func(tx *bbolt.Tx) error { + err = h.update(func(tx *bbolt.Tx) error { _, err := tx.CreateBucketIfNotExists(multipartUploadPartsIndex) return err }) @@ -71,7 +72,7 @@ func NewMetadataHandler(dbPath string) (*MetadataHandler, error) { _ = db.Close() return nil, err } - err = h.db.Update(func(tx *bbolt.Tx) error { + err = h.update(func(tx *bbolt.Tx) error { _, err := tx.CreateBucketIfNotExists(authIdentitiesIndex) return err }) @@ -79,7 +80,7 @@ func NewMetadataHandler(dbPath string) (*MetadataHandler, error) { _ = db.Close() return nil, err } - err = h.db.Update(func(tx *bbolt.Tx) error { + err = h.update(func(tx *bbolt.Tx) error { _, err := tx.CreateBucketIfNotExists(authPoliciesIndex) return err }) @@ -119,6 +120,20 @@ func (h *MetadataHandler) Close() error { return h.db.Close() } +func (h *MetadataHandler) view(fn func(tx *bbolt.Tx) error) error { + start := time.Now() + err := h.db.View(fn) + metrics.Default.ObserveMetadataTx("view", time.Since(start), err == nil) + return err +} + +func (h *MetadataHandler) update(fn func(tx *bbolt.Tx) error) error { + start := time.Now() + err := h.db.Update(fn) + metrics.Default.ObserveMetadataTx("update", time.Since(start), err == nil) + return err +} + func (h *MetadataHandler) PutAuthIdentity(identity *models.AuthIdentity) error { if identity == nil { return errors.New("auth identity is required") @@ -126,7 +141,7 @@ func (h *MetadataHandler) PutAuthIdentity(identity *models.AuthIdentity) error { if strings.TrimSpace(identity.AccessKeyID) == "" { return errors.New("access key id is required") } - return h.db.Update(func(tx *bbolt.Tx) error { + return h.update(func(tx *bbolt.Tx) error { bucket := tx.Bucket(authIdentitiesIndex) if bucket == nil { return errors.New("auth identities index not found") 
@@ -146,7 +161,7 @@ func (h *MetadataHandler) GetAuthIdentity(accessKeyID string) (*models.AuthIdent } var identity *models.AuthIdentity - err := h.db.View(func(tx *bbolt.Tx) error { + err := h.view(func(tx *bbolt.Tx) error { bucket := tx.Bucket(authIdentitiesIndex) if bucket == nil { return errors.New("auth identities index not found") @@ -177,7 +192,7 @@ func (h *MetadataHandler) PutAuthPolicy(policy *models.AuthPolicy) error { return errors.New("auth policy principal is required") } policy.Principal = principal - return h.db.Update(func(tx *bbolt.Tx) error { + return h.update(func(tx *bbolt.Tx) error { bucket := tx.Bucket(authPoliciesIndex) if bucket == nil { return errors.New("auth policies index not found") @@ -197,7 +212,7 @@ func (h *MetadataHandler) GetAuthPolicy(accessKeyID string) (*models.AuthPolicy, } var policy *models.AuthPolicy - err := h.db.View(func(tx *bbolt.Tx) error { + err := h.view(func(tx *bbolt.Tx) error { bucket := tx.Bucket(authPoliciesIndex) if bucket == nil { return errors.New("auth policies index not found") @@ -224,7 +239,7 @@ func (h *MetadataHandler) CreateBucket(bucketName string) error { return fmt.Errorf("%w: %s", ErrInvalidBucketName, bucketName) } - err := h.db.Update(func(tx *bbolt.Tx) error { + err := h.update(func(tx *bbolt.Tx) error { indexBucket, err := tx.CreateBucketIfNotExists([]byte(systemIndex)) if err != nil { return err @@ -256,7 +271,7 @@ func (h *MetadataHandler) DeleteBucket(bucketName string) error { return fmt.Errorf("%w: %s", ErrInvalidBucketName, bucketName) } - err := h.db.Update(func(tx *bbolt.Tx) error { + err := h.update(func(tx *bbolt.Tx) error { indexBucket, err := tx.CreateBucketIfNotExists([]byte(systemIndex)) if err != nil { return err @@ -303,7 +318,7 @@ func (h *MetadataHandler) DeleteBucket(bucketName string) error { func (h *MetadataHandler) ListBuckets() ([]string, error) { buckets := []string{} - err := h.db.View(func(tx *bbolt.Tx) error { + err := h.view(func(tx *bbolt.Tx) error { 
systemIndexBucket := tx.Bucket([]byte(systemIndex)) if systemIndexBucket == nil { return errors.New("system index not found") @@ -323,7 +338,7 @@ func (h *MetadataHandler) ListBuckets() ([]string, error) { func (h *MetadataHandler) GetBucketManifest(bucketName string) (*models.BucketManifest, error) { var manifest *models.BucketManifest - err := h.db.View(func(tx *bbolt.Tx) error { + err := h.view(func(tx *bbolt.Tx) error { systemIndexBucket := tx.Bucket([]byte(systemIndex)) if systemIndexBucket == nil { return errors.New("system index not found") @@ -353,7 +368,7 @@ func (h *MetadataHandler) PutManifest(manifest *models.ObjectManifest) error { return err } - err := h.db.Update(func(tx *bbolt.Tx) error { + err := h.update(func(tx *bbolt.Tx) error { data, err := json.Marshal(manifest) if err != nil { return err @@ -373,7 +388,7 @@ func (h *MetadataHandler) PutManifest(manifest *models.ObjectManifest) error { func (h *MetadataHandler) GetManifest(bucket, key string) (*models.ObjectManifest, error) { var manifest *models.ObjectManifest - err := h.db.View(func(tx *bbolt.Tx) error { + err := h.view(func(tx *bbolt.Tx) error { metadataBucket := tx.Bucket([]byte(bucket)) if metadataBucket == nil { return fmt.Errorf("%w: %s", ErrBucketNotFound, bucket) @@ -400,7 +415,7 @@ func (h *MetadataHandler) ListObjects(bucket, prefix string) ([]*models.ObjectMa var objects []*models.ObjectManifest - err := h.db.View(func(tx *bbolt.Tx) error { + err := h.view(func(tx *bbolt.Tx) error { systemIndexBucket := tx.Bucket([]byte(systemIndex)) if systemIndexBucket == nil { return errors.New("system index not found") @@ -440,7 +455,7 @@ func (h *MetadataHandler) ForEachObjectFrom(bucket, startKey string, fn func(*mo return errors.New("object callback is required") } - return h.db.View(func(tx *bbolt.Tx) error { + return h.view(func(tx *bbolt.Tx) error { systemIndexBucket := tx.Bucket([]byte(systemIndex)) if systemIndexBucket == nil { return errors.New("system index not found") @@ -480,7 
+495,7 @@ func (h *MetadataHandler) DeleteManifest(bucket, key string) error { return err } - err := h.db.Update(func(tx *bbolt.Tx) error { + err := h.update(func(tx *bbolt.Tx) error { metadataBucket := tx.Bucket([]byte(bucket)) if metadataBucket == nil { return fmt.Errorf("%w: %s", ErrBucketNotFound, bucket) @@ -497,7 +512,7 @@ func (h *MetadataHandler) DeleteManifest(bucket, key string) error { func (h *MetadataHandler) DeleteManifests(bucket string, keys []string) ([]string, error) { deleted := make([]string, 0, len(keys)) - err := h.db.Update(func(tx *bbolt.Tx) error { + err := h.update(func(tx *bbolt.Tx) error { metadataBucket := tx.Bucket([]byte(bucket)) if metadataBucket == nil { return fmt.Errorf("%w: %s", ErrBucketNotFound, bucket) @@ -525,7 +540,7 @@ func (h *MetadataHandler) DeleteManifests(bucket string, keys []string) ([]strin func (h *MetadataHandler) CreateMultipartUpload(bucket, key string) (*models.MultipartUpload, error) { var upload *models.MultipartUpload - err := h.db.View(func(tx *bbolt.Tx) error { + err := h.view(func(tx *bbolt.Tx) error { systemIndexBucket := tx.Bucket([]byte(systemIndex)) if systemIndexBucket == nil { return errors.New("system index not found") @@ -548,7 +563,7 @@ func (h *MetadataHandler) CreateMultipartUpload(bucket, key string) (*models.Mul State: "pending", } - err = h.db.Update(func(tx *bbolt.Tx) error { + err = h.update(func(tx *bbolt.Tx) error { multipartUploadBucket := tx.Bucket([]byte(multipartUploadIndex)) if multipartUploadBucket == nil { return errors.New("multipart upload index not found") @@ -643,7 +658,7 @@ func deleteMultipartPartsByUploadID(tx *bbolt.Tx, uploadID string) error { func (h *MetadataHandler) GetMultipartUpload(uploadID string) (*models.MultipartUpload, error) { var upload *models.MultipartUpload - err := h.db.View(func(tx *bbolt.Tx) error { + err := h.view(func(tx *bbolt.Tx) error { var err error upload, _, err = getMultipartUploadFromTx(tx, uploadID) if err != nil { @@ -661,7 +676,7 @@ func (h 
*MetadataHandler) PutMultipartPart(uploadID string, part models.Uploaded return fmt.Errorf("invalid part number: %d", part.PartNumber) } - err := h.db.Update(func(tx *bbolt.Tx) error { + err := h.update(func(tx *bbolt.Tx) error { upload, _, err := getMultipartUploadFromTx(tx, uploadID) if err != nil { return err @@ -690,7 +705,7 @@ func (h *MetadataHandler) PutMultipartPart(uploadID string, part models.Uploaded func (h *MetadataHandler) ListMultipartParts(uploadID string) ([]models.UploadedPart, error) { parts := make([]models.UploadedPart, 0) - err := h.db.View(func(tx *bbolt.Tx) error { + err := h.view(func(tx *bbolt.Tx) error { if _, _, err := getMultipartUploadFromTx(tx, uploadID); err != nil { return err } @@ -724,7 +739,7 @@ func (h *MetadataHandler) CompleteMultipartUpload(uploadID string, final *models return errors.New("final object manifest is required") } - err := h.db.Update(func(tx *bbolt.Tx) error { + err := h.update(func(tx *bbolt.Tx) error { upload, multipartUploadBucket, err := getMultipartUploadFromTx(tx, uploadID) if err != nil { return err @@ -763,7 +778,7 @@ func (h *MetadataHandler) CompleteMultipartUpload(uploadID string, final *models return nil } func (h *MetadataHandler) AbortMultipartUpload(uploadID string) error { - err := h.db.Update(func(tx *bbolt.Tx) error { + err := h.update(func(tx *bbolt.Tx) error { upload, multipartUploadBucket, err := getMultipartUploadFromTx(tx, uploadID) if err != nil { return err @@ -793,7 +808,7 @@ func (h *MetadataHandler) CleanupMultipartUploads(retention time.Duration) (int, } cleaned := 0 - err := h.db.Update(func(tx *bbolt.Tx) error { + err := h.update(func(tx *bbolt.Tx) error { uploadsBucket, err := getMultipartUploadBucket(tx) if err != nil { return err @@ -843,7 +858,7 @@ func (h *MetadataHandler) GetReferencedChunkSet() (map[string]struct{}, error) { chunkSet := make(map[string]struct{}) pendingUploadSet := make(map[string]struct{}) - err := h.db.View(func(tx *bbolt.Tx) error { + err := 
h.view(func(tx *bbolt.Tx) error { systemIndexBucket := tx.Bucket([]byte(systemIndex)) if systemIndexBucket == nil { return errors.New("system index not found") diff --git a/metrics/metrics.go b/metrics/metrics.go new file mode 100644 index 0000000..a696659 --- /dev/null +++ b/metrics/metrics.go @@ -0,0 +1,471 @@ +package metrics + +import ( + "fmt" + "runtime" + "sort" + "strconv" + "strings" + "sync" + "sync/atomic" + "time" +) + +var defaultBuckets = []float64{ + 0.0005, 0.001, 0.0025, 0.005, 0.01, + 0.025, 0.05, 0.1, 0.25, 0.5, + 1, 2.5, 5, 10, +} + +var Default = NewRegistry() + +type histogram struct { + bounds []float64 + counts []uint64 + sum float64 + count uint64 +} + +func newHistogram(bounds []float64) *histogram { + cloned := make([]float64, len(bounds)) + copy(cloned, bounds) + return &histogram{ + bounds: cloned, + counts: make([]uint64, len(bounds)+1), + } +} + +func (h *histogram) observe(v float64) { + h.count++ + h.sum += v + for i, bound := range h.bounds { + if v <= bound { + h.counts[i]++ + return + } + } + h.counts[len(h.counts)-1]++ +} + +func (h *histogram) snapshot() (bounds []float64, counts []uint64, sum float64, count uint64) { + bounds = make([]float64, len(h.bounds)) + copy(bounds, h.bounds) + counts = make([]uint64, len(h.counts)) + copy(counts, h.counts) + return bounds, counts, h.sum, h.count +} + +type Registry struct { + startedAt time.Time + inFlight atomic.Int64 + + mu sync.Mutex + + httpRequests map[string]uint64 + httpResponseByte map[string]uint64 + httpDuration map[string]*histogram + + authRequests map[string]uint64 + + serviceOps map[string]uint64 + serviceDuration map[string]*histogram + + dbTxTotal map[string]uint64 + dbTxDuration map[string]*histogram + + blobOps map[string]uint64 + blobBytes map[string]uint64 + blobDuration map[string]*histogram + + gcRuns map[string]uint64 + gcDuration *histogram + gcDeletedChunks uint64 + gcDeleteErrors uint64 + gcCleanedUpload uint64 +} + +func NewRegistry() *Registry { + return 
&Registry{ + startedAt: time.Now(), + httpRequests: make(map[string]uint64), + httpResponseByte: make(map[string]uint64), + httpDuration: make(map[string]*histogram), + authRequests: make(map[string]uint64), + serviceOps: make(map[string]uint64), + serviceDuration: make(map[string]*histogram), + dbTxTotal: make(map[string]uint64), + dbTxDuration: make(map[string]*histogram), + blobOps: make(map[string]uint64), + blobBytes: make(map[string]uint64), + blobDuration: make(map[string]*histogram), + gcRuns: make(map[string]uint64), + gcDuration: newHistogram(defaultBuckets), + } +} + +func (r *Registry) IncHTTPInFlight() { + r.inFlight.Add(1) +} + +func (r *Registry) DecHTTPInFlight() { + r.inFlight.Add(-1) +} + +func (r *Registry) ObserveHTTPRequest(method, route string, status int, d time.Duration, responseBytes int) { + route = normalizeRoute(route) + key := method + "|" + route + "|" + strconv.Itoa(status) + durationKey := method + "|" + route + + r.mu.Lock() + r.httpRequests[key]++ + if responseBytes > 0 { + r.httpResponseByte[key] += uint64(responseBytes) + } + h := r.httpDuration[durationKey] + if h == nil { + h = newHistogram(defaultBuckets) + r.httpDuration[durationKey] = h + } + h.observe(d.Seconds()) + r.mu.Unlock() +} + +func (r *Registry) ObserveAuth(result, authType, reason string) { + authType = strings.TrimSpace(authType) + if authType == "" { + authType = "unknown" + } + reason = strings.TrimSpace(reason) + if reason == "" { + reason = "none" + } + key := result + "|" + authType + "|" + reason + r.mu.Lock() + r.authRequests[key]++ + r.mu.Unlock() +} + +func (r *Registry) ObserveService(operation string, d time.Duration, ok bool) { + result := "error" + if ok { + result = "ok" + } + key := operation + "|" + result + r.mu.Lock() + r.serviceOps[key]++ + h := r.serviceDuration[operation] + if h == nil { + h = newHistogram(defaultBuckets) + r.serviceDuration[operation] = h + } + h.observe(d.Seconds()) + r.mu.Unlock() +} + +func (r *Registry) 
ObserveMetadataTx(txType string, d time.Duration, ok bool) { + result := "error" + if ok { + result = "ok" + } + key := txType + "|" + result + r.mu.Lock() + r.dbTxTotal[key]++ + h := r.dbTxDuration[txType] + if h == nil { + h = newHistogram(defaultBuckets) + r.dbTxDuration[txType] = h + } + h.observe(d.Seconds()) + r.mu.Unlock() +} + +func (r *Registry) ObserveBlob(operation string, d time.Duration, bytes int64, ok bool) { + result := "error" + if ok { + result = "ok" + } + key := operation + "|" + result + r.mu.Lock() + r.blobOps[key]++ + h := r.blobDuration[operation] + if h == nil { + h = newHistogram(defaultBuckets) + r.blobDuration[operation] = h + } + h.observe(d.Seconds()) + if bytes > 0 { + switch operation { + case "read_chunk": + r.blobBytes["read"] += uint64(bytes) + case "write_chunk": + r.blobBytes["write"] += uint64(bytes) + } + } + r.mu.Unlock() +} + +func (r *Registry) ObserveGC(d time.Duration, deletedChunks, deleteErrors, cleanedUploads int, ok bool) { + result := "error" + if ok { + result = "ok" + } + r.mu.Lock() + r.gcRuns[result]++ + r.gcDuration.observe(d.Seconds()) + if deletedChunks > 0 { + r.gcDeletedChunks += uint64(deletedChunks) + } + if deleteErrors > 0 { + r.gcDeleteErrors += uint64(deleteErrors) + } + if cleanedUploads > 0 { + r.gcCleanedUpload += uint64(cleanedUploads) + } + r.mu.Unlock() +} + +func (r *Registry) RenderPrometheus() string { + now := time.Now() + var mem runtime.MemStats + runtime.ReadMemStats(&mem) + + r.mu.Lock() + httpReq := copyCounterMap(r.httpRequests) + httpBytes := copyCounterMap(r.httpResponseByte) + httpDur := copyHistogramMap(r.httpDuration) + authReq := copyCounterMap(r.authRequests) + serviceOps := copyCounterMap(r.serviceOps) + serviceDur := copyHistogramMap(r.serviceDuration) + dbTx := copyCounterMap(r.dbTxTotal) + dbTxDur := copyHistogramMap(r.dbTxDuration) + blobOps := copyCounterMap(r.blobOps) + blobBytes := copyCounterMap(r.blobBytes) + blobDur := copyHistogramMap(r.blobDuration) + gcRuns := 
copyCounterMap(r.gcRuns) + gcDurBounds, gcDurCounts, gcDurSum, gcDurCount := r.gcDuration.snapshot() + gcDeletedChunks := r.gcDeletedChunks + gcDeleteErrors := r.gcDeleteErrors + gcCleanedUploads := r.gcCleanedUpload + r.mu.Unlock() + + var b strings.Builder + + writeGauge(&b, "fs_http_inflight_requests", "Current in-flight HTTP requests.", float64(r.inFlight.Load())) + writeCounterVecKV(&b, "fs_http_requests_total", "Total HTTP requests handled.", httpReq, []string{"method", "route", "status"}) + writeCounterVecKV(&b, "fs_http_response_bytes_total", "Total HTTP response bytes written.", httpBytes, []string{"method", "route", "status"}) + writeHistogramVecKV(&b, "fs_http_request_duration_seconds", "HTTP request latency.", httpDur, []string{"method", "route"}) + + writeCounterVecKV(&b, "fs_auth_requests_total", "Authentication attempts by result.", authReq, []string{"result", "auth_type", "reason"}) + + writeCounterVecKV(&b, "fs_service_operations_total", "Service-level operation calls.", serviceOps, []string{"operation", "result"}) + writeHistogramVecKV(&b, "fs_service_operation_duration_seconds", "Service-level operation latency.", serviceDur, []string{"operation"}) + + writeCounterVecKV(&b, "fs_metadata_tx_total", "Metadata transaction calls.", dbTx, []string{"type", "result"}) + writeHistogramVecKV(&b, "fs_metadata_tx_duration_seconds", "Metadata transaction latency.", dbTxDur, []string{"type"}) + + writeCounterVecKV(&b, "fs_blob_operations_total", "Blob store operations.", blobOps, []string{"operation", "result"}) + writeCounterVecKV(&b, "fs_blob_bytes_total", "Blob bytes processed.", blobBytes, []string{"direction"}) + writeHistogramVecKV(&b, "fs_blob_operation_duration_seconds", "Blob operation latency.", blobDur, []string{"operation"}) + + writeCounterVecKV(&b, "fs_gc_runs_total", "Garbage collection runs.", gcRuns, []string{"result"}) + writeHistogram(&b, "fs_gc_duration_seconds", "Garbage collection runtime.", nil, gcDurBounds, gcDurCounts, gcDurSum, 
gcDurCount) + writeCounter(&b, "fs_gc_deleted_chunks_total", "Deleted chunks during GC.", gcDeletedChunks) + writeCounter(&b, "fs_gc_delete_errors_total", "Chunk delete errors during GC.", gcDeleteErrors) + writeCounter(&b, "fs_gc_cleaned_uploads_total", "Cleaned multipart uploads during GC.", gcCleanedUploads) + + writeGauge(&b, "fs_uptime_seconds", "Process uptime in seconds.", now.Sub(r.startedAt).Seconds()) + writeGauge(&b, "fs_runtime_goroutines", "Number of goroutines.", float64(runtime.NumGoroutine())) + writeGaugeVec(&b, "fs_runtime_memory_bytes", "Runtime memory in bytes.", map[string]float64{ + "alloc": float64(mem.Alloc), + "total": float64(mem.TotalAlloc), + "sys": float64(mem.Sys), + "heap_alloc": float64(mem.HeapAlloc), + "heap_sys": float64(mem.HeapSys), + "stack_sys": float64(mem.StackSys), + }, "type") + writeCounter(&b, "fs_runtime_gc_cycles_total", "Completed GC cycles.", uint64(mem.NumGC)) + writeCounterFloat(&b, "fs_runtime_gc_pause_seconds_total", "Total GC pause time in seconds.", float64(mem.PauseTotalNs)/1e9) + + return b.String() +} + +func normalizeRoute(route string) string { + route = strings.TrimSpace(route) + if route == "" { + return "/unknown" + } + return route +} + +type histogramSnapshot struct { + bounds []float64 + counts []uint64 + sum float64 + count uint64 +} + +func copyCounterMap(src map[string]uint64) map[string]uint64 { + out := make(map[string]uint64, len(src)) + for k, v := range src { + out[k] = v + } + return out +} + +func copyHistogramMap(src map[string]*histogram) map[string]histogramSnapshot { + out := make(map[string]histogramSnapshot, len(src)) + for k, h := range src { + bounds, counts, sum, count := h.snapshot() + out[k] = histogramSnapshot{ + bounds: bounds, + counts: counts, + sum: sum, + count: count, + } + } + return out +} + +func writeCounter(b *strings.Builder, name, help string, value uint64) { + fmt.Fprintf(b, "# HELP %s %s\n", name, help) + fmt.Fprintf(b, "# TYPE %s counter\n", name) + 
fmt.Fprintf(b, "%s %d\n", name, value) +} + +func writeCounterFloat(b *strings.Builder, name, help string, value float64) { + fmt.Fprintf(b, "# HELP %s %s\n", name, help) + fmt.Fprintf(b, "# TYPE %s counter\n", name) + fmt.Fprintf(b, "%s %.9f\n", name, value) +} + +func writeGauge(b *strings.Builder, name, help string, value float64) { + fmt.Fprintf(b, "# HELP %s %s\n", name, help) + fmt.Fprintf(b, "# TYPE %s gauge\n", name) + fmt.Fprintf(b, "%s %.9f\n", name, value) +} + +func writeGaugeVec(b *strings.Builder, name, help string, values map[string]float64, labelName string) { + fmt.Fprintf(b, "# HELP %s %s\n", name, help) + fmt.Fprintf(b, "# TYPE %s gauge\n", name) + keys := make([]string, 0, len(values)) + for k := range values { + keys = append(keys, k) + } + sort.Strings(keys) + for _, key := range keys { + fmt.Fprintf(b, "%s{%s=\"%s\"} %.9f\n", name, labelName, escapeLabelValue(key), values[key]) + } +} + +func writeCounterVecKV(b *strings.Builder, name, help string, values map[string]uint64, labels []string) { + fmt.Fprintf(b, "# HELP %s %s\n", name, help) + fmt.Fprintf(b, "# TYPE %s counter\n", name) + keys := make([]string, 0, len(values)) + for k := range values { + keys = append(keys, k) + } + sort.Strings(keys) + for _, key := range keys { + parts := strings.Split(key, "|") + fmt.Fprintf(b, "%s{%s} %d\n", name, formatLabels(labels, parts), values[key]) + } +} + +func writeHistogramVecKV(b *strings.Builder, name, help string, values map[string]histogramSnapshot, labels []string) { + fmt.Fprintf(b, "# HELP %s %s\n", name, help) + fmt.Fprintf(b, "# TYPE %s histogram\n", name) + keys := make([]string, 0, len(values)) + for k := range values { + keys = append(keys, k) + } + sort.Strings(keys) + for _, key := range keys { + parts := strings.Split(key, "|") + labelsMap := make(map[string]string, len(labels)) + for i, label := range labels { + if i < len(parts) { + labelsMap[label] = parts[i] + } else { + labelsMap[label] = "" + } + } + 
writeHistogramWithLabelsMap(b, name, labelsMap, values[key]) + } +} + +func writeHistogram(b *strings.Builder, name, help string, labels map[string]string, bounds []float64, counts []uint64, sum float64, count uint64) { + fmt.Fprintf(b, "# HELP %s %s\n", name, help) + fmt.Fprintf(b, "# TYPE %s histogram\n", name) + writeHistogramWithLabelsMap(b, name, labels, histogramSnapshot{ + bounds: bounds, + counts: counts, + sum: sum, + count: count, + }) +} + +func writeHistogramWithLabelsMap(b *strings.Builder, name string, labels map[string]string, s histogramSnapshot) { + var cumulative uint64 + for i, bucketCount := range s.counts { + cumulative += bucketCount + bucketLabels := cloneLabels(labels) + if i < len(s.bounds) { + bucketLabels["le"] = trimFloat(s.bounds[i]) + } else { + bucketLabels["le"] = "+Inf" + } + fmt.Fprintf(b, "%s_bucket{%s} %d\n", name, labelsToString(bucketLabels), cumulative) + } + fmt.Fprintf(b, "%s_sum{%s} %.9f\n", name, labelsToString(labels), s.sum) + fmt.Fprintf(b, "%s_count{%s} %d\n", name, labelsToString(labels), s.count) +} + +func formatLabels(keys, values []string) string { + parts := make([]string, 0, len(keys)) + for i, key := range keys { + value := "" + if i < len(values) { + value = values[i] + } + parts = append(parts, fmt.Sprintf("%s=\"%s\"", key, escapeLabelValue(value))) + } + return strings.Join(parts, ",") +} + +func labelsToString(labels map[string]string) string { + if len(labels) == 0 { + return "" + } + keys := make([]string, 0, len(labels)) + for k := range labels { + keys = append(keys, k) + } + sort.Strings(keys) + parts := make([]string, 0, len(keys)) + for _, key := range keys { + parts = append(parts, fmt.Sprintf("%s=\"%s\"", key, escapeLabelValue(labels[key]))) + } + return strings.Join(parts, ",") +} + +func cloneLabels(in map[string]string) map[string]string { + if len(in) == 0 { + return map[string]string{} + } + out := make(map[string]string, len(in)+1) + for k, v := range in { + out[k] = v + } + return out +} + 
+func trimFloat(v float64) string { + return strconv.FormatFloat(v, 'f', -1, 64) +} + +func escapeLabelValue(value string) string { + value = strings.ReplaceAll(value, `\`, `\\`) + value = strings.ReplaceAll(value, "\n", `\n`) + value = strings.ReplaceAll(value, `"`, `\"`) + return value +} diff --git a/service/service.go b/service/service.go index 09b04d6..8442a43 100644 --- a/service/service.go +++ b/service/service.go @@ -7,6 +7,7 @@ import ( "errors" "fmt" "fs/metadata" + "fs/metrics" "fs/models" "fs/storage" "io" @@ -42,6 +43,12 @@ func NewObjectService(metadataHandler *metadata.MetadataHandler, blobHandler *st } func (s *ObjectService) PutObject(bucket, key, contentType string, input io.Reader) (*models.ObjectManifest, error) { + start := time.Now() + success := false + defer func() { + metrics.Default.ObserveService("put_object", time.Since(start), success) + }() + s.gcMu.RLock() defer s.gcMu.RUnlock() @@ -71,10 +78,17 @@ func (s *ObjectService) PutObject(bucket, key, contentType string, input io.Read return nil, err } + success = true return manifest, nil } func (s *ObjectService) GetObject(bucket, key string) (io.ReadCloser, *models.ObjectManifest, error) { + start := time.Now() + success := false + defer func() { + metrics.Default.ObserveService("get_object", time.Since(start), success) + }() + s.gcMu.RLock() manifest, err := s.metadata.GetManifest(bucket, key) @@ -92,10 +106,17 @@ func (s *ObjectService) GetObject(bucket, key string) (io.ReadCloser, *models.Ob } _ = pw.Close() }() + success = true return pr, manifest, nil } func (s *ObjectService) HeadObject(bucket, key string) (models.ObjectManifest, error) { + start := time.Now() + success := false + defer func() { + metrics.Default.ObserveService("head_object", time.Since(start), success) + }() + s.gcMu.RLock() defer s.gcMu.RUnlock() @@ -103,13 +124,22 @@ func (s *ObjectService) HeadObject(bucket, key string) (models.ObjectManifest, e if err != nil { return models.ObjectManifest{}, err } + success = 
true return *manifest, nil } func (s *ObjectService) DeleteObject(bucket, key string) error { + start := time.Now() + success := false + defer func() { + metrics.Default.ObserveService("delete_object", time.Since(start), success) + }() + s.gcMu.RLock() defer s.gcMu.RUnlock() - return s.metadata.DeleteManifest(bucket, key) + err := s.metadata.DeleteManifest(bucket, key) + success = err == nil + return err } func (s *ObjectService) ListObjects(bucket, prefix string) ([]*models.ObjectManifest, error) { @@ -120,16 +150,32 @@ func (s *ObjectService) ListObjects(bucket, prefix string) ([]*models.ObjectMani } func (s *ObjectService) ForEachObjectFrom(bucket, startKey string, fn func(*models.ObjectManifest) error) error { + start := time.Now() + success := false + defer func() { + metrics.Default.ObserveService("for_each_object_from", time.Since(start), success) + }() + s.gcMu.RLock() defer s.gcMu.RUnlock() - return s.metadata.ForEachObjectFrom(bucket, startKey, fn) + err := s.metadata.ForEachObjectFrom(bucket, startKey, fn) + success = err == nil + return err } func (s *ObjectService) CreateBucket(bucket string) error { + start := time.Now() + success := false + defer func() { + metrics.Default.ObserveService("create_bucket", time.Since(start), success) + }() + s.gcMu.RLock() defer s.gcMu.RUnlock() - return s.metadata.CreateBucket(bucket) + err := s.metadata.CreateBucket(bucket) + success = err == nil + return err } func (s *ObjectService) HeadBucket(bucket string) error { @@ -154,10 +200,18 @@ func (s *ObjectService) DeleteBucket(bucket string) error { } func (s *ObjectService) ListBuckets() ([]string, error) { + start := time.Now() + success := false + defer func() { + metrics.Default.ObserveService("list_buckets", time.Since(start), success) + }() + s.gcMu.RLock() defer s.gcMu.RUnlock() - return s.metadata.ListBuckets() + buckets, err := s.metadata.ListBuckets() + success = err == nil + return buckets, err } func (s *ObjectService) DeleteObjects(bucket string, keys 
[]string) ([]string, error) { @@ -173,6 +227,12 @@ func (s *ObjectService) CreateMultipartUpload(bucket, key string) (*models.Multi } func (s *ObjectService) UploadPart(bucket, key, uploadId string, partNumber int, input io.Reader) (string, error) { + start := time.Now() + success := false + defer func() { + metrics.Default.ObserveService("upload_part", time.Since(start), success) + }() + s.gcMu.RLock() defer s.gcMu.RUnlock() @@ -204,6 +264,7 @@ func (s *ObjectService) UploadPart(bucket, key, uploadId string, partNumber int, if err != nil { return "", err } + success = true return etag, nil } @@ -222,6 +283,12 @@ func (s *ObjectService) ListMultipartParts(bucket, key, uploadID string) ([]mode } func (s *ObjectService) CompleteMultipartUpload(bucket, key, uploadID string, completed []models.CompletedPart) (*models.ObjectManifest, error) { + start := time.Now() + success := false + defer func() { + metrics.Default.ObserveService("complete_multipart_upload", time.Since(start), success) + }() + s.gcMu.RLock() defer s.gcMu.RUnlock() @@ -288,6 +355,7 @@ func (s *ObjectService) CompleteMultipartUpload(bucket, key, uploadID string, co return nil, err } + success = true return manifest, nil } @@ -327,6 +395,15 @@ func (s *ObjectService) Close() error { } func (s *ObjectService) GarbageCollect() error { + start := time.Now() + success := false + deletedChunks := 0 + deleteErrors := 0 + cleanedUploads := 0 + defer func() { + metrics.Default.ObserveGC(time.Since(start), deletedChunks, deleteErrors, cleanedUploads, success) + }() + s.gcMu.Lock() defer s.gcMu.Unlock() @@ -336,9 +413,6 @@ func (s *ObjectService) GarbageCollect() error { } totalChunks := 0 - deletedChunks := 0 - deleteErrors := 0 - cleanedUploads := 0 if err := s.blob.ForEachChunk(func(chunkID string) error { totalChunks++ @@ -368,6 +442,7 @@ func (s *ObjectService) GarbageCollect() error { "delete_errors", deleteErrors, "cleaned_uploads", cleanedUploads, ) + success = true return nil } diff --git 
a/storage/blob.go b/storage/blob.go index 6215a6f..41297df 100644 --- a/storage/blob.go +++ b/storage/blob.go @@ -6,10 +6,12 @@ import ( "encoding/hex" "errors" "fmt" + "fs/metrics" "io" "os" "path/filepath" "strings" + "time" ) const blobRoot = "blobs" @@ -37,11 +39,16 @@ func NewBlobStore(root string, chunkSize int) (*BlobStore, error) { } func (bs *BlobStore) IngestStream(stream io.Reader) ([]string, int64, string, error) { + start := time.Now() fullFileHasher := md5.New() buffer := make([]byte, bs.chunkSize) var totalSize int64 var chunkIDs []string + success := false + defer func() { + metrics.Default.ObserveBlob("ingest_stream", time.Since(start), 0, success) + }() for { bytesRead, err := io.ReadFull(stream, buffer) @@ -74,10 +81,17 @@ func (bs *BlobStore) IngestStream(stream io.Reader) ([]string, int64, string, er } etag := hex.EncodeToString(fullFileHasher.Sum(nil)) + success = true return chunkIDs, totalSize, etag, nil } func (bs *BlobStore) saveBlob(chunkID string, data []byte) error { + start := time.Now() + success := false + defer func() { + metrics.Default.ObserveBlob("write_chunk", time.Since(start), int64(len(data)), success) + }() + if !isValidChunkID(chunkID) { return fmt.Errorf("invalid chunk id: %q", chunkID) } @@ -88,6 +102,7 @@ func (bs *BlobStore) saveBlob(chunkID string, data []byte) error { fullPath := filepath.Join(dir, chunkID) if _, err := os.Stat(fullPath); err == nil { + success = true return nil } else if !os.IsNotExist(err) { return err @@ -119,6 +134,7 @@ func (bs *BlobStore) saveBlob(chunkID string, data []byte) error { if err := os.Rename(tmpPath, fullPath); err != nil { if _, statErr := os.Stat(fullPath); statErr == nil { + success = true return nil } return err @@ -128,10 +144,17 @@ func (bs *BlobStore) saveBlob(chunkID string, data []byte) error { if err := syncDir(dir); err != nil { return err } + success = true return nil } func (bs *BlobStore) AssembleStream(chunkIDs []string, w *io.PipeWriter) error { + start := time.Now() 
+ success := false + defer func() { + metrics.Default.ObserveBlob("assemble_stream", time.Since(start), 0, success) + }() + for _, chunkID := range chunkIDs { chunkData, err := bs.GetBlob(chunkID) if err != nil { @@ -141,14 +164,28 @@ func (bs *BlobStore) AssembleStream(chunkIDs []string, w *io.PipeWriter) error { return err } } + success = true return nil } func (bs *BlobStore) GetBlob(chunkID string) ([]byte, error) { + start := time.Now() + success := false + var size int64 + defer func() { + metrics.Default.ObserveBlob("read_chunk", time.Since(start), size, success) + }() + if !isValidChunkID(chunkID) { return nil, fmt.Errorf("invalid chunk id: %q", chunkID) } - return os.ReadFile(filepath.Join(bs.dataRoot, blobRoot, chunkID[:2], chunkID[2:4], chunkID)) + data, err := os.ReadFile(filepath.Join(bs.dataRoot, blobRoot, chunkID[:2], chunkID[2:4], chunkID)) + if err != nil { + return nil, err + } + size = int64(len(data)) + success = true + return data, nil } func (bs *BlobStore) DeleteBlob(chunkID string) error { From 6ca3fb87017b041ef922325da1453787a899e21c Mon Sep 17 00:00:00 2001 From: Andrej Mickov Date: Fri, 27 Feb 2026 16:38:51 +0100 Subject: [PATCH 2/4] Updated metrics --- api/api.go | 21 +- api/s3_errors.go | 8 + logging/logging.go | 22 +- metrics/metrics.go | 496 ++++++++++++++++++++++++++++++++++++++------- service/service.go | 95 +++++---- 5 files changed, 530 insertions(+), 112 deletions(-) diff --git a/api/api.go b/api/api.go index 90d2589..61e81a8 100644 --- a/api/api.go +++ b/api/api.go @@ -236,6 +236,7 @@ func (h *Handler) handlePostObject(w http.ResponseWriter, r *http.Request) { writeS3Error(w, r, s3ErrMalformedXML, r.URL.Path) return } + metrics.Default.ObserveBatchSize(len(req.Parts)) manifest, err := h.svc.CompleteMultipartUpload(bucket, key, uploadID, req.Parts) if err != nil { @@ -311,6 +312,7 @@ func (h *Handler) handlePutObject(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) return } + 
metrics.Default.ObserveBatchSize(1) if ifNoneMatch := strings.TrimSpace(r.Header.Get("If-None-Match")); ifNoneMatch != "" { manifest, err := h.svc.HeadObject(bucket, key) @@ -523,6 +525,7 @@ func (h *Handler) handlePostBucket(w http.ResponseWriter, r *http.Request) { writeS3Error(w, r, s3ErrTooManyDeleteObjects, r.URL.Path) return } + metrics.Default.ObserveBatchSize(len(req.Objects)) keys := make([]string, 0, len(req.Objects)) response := models.DeleteObjectsResult{ @@ -641,6 +644,8 @@ func newLimitedListener(inner net.Listener, maxConns int) net.Listener { if maxConns <= 0 { return inner } + metrics.Default.SetConnectionPoolMax(maxConns) + metrics.Default.SetWorkerPoolSize(maxConns) return &limitedListener{ Listener: inner, slots: make(chan struct{}, maxConns), @@ -648,15 +653,27 @@ func newLimitedListener(inner net.Listener, maxConns int) net.Listener { } func (l *limitedListener) Accept() (net.Conn, error) { - l.slots <- struct{}{} + select { + case l.slots <- struct{}{}: + default: + metrics.Default.IncConnectionPoolWait() + metrics.Default.IncRequestQueueLength() + l.slots <- struct{}{} + metrics.Default.DecRequestQueueLength() + } + metrics.Default.IncConnectionPoolActive() conn, err := l.Listener.Accept() if err != nil { <-l.slots + metrics.Default.DecConnectionPoolActive() return nil, err } return &limitedConn{ Conn: conn, - done: func() { <-l.slots }, + done: func() { + <-l.slots + metrics.Default.DecConnectionPoolActive() + }, }, nil } diff --git a/api/s3_errors.go b/api/s3_errors.go index be02154..95e7424 100644 --- a/api/s3_errors.go +++ b/api/s3_errors.go @@ -5,6 +5,7 @@ import ( "errors" "fs/auth" "fs/metadata" + "fs/metrics" "fs/models" "fs/service" "net/http" @@ -200,12 +201,19 @@ func mapToS3Error(err error) s3APIError { func writeS3Error(w http.ResponseWriter, r *http.Request, apiErr s3APIError, resource string) { requestID := "" + op := "other" if r != nil { requestID = middleware.GetReqID(r.Context()) + isDeletePost := false + if r.Method == 
http.MethodPost { + _, isDeletePost = r.URL.Query()["delete"] + } + op = metrics.NormalizeHTTPOperation(r.Method, isDeletePost) if requestID != "" { w.Header().Set("x-amz-request-id", requestID) } } + metrics.Default.ObserveError(op, apiErr.Code) w.Header().Set("Content-Type", "application/xml; charset=utf-8") w.WriteHeader(apiErr.Status) diff --git a/logging/logging.go b/logging/logging.go index 186607f..2749695 100644 --- a/logging/logging.go +++ b/logging/logging.go @@ -88,8 +88,13 @@ func HTTPMiddleware(logger *slog.Logger, cfg Config) func(http.Handler) http.Han return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { start := time.Now() ww := middleware.NewWrapResponseWriter(w, r.ProtoMajor) - metrics.Default.IncHTTPInFlight() - defer metrics.Default.DecHTTPInFlight() + op := metricOperationLabel(r) + metrics.Default.IncHTTPInFlightOp(op) + metrics.Default.IncWorkerPoolActive() + defer func() { + metrics.Default.DecHTTPInFlightOp(op) + metrics.Default.DecWorkerPoolActive() + }() requestID := middleware.GetReqID(r.Context()) if requestID != "" { ww.Header().Set("x-amz-request-id", requestID) @@ -103,7 +108,7 @@ func HTTPMiddleware(logger *slog.Logger, cfg Config) func(http.Handler) http.Han status = http.StatusOK } route := metricRouteLabel(r) - metrics.Default.ObserveHTTPRequest(r.Method, route, status, elapsed, ww.BytesWritten()) + metrics.Default.ObserveHTTPRequestDetailed(r.Method, route, op, status, elapsed, ww.BytesWritten()) if !cfg.Audit && !cfg.DebugMode { return @@ -167,6 +172,17 @@ func metricRouteLabel(r *http.Request) string { return "/{bucket}/*" } +func metricOperationLabel(r *http.Request) string { + if r == nil { + return "other" + } + isDeletePost := false + if r.Method == http.MethodPost && r.URL != nil { + _, isDeletePost = r.URL.Query()["delete"] + } + return metrics.NormalizeHTTPOperation(r.Method, isDeletePost) +} + func envBool(key string, defaultValue bool) bool { raw := os.Getenv(key) if raw == "" { diff --git 
a/metrics/metrics.go b/metrics/metrics.go index a696659..307f8aa 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -2,12 +2,14 @@ package metrics import ( "fmt" + "os" "runtime" "sort" "strconv" "strings" "sync" "sync/atomic" + "syscall" "time" ) @@ -17,6 +19,14 @@ var defaultBuckets = []float64{ 1, 2.5, 5, 10, } +var lockBuckets = []float64{ + 0.000001, 0.000005, 0.00001, 0.00005, + 0.0001, 0.0005, 0.001, 0.005, 0.01, + 0.025, 0.05, 0.1, 0.25, 0.5, 1, +} + +var batchBuckets = []float64{1, 2, 4, 8, 16, 32, 64, 100, 128, 256, 512, 1000, 5000} + var Default = NewRegistry() type histogram struct { @@ -57,13 +67,26 @@ func (h *histogram) snapshot() (bounds []float64, counts []uint64, sum float64, type Registry struct { startedAt time.Time - inFlight atomic.Int64 + + httpInFlight atomic.Int64 + + connectionPoolActive atomic.Int64 + connectionPoolMax atomic.Int64 + connectionPoolWaits atomic.Uint64 + + requestQueueLength atomic.Int64 + workerPoolActive atomic.Int64 + workerPoolSize atomic.Int64 mu sync.Mutex - httpRequests map[string]uint64 - httpResponseByte map[string]uint64 - httpDuration map[string]*histogram + httpRequestsRoute map[string]uint64 + httpResponseBytesRoute map[string]uint64 + httpDurationRoute map[string]*histogram + + httpRequestsOp map[string]uint64 + httpDurationOp map[string]*histogram + httpInFlightOp map[string]int64 authRequests map[string]uint64 @@ -77,6 +100,17 @@ type Registry struct { blobBytes map[string]uint64 blobDuration map[string]*histogram + lockWait map[string]*histogram + lockHold map[string]*histogram + + cacheHits map[string]uint64 + cacheMisses map[string]uint64 + + batchSize *histogram + + retries map[string]uint64 + errors map[string]uint64 + gcRuns map[string]uint64 gcDuration *histogram gcDeletedChunks uint64 @@ -86,47 +120,136 @@ type Registry struct { func NewRegistry() *Registry { return &Registry{ - startedAt: time.Now(), - httpRequests: make(map[string]uint64), - httpResponseByte: make(map[string]uint64), - 
httpDuration: make(map[string]*histogram), - authRequests: make(map[string]uint64), - serviceOps: make(map[string]uint64), - serviceDuration: make(map[string]*histogram), - dbTxTotal: make(map[string]uint64), - dbTxDuration: make(map[string]*histogram), - blobOps: make(map[string]uint64), - blobBytes: make(map[string]uint64), - blobDuration: make(map[string]*histogram), - gcRuns: make(map[string]uint64), - gcDuration: newHistogram(defaultBuckets), + startedAt: time.Now(), + httpRequestsRoute: make(map[string]uint64), + httpResponseBytesRoute: make(map[string]uint64), + httpDurationRoute: make(map[string]*histogram), + httpRequestsOp: make(map[string]uint64), + httpDurationOp: make(map[string]*histogram), + httpInFlightOp: make(map[string]int64), + authRequests: make(map[string]uint64), + serviceOps: make(map[string]uint64), + serviceDuration: make(map[string]*histogram), + dbTxTotal: make(map[string]uint64), + dbTxDuration: make(map[string]*histogram), + blobOps: make(map[string]uint64), + blobBytes: make(map[string]uint64), + blobDuration: make(map[string]*histogram), + lockWait: make(map[string]*histogram), + lockHold: make(map[string]*histogram), + cacheHits: make(map[string]uint64), + cacheMisses: make(map[string]uint64), + batchSize: newHistogram(batchBuckets), + retries: make(map[string]uint64), + errors: make(map[string]uint64), + gcRuns: make(map[string]uint64), + gcDuration: newHistogram(defaultBuckets), } } +func NormalizeHTTPOperation(method string, isDeletePost bool) string { + switch strings.ToUpper(strings.TrimSpace(method)) { + case "GET": + return "get" + case "PUT": + return "put" + case "DELETE": + return "delete" + case "HEAD": + return "head" + case "POST": + if isDeletePost { + return "delete" + } + return "put" + default: + return "other" + } +} + +func statusResult(status int) string { + if status >= 200 && status < 400 { + return "ok" + } + return "error" +} + +func normalizeRoute(route string) string { + route = strings.TrimSpace(route) + 
if route == "" { + return "/unknown" + } + return route +} + +func normalizeOp(op string) string { + op = strings.ToLower(strings.TrimSpace(op)) + if op == "" { + return "other" + } + return op +} + func (r *Registry) IncHTTPInFlight() { - r.inFlight.Add(1) + r.httpInFlight.Add(1) } func (r *Registry) DecHTTPInFlight() { - r.inFlight.Add(-1) + r.httpInFlight.Add(-1) +} + +func (r *Registry) IncHTTPInFlightOp(op string) { + r.httpInFlight.Add(1) + op = normalizeOp(op) + r.mu.Lock() + r.httpInFlightOp[op]++ + r.mu.Unlock() +} + +func (r *Registry) DecHTTPInFlightOp(op string) { + r.httpInFlight.Add(-1) + op = normalizeOp(op) + r.mu.Lock() + r.httpInFlightOp[op]-- + if r.httpInFlightOp[op] < 0 { + r.httpInFlightOp[op] = 0 + } + r.mu.Unlock() } func (r *Registry) ObserveHTTPRequest(method, route string, status int, d time.Duration, responseBytes int) { + op := NormalizeHTTPOperation(method, false) + r.ObserveHTTPRequestDetailed(method, route, op, status, d, responseBytes) +} + +func (r *Registry) ObserveHTTPRequestDetailed(method, route, op string, status int, d time.Duration, responseBytes int) { route = normalizeRoute(route) - key := method + "|" + route + "|" + strconv.Itoa(status) - durationKey := method + "|" + route + op = normalizeOp(op) + result := statusResult(status) + + routeKey := method + "|" + route + "|" + strconv.Itoa(status) + routeDurKey := method + "|" + route + opKey := op + "|" + result r.mu.Lock() - r.httpRequests[key]++ + r.httpRequestsRoute[routeKey]++ if responseBytes > 0 { - r.httpResponseByte[key] += uint64(responseBytes) + r.httpResponseBytesRoute[routeKey] += uint64(responseBytes) } - h := r.httpDuration[durationKey] - if h == nil { - h = newHistogram(defaultBuckets) - r.httpDuration[durationKey] = h + hRoute := r.httpDurationRoute[routeDurKey] + if hRoute == nil { + hRoute = newHistogram(defaultBuckets) + r.httpDurationRoute[routeDurKey] = hRoute } - h.observe(d.Seconds()) + hRoute.observe(d.Seconds()) + + r.httpRequestsOp[opKey]++ + hOp 
:= r.httpDurationOp[opKey] + if hOp == nil { + hOp = newHistogram(defaultBuckets) + r.httpDurationOp[opKey] = hOp + } + hOp.observe(d.Seconds()) r.mu.Unlock() } @@ -179,31 +302,166 @@ func (r *Registry) ObserveMetadataTx(txType string, d time.Duration, ok bool) { r.mu.Unlock() } -func (r *Registry) ObserveBlob(operation string, d time.Duration, bytes int64, ok bool) { +func (r *Registry) ObserveBlob(operation string, d time.Duration, bytes int64, ok bool, backend ...string) { + be := "disk" + if len(backend) > 0 { + candidate := strings.TrimSpace(backend[0]) + if candidate != "" { + be = strings.ToLower(candidate) + } + } result := "error" if ok { result = "ok" } - key := operation + "|" + result + op := strings.ToLower(strings.TrimSpace(operation)) + if op == "" { + op = "unknown" + } + + histKey := op + "|" + be + "|" + result + opsKey := histKey + r.mu.Lock() - r.blobOps[key]++ - h := r.blobDuration[operation] + r.blobOps[opsKey]++ + h := r.blobDuration[histKey] if h == nil { h = newHistogram(defaultBuckets) - r.blobDuration[operation] = h + r.blobDuration[histKey] = h } h.observe(d.Seconds()) + if bytes > 0 { - switch operation { - case "read_chunk": - r.blobBytes["read"] += uint64(bytes) - case "write_chunk": - r.blobBytes["write"] += uint64(bytes) - } + r.blobBytes[op] += uint64(bytes) } r.mu.Unlock() } +func (r *Registry) SetConnectionPoolMax(max int) { + if max < 0 { + max = 0 + } + r.connectionPoolMax.Store(int64(max)) +} + +func (r *Registry) IncConnectionPoolActive() { + r.connectionPoolActive.Add(1) +} + +func (r *Registry) DecConnectionPoolActive() { + r.connectionPoolActive.Add(-1) +} + +func (r *Registry) IncConnectionPoolWait() { + r.connectionPoolWaits.Add(1) +} + +func (r *Registry) IncRequestQueueLength() { + r.requestQueueLength.Add(1) +} + +func (r *Registry) DecRequestQueueLength() { + r.requestQueueLength.Add(-1) +} + +func (r *Registry) SetWorkerPoolSize(size int) { + if size < 0 { + size = 0 + } + r.workerPoolSize.Store(int64(size)) +} + 
+func (r *Registry) IncWorkerPoolActive() { + r.workerPoolActive.Add(1) +} + +func (r *Registry) DecWorkerPoolActive() { + r.workerPoolActive.Add(-1) +} + +func (r *Registry) ObserveLockWait(lockName string, d time.Duration) { + lockName = strings.TrimSpace(lockName) + if lockName == "" { + lockName = "unknown" + } + r.mu.Lock() + h := r.lockWait[lockName] + if h == nil { + h = newHistogram(lockBuckets) + r.lockWait[lockName] = h + } + h.observe(d.Seconds()) + r.mu.Unlock() +} + +func (r *Registry) ObserveLockHold(lockName string, d time.Duration) { + lockName = strings.TrimSpace(lockName) + if lockName == "" { + lockName = "unknown" + } + r.mu.Lock() + h := r.lockHold[lockName] + if h == nil { + h = newHistogram(lockBuckets) + r.lockHold[lockName] = h + } + h.observe(d.Seconds()) + r.mu.Unlock() +} + +func (r *Registry) ObserveCacheHit(cache string) { + cache = strings.TrimSpace(cache) + if cache == "" { + cache = "unknown" + } + r.mu.Lock() + r.cacheHits[cache]++ + r.mu.Unlock() +} + +func (r *Registry) ObserveCacheMiss(cache string) { + cache = strings.TrimSpace(cache) + if cache == "" { + cache = "unknown" + } + r.mu.Lock() + r.cacheMisses[cache]++ + r.mu.Unlock() +} + +func (r *Registry) ObserveBatchSize(size int) { + if size < 0 { + size = 0 + } + r.mu.Lock() + r.batchSize.observe(float64(size)) + r.mu.Unlock() +} + +func (r *Registry) ObserveRetry(op, reason string) { + op = normalizeOp(op) + reason = strings.TrimSpace(reason) + if reason == "" { + reason = "unknown" + } + key := op + "|" + reason + r.mu.Lock() + r.retries[key]++ + r.mu.Unlock() +} + +func (r *Registry) ObserveError(op, reason string) { + op = normalizeOp(op) + reason = strings.TrimSpace(reason) + if reason == "" { + reason = "unknown" + } + key := op + "|" + reason + r.mu.Lock() + r.errors[key]++ + r.mu.Unlock() +} + func (r *Registry) ObserveGC(d time.Duration, deletedChunks, deleteErrors, cleanedUploads int, ok bool) { result := "error" if ok { @@ -230,9 +488,12 @@ func (r *Registry) 
RenderPrometheus() string { runtime.ReadMemStats(&mem) r.mu.Lock() - httpReq := copyCounterMap(r.httpRequests) - httpBytes := copyCounterMap(r.httpResponseByte) - httpDur := copyHistogramMap(r.httpDuration) + httpReqRoute := copyCounterMap(r.httpRequestsRoute) + httpRespRoute := copyCounterMap(r.httpResponseBytesRoute) + httpDurRoute := copyHistogramMap(r.httpDurationRoute) + httpReqOp := copyCounterMap(r.httpRequestsOp) + httpDurOp := copyHistogramMap(r.httpDurationOp) + httpInFlightOp := copyIntGaugeMap(r.httpInFlightOp) authReq := copyCounterMap(r.authRequests) serviceOps := copyCounterMap(r.serviceOps) serviceDur := copyHistogramMap(r.serviceDuration) @@ -241,6 +502,13 @@ func (r *Registry) RenderPrometheus() string { blobOps := copyCounterMap(r.blobOps) blobBytes := copyCounterMap(r.blobBytes) blobDur := copyHistogramMap(r.blobDuration) + lockWait := copyHistogramMap(r.lockWait) + lockHold := copyHistogramMap(r.lockHold) + cacheHits := copyCounterMap(r.cacheHits) + cacheMisses := copyCounterMap(r.cacheMisses) + batchBounds, batchCounts, batchSum, batchCount := r.batchSize.snapshot() + retries := copyCounterMap(r.retries) + errorsTotal := copyCounterMap(r.errors) gcRuns := copyCounterMap(r.gcRuns) gcDurBounds, gcDurCounts, gcDurSum, gcDurCount := r.gcDuration.snapshot() gcDeletedChunks := r.gcDeletedChunks @@ -248,12 +516,27 @@ func (r *Registry) RenderPrometheus() string { gcCleanedUploads := r.gcCleanedUpload r.mu.Unlock() + connectionActive := float64(r.connectionPoolActive.Load()) + connectionMax := float64(r.connectionPoolMax.Load()) + connectionWaits := r.connectionPoolWaits.Load() + queueLength := float64(r.requestQueueLength.Load()) + workerActive := float64(r.workerPoolActive.Load()) + workerSize := float64(r.workerPoolSize.Load()) + + openFDs, hasOpenFDs := readOpenFDs() + resident, hasResident := readResidentMemoryBytes() + cpuSeconds, hasCPU := readProcessCPUSeconds() + var b strings.Builder - writeGauge(&b, "fs_http_inflight_requests", "Current 
in-flight HTTP requests.", float64(r.inFlight.Load())) - writeCounterVecKV(&b, "fs_http_requests_total", "Total HTTP requests handled.", httpReq, []string{"method", "route", "status"}) - writeCounterVecKV(&b, "fs_http_response_bytes_total", "Total HTTP response bytes written.", httpBytes, []string{"method", "route", "status"}) - writeHistogramVecKV(&b, "fs_http_request_duration_seconds", "HTTP request latency.", httpDur, []string{"method", "route"}) + httpInFlightOp["all"] = r.httpInFlight.Load() + writeGaugeVecFromInt64(&b, "fs_http_inflight_requests", "Current in-flight HTTP requests by operation.", httpInFlightOp, "op") + writeCounterVecKV(&b, "fs_http_requests_total", "Total HTTP requests by operation and result.", httpReqOp, []string{"op", "result"}) + writeHistogramVecKV(&b, "fs_http_request_duration_seconds", "HTTP request latency by operation and result.", httpDurOp, []string{"op", "result"}) + + writeCounterVecKV(&b, "fs_http_requests_by_route_total", "Total HTTP requests by method/route/status.", httpReqRoute, []string{"method", "route", "status"}) + writeCounterVecKV(&b, "fs_http_response_bytes_total", "Total HTTP response bytes written.", httpRespRoute, []string{"method", "route", "status"}) + writeHistogramVecKV(&b, "fs_http_request_duration_by_route_seconds", "HTTP request latency by method/route.", httpDurRoute, []string{"method", "route"}) writeCounterVecKV(&b, "fs_auth_requests_total", "Authentication attempts by result.", authReq, []string{"result", "auth_type", "reason"}) @@ -263,9 +546,28 @@ func (r *Registry) RenderPrometheus() string { writeCounterVecKV(&b, "fs_metadata_tx_total", "Metadata transaction calls.", dbTx, []string{"type", "result"}) writeHistogramVecKV(&b, "fs_metadata_tx_duration_seconds", "Metadata transaction latency.", dbTxDur, []string{"type"}) - writeCounterVecKV(&b, "fs_blob_operations_total", "Blob store operations.", blobOps, []string{"operation", "result"}) - writeCounterVecKV(&b, "fs_blob_bytes_total", "Blob bytes 
processed.", blobBytes, []string{"direction"}) - writeHistogramVecKV(&b, "fs_blob_operation_duration_seconds", "Blob operation latency.", blobDur, []string{"operation"}) + writeHistogramVecKV(&b, "fs_blob_operation_duration_seconds", "Blob backend operation latency.", blobDur, []string{"op", "backend", "result"}) + writeCounterVecKV(&b, "fs_blob_operations_total", "Blob store operations.", blobOps, []string{"op", "backend", "result"}) + writeCounterVecKV(&b, "fs_blob_bytes_total", "Blob bytes processed by operation.", blobBytes, []string{"op"}) + + writeGauge(&b, "fs_connection_pool_active", "Active pooled connections.", connectionActive) + writeGauge(&b, "fs_connection_pool_max", "Maximum pooled connections.", connectionMax) + writeCounter(&b, "fs_connection_pool_waits_total", "Number of waits due to pool saturation.", connectionWaits) + + writeGauge(&b, "fs_request_queue_length", "Requests waiting for an execution slot.", queueLength) + writeGauge(&b, "fs_worker_pool_active", "Active workers.", workerActive) + writeGauge(&b, "fs_worker_pool_size", "Configured worker pool size.", workerSize) + + writeHistogramVecKV(&b, "fs_lock_wait_seconds", "Time spent waiting for locks.", lockWait, []string{"lock_name"}) + writeHistogramVecKV(&b, "fs_lock_hold_seconds", "Time locks were held.", lockHold, []string{"lock_name"}) + + writeCounterVecKV(&b, "fs_cache_hits_total", "Cache hits by cache name.", cacheHits, []string{"cache"}) + writeCounterVecKV(&b, "fs_cache_misses_total", "Cache misses by cache name.", cacheMisses, []string{"cache"}) + + writeHistogram(&b, "fs_batch_size_histogram", "Observed batch sizes.", nil, batchBounds, batchCounts, batchSum, batchCount) + + writeCounterVecKV(&b, "fs_retries_total", "Retries by operation and reason.", retries, []string{"op", "reason"}) + writeCounterVecKV(&b, "fs_errors_total", "Errors by operation and reason.", errorsTotal, []string{"op", "reason"}) writeCounterVecKV(&b, "fs_gc_runs_total", "Garbage collection runs.", gcRuns, 
[]string{"result"}) writeHistogram(&b, "fs_gc_duration_seconds", "Garbage collection runtime.", nil, gcDurBounds, gcDurCounts, gcDurSum, gcDurCount) @@ -286,15 +588,18 @@ func (r *Registry) RenderPrometheus() string { writeCounter(&b, "fs_runtime_gc_cycles_total", "Completed GC cycles.", uint64(mem.NumGC)) writeCounterFloat(&b, "fs_runtime_gc_pause_seconds_total", "Total GC pause time in seconds.", float64(mem.PauseTotalNs)/1e9) - return b.String() -} - -func normalizeRoute(route string) string { - route = strings.TrimSpace(route) - if route == "" { - return "/unknown" + if hasCPU { + writeCounterFloat(&b, "process_cpu_seconds_total", "Total user and system CPU time spent in seconds.", cpuSeconds) } - return route + if hasResident { + writeGauge(&b, "process_resident_memory_bytes", "Resident memory size in bytes.", resident) + } + if hasOpenFDs { + writeGauge(&b, "process_open_fds", "Number of open file descriptors.", openFDs) + writeGauge(&b, "fs_open_fds", "Number of open file descriptors.", openFDs) + } + + return b.String() } type histogramSnapshot struct { @@ -312,16 +617,19 @@ func copyCounterMap(src map[string]uint64) map[string]uint64 { return out } +func copyIntGaugeMap(src map[string]int64) map[string]int64 { + out := make(map[string]int64, len(src)) + for k, v := range src { + out[k] = v + } + return out +} + func copyHistogramMap(src map[string]*histogram) map[string]histogramSnapshot { out := make(map[string]histogramSnapshot, len(src)) for k, h := range src { bounds, counts, sum, count := h.snapshot() - out[k] = histogramSnapshot{ - bounds: bounds, - counts: counts, - sum: sum, - count: count, - } + out[k] = histogramSnapshot{bounds: bounds, counts: counts, sum: sum, count: count} } return out } @@ -357,6 +665,19 @@ func writeGaugeVec(b *strings.Builder, name, help string, values map[string]floa } } +func writeGaugeVecFromInt64(b *strings.Builder, name, help string, values map[string]int64, labelName string) { + fmt.Fprintf(b, "# HELP %s %s\n", name, 
help) + fmt.Fprintf(b, "# TYPE %s gauge\n", name) + keys := make([]string, 0, len(values)) + for k := range values { + keys = append(keys, k) + } + sort.Strings(keys) + for _, key := range keys { + fmt.Fprintf(b, "%s{%s=\"%s\"} %.9f\n", name, labelName, escapeLabelValue(key), float64(values[key])) + } +} + func writeCounterVecKV(b *strings.Builder, name, help string, values map[string]uint64, labels []string) { fmt.Fprintf(b, "# HELP %s %s\n", name, help) fmt.Fprintf(b, "# TYPE %s counter\n", name) @@ -396,12 +717,7 @@ func writeHistogramVecKV(b *strings.Builder, name, help string, values map[strin func writeHistogram(b *strings.Builder, name, help string, labels map[string]string, bounds []float64, counts []uint64, sum float64, count uint64) { fmt.Fprintf(b, "# HELP %s %s\n", name, help) fmt.Fprintf(b, "# TYPE %s histogram\n", name) - writeHistogramWithLabelsMap(b, name, labels, histogramSnapshot{ - bounds: bounds, - counts: counts, - sum: sum, - count: count, - }) + writeHistogramWithLabelsMap(b, name, labels, histogramSnapshot{bounds: bounds, counts: counts, sum: sum, count: count}) } func writeHistogramWithLabelsMap(b *strings.Builder, name string, labels map[string]string, s histogramSnapshot) { @@ -464,8 +780,42 @@ func trimFloat(v float64) string { } func escapeLabelValue(value string) string { - value = strings.ReplaceAll(value, `\`, `\\`) - value = strings.ReplaceAll(value, "\n", `\n`) - value = strings.ReplaceAll(value, `"`, `\"`) + value = strings.ReplaceAll(value, `\`, `\\`) + value = strings.ReplaceAll(value, "\n", `\n`) + value = strings.ReplaceAll(value, `"`, `\"`) return value } + +func readOpenFDs() (float64, bool) { + entries, err := os.ReadDir("/proc/self/fd") + if err != nil { + return 0, false + } + return float64(len(entries)), true +} + +func readResidentMemoryBytes() (float64, bool) { + data, err := os.ReadFile("/proc/self/statm") + if err != nil { + return 0, false + } + fields := strings.Fields(string(data)) + if len(fields) < 2 { 
return 0, false + } + rssPages, err := strconv.ParseInt(fields[1], 10, 64) + if err != nil || rssPages < 0 { + return 0, false + } + return float64(rssPages * int64(os.Getpagesize())), true +} + +func readProcessCPUSeconds() (float64, bool) { + var usage syscall.Rusage + if err := syscall.Getrusage(syscall.RUSAGE_SELF, &usage); err != nil { + return 0, false + } + user := float64(usage.Utime.Sec) + float64(usage.Utime.Usec)/1e6 + sys := float64(usage.Stime.Sec) + float64(usage.Stime.Usec)/1e6 + return user + sys, true +} diff --git a/service/service.go b/service/service.go index 8442a43..f06dd66 100644 --- a/service/service.go +++ b/service/service.go @@ -42,6 +42,28 @@ func NewObjectService(metadataHandler *metadata.MetadataHandler, blobHandler *st } } +func (s *ObjectService) acquireGCRLock() func() { + waitStart := time.Now() + s.gcMu.RLock() + metrics.Default.ObserveLockWait("gc_mu_read", time.Since(waitStart)) + holdStart := time.Now() + return func() { + metrics.Default.ObserveLockHold("gc_mu_read", time.Since(holdStart)) + s.gcMu.RUnlock() + } +} + +func (s *ObjectService) acquireGCLock() func() { + waitStart := time.Now() + s.gcMu.Lock() + metrics.Default.ObserveLockWait("gc_mu_write", time.Since(waitStart)) + holdStart := time.Now() + return func() { + metrics.Default.ObserveLockHold("gc_mu_write", time.Since(holdStart)) + s.gcMu.Unlock() + } +} + func (s *ObjectService) PutObject(bucket, key, contentType string, input io.Reader) (*models.ObjectManifest, error) { start := time.Now() success := false @@ -49,8 +71,8 @@ func (s *ObjectService) PutObject(bucket, key, contentType string, input io.Read metrics.Default.ObserveService("put_object", time.Since(start), success) }() - s.gcMu.RLock() - defer s.gcMu.RUnlock() + unlock := s.acquireGCRLock() + defer unlock() chunks, size, etag, err := s.blob.IngestStream(input) if err != nil { @@ -89,16 +111,21 @@ func (s *ObjectService) GetObject(bucket, key string) (io.ReadCloser, *models.Ob 
metrics.Default.ObserveService("get_object", time.Since(start), success) }() + waitStart := time.Now() s.gcMu.RLock() + metrics.Default.ObserveLockWait("gc_mu_read", time.Since(waitStart)) + holdStart := time.Now() manifest, err := s.metadata.GetManifest(bucket, key) if err != nil { + metrics.Default.ObserveLockHold("gc_mu_read", time.Since(holdStart)) s.gcMu.RUnlock() return nil, nil, err } pr, pw := io.Pipe() go func() { + defer metrics.Default.ObserveLockHold("gc_mu_read", time.Since(holdStart)) defer s.gcMu.RUnlock() if err := s.blob.AssembleStream(manifest.Chunks, pw); err != nil { _ = pw.CloseWithError(err) @@ -117,8 +144,8 @@ func (s *ObjectService) HeadObject(bucket, key string) (models.ObjectManifest, e metrics.Default.ObserveService("head_object", time.Since(start), success) }() - s.gcMu.RLock() - defer s.gcMu.RUnlock() + unlock := s.acquireGCRLock() + defer unlock() manifest, err := s.metadata.GetManifest(bucket, key) if err != nil { @@ -135,16 +162,16 @@ func (s *ObjectService) DeleteObject(bucket, key string) error { metrics.Default.ObserveService("delete_object", time.Since(start), success) }() - s.gcMu.RLock() - defer s.gcMu.RUnlock() + unlock := s.acquireGCRLock() + defer unlock() err := s.metadata.DeleteManifest(bucket, key) success = err == nil return err } func (s *ObjectService) ListObjects(bucket, prefix string) ([]*models.ObjectManifest, error) { - s.gcMu.RLock() - defer s.gcMu.RUnlock() + unlock := s.acquireGCRLock() + defer unlock() return s.metadata.ListObjects(bucket, prefix) } @@ -156,8 +183,8 @@ func (s *ObjectService) ForEachObjectFrom(bucket, startKey string, fn func(*mode metrics.Default.ObserveService("for_each_object_from", time.Since(start), success) }() - s.gcMu.RLock() - defer s.gcMu.RUnlock() + unlock := s.acquireGCRLock() + defer unlock() err := s.metadata.ForEachObjectFrom(bucket, startKey, fn) success = err == nil @@ -171,31 +198,31 @@ func (s *ObjectService) CreateBucket(bucket string) error { 
metrics.Default.ObserveService("create_bucket", time.Since(start), success) }() - s.gcMu.RLock() - defer s.gcMu.RUnlock() + unlock := s.acquireGCRLock() + defer unlock() err := s.metadata.CreateBucket(bucket) success = err == nil return err } func (s *ObjectService) HeadBucket(bucket string) error { - s.gcMu.RLock() - defer s.gcMu.RUnlock() + unlock := s.acquireGCRLock() + defer unlock() _, err := s.metadata.GetBucketManifest(bucket) return err } func (s *ObjectService) GetBucketManifest(bucket string) (*models.BucketManifest, error) { - s.gcMu.RLock() - defer s.gcMu.RUnlock() + unlock := s.acquireGCRLock() + defer unlock() return s.metadata.GetBucketManifest(bucket) } func (s *ObjectService) DeleteBucket(bucket string) error { - s.gcMu.RLock() - defer s.gcMu.RUnlock() + unlock := s.acquireGCRLock() + defer unlock() return s.metadata.DeleteBucket(bucket) } @@ -206,8 +233,8 @@ func (s *ObjectService) ListBuckets() ([]string, error) { metrics.Default.ObserveService("list_buckets", time.Since(start), success) }() - s.gcMu.RLock() - defer s.gcMu.RUnlock() + unlock := s.acquireGCRLock() + defer unlock() buckets, err := s.metadata.ListBuckets() success = err == nil @@ -215,14 +242,14 @@ func (s *ObjectService) ListBuckets() ([]string, error) { } func (s *ObjectService) DeleteObjects(bucket string, keys []string) ([]string, error) { - s.gcMu.RLock() - defer s.gcMu.RUnlock() + unlock := s.acquireGCRLock() + defer unlock() return s.metadata.DeleteManifests(bucket, keys) } func (s *ObjectService) CreateMultipartUpload(bucket, key string) (*models.MultipartUpload, error) { - s.gcMu.RLock() - defer s.gcMu.RUnlock() + unlock := s.acquireGCRLock() + defer unlock() return s.metadata.CreateMultipartUpload(bucket, key) } @@ -233,8 +260,8 @@ func (s *ObjectService) UploadPart(bucket, key, uploadId string, partNumber int, metrics.Default.ObserveService("upload_part", time.Since(start), success) }() - s.gcMu.RLock() - defer s.gcMu.RUnlock() + unlock := s.acquireGCRLock() + defer 
unlock() if partNumber < 1 || partNumber > 10000 { return "", ErrInvalidPart @@ -269,8 +296,8 @@ func (s *ObjectService) UploadPart(bucket, key, uploadId string, partNumber int, } func (s *ObjectService) ListMultipartParts(bucket, key, uploadID string) ([]models.UploadedPart, error) { - s.gcMu.RLock() - defer s.gcMu.RUnlock() + unlock := s.acquireGCRLock() + defer unlock() upload, err := s.metadata.GetMultipartUpload(uploadID) if err != nil { @@ -289,8 +316,8 @@ func (s *ObjectService) CompleteMultipartUpload(bucket, key, uploadID string, co metrics.Default.ObserveService("complete_multipart_upload", time.Since(start), success) }() - s.gcMu.RLock() - defer s.gcMu.RUnlock() + unlock := s.acquireGCRLock() + defer unlock() if len(completed) == 0 { return nil, ErrInvalidCompleteRequest @@ -360,8 +387,8 @@ func (s *ObjectService) CompleteMultipartUpload(bucket, key, uploadID string, co } func (s *ObjectService) AbortMultipartUpload(bucket, key, uploadID string) error { - s.gcMu.RLock() - defer s.gcMu.RUnlock() + unlock := s.acquireGCRLock() + defer unlock() upload, err := s.metadata.GetMultipartUpload(uploadID) if err != nil { @@ -404,8 +431,8 @@ func (s *ObjectService) GarbageCollect() error { metrics.Default.ObserveGC(time.Since(start), deletedChunks, deleteErrors, cleanedUploads, success) }() - s.gcMu.Lock() - defer s.gcMu.Unlock() + unlock := s.acquireGCLock() + defer unlock() referencedChunkSet, err := s.metadata.GetReferencedChunkSet() if err != nil { From 8c9cd962132d74175ac5f14f6a1a56511873cbcd Mon Sep 17 00:00:00 2001 From: Andrej Mickov Date: Mon, 2 Mar 2026 22:30:15 +0100 Subject: [PATCH 3/4] Auth for metrics, removed unwanted metrics and fixed tests. 
--- api/api.go | 8 +++---- auth/middleware.go | 2 +- logging/logging.go | 2 -- metrics/metrics.go | 46 +++++++++-------------------------------- metrics/metrics_test.go | 26 +++++++++++++++++++++++ 5 files changed, 41 insertions(+), 43 deletions(-) create mode 100644 metrics/metrics_test.go diff --git a/api/api.go b/api/api.go index 61e81a8..250bac2 100644 --- a/api/api.go +++ b/api/api.go @@ -110,13 +110,14 @@ func (h *Handler) handleHealth(w http.ResponseWriter, r *http.Request) { } func (h *Handler) handleMetrics(w http.ResponseWriter, r *http.Request) { - payload := metrics.Default.RenderPrometheus() w.Header().Set("Content-Type", "text/plain; version=0.0.4; charset=utf-8") - w.Header().Set("Content-Length", strconv.Itoa(len(payload))) - w.WriteHeader(http.StatusOK) if r.Method == http.MethodHead { + w.WriteHeader(http.StatusOK) return } + payload := metrics.Default.RenderPrometheus() + w.Header().Set("Content-Length", strconv.Itoa(len(payload))) + w.WriteHeader(http.StatusOK) _, _ = w.Write([]byte(payload)) } @@ -645,7 +646,6 @@ func newLimitedListener(inner net.Listener, maxConns int) net.Listener { return inner } metrics.Default.SetConnectionPoolMax(maxConns) - metrics.Default.SetWorkerPoolSize(maxConns) return &limitedListener{ Listener: inner, slots: make(chan struct{}, maxConns), diff --git a/auth/middleware.go b/auth/middleware.go index 63cebbd..24f656e 100644 --- a/auth/middleware.go +++ b/auth/middleware.go @@ -25,7 +25,7 @@ func Middleware( return } - if r.URL.Path == "/healthz" || r.URL.Path == "/metrics" { + if r.URL.Path == "/healthz" { metrics.Default.ObserveAuth("bypass", "none", "public_endpoint") next.ServeHTTP(w, r.WithContext(WithRequestContext(r.Context(), authCtx))) return diff --git a/logging/logging.go b/logging/logging.go index 2749695..3600da7 100644 --- a/logging/logging.go +++ b/logging/logging.go @@ -90,10 +90,8 @@ func HTTPMiddleware(logger *slog.Logger, cfg Config) func(http.Handler) http.Han ww := 
middleware.NewWrapResponseWriter(w, r.ProtoMajor) op := metricOperationLabel(r) metrics.Default.IncHTTPInFlightOp(op) - metrics.Default.IncWorkerPoolActive() defer func() { metrics.Default.DecHTTPInFlightOp(op) - metrics.Default.DecWorkerPoolActive() }() requestID := middleware.GetReqID(r.Context()) if requestID != "" { diff --git a/metrics/metrics.go b/metrics/metrics.go index 307f8aa..8f11b39 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -75,8 +75,6 @@ type Registry struct { connectionPoolWaits atomic.Uint64 requestQueueLength atomic.Int64 - workerPoolActive atomic.Int64 - workerPoolSize atomic.Int64 mu sync.Mutex @@ -364,21 +362,6 @@ func (r *Registry) DecRequestQueueLength() { r.requestQueueLength.Add(-1) } -func (r *Registry) SetWorkerPoolSize(size int) { - if size < 0 { - size = 0 - } - r.workerPoolSize.Store(int64(size)) -} - -func (r *Registry) IncWorkerPoolActive() { - r.workerPoolActive.Add(1) -} - -func (r *Registry) DecWorkerPoolActive() { - r.workerPoolActive.Add(-1) -} - func (r *Registry) ObserveLockWait(lockName string, d time.Duration) { lockName = strings.TrimSpace(lockName) if lockName == "" { @@ -520,10 +503,7 @@ func (r *Registry) RenderPrometheus() string { connectionMax := float64(r.connectionPoolMax.Load()) connectionWaits := r.connectionPoolWaits.Load() queueLength := float64(r.requestQueueLength.Load()) - workerActive := float64(r.workerPoolActive.Load()) - workerSize := float64(r.workerPoolSize.Load()) - openFDs, hasOpenFDs := readOpenFDs() resident, hasResident := readResidentMemoryBytes() cpuSeconds, hasCPU := readProcessCPUSeconds() @@ -555,8 +535,6 @@ func (r *Registry) RenderPrometheus() string { writeCounter(&b, "fs_connection_pool_waits_total", "Number of waits due to pool saturation.", connectionWaits) writeGauge(&b, "fs_request_queue_length", "Requests waiting for an execution slot.", queueLength) - writeGauge(&b, "fs_worker_pool_active", "Active workers.", workerActive) - writeGauge(&b, "fs_worker_pool_size", 
"Configured worker pool size.", workerSize) writeHistogramVecKV(&b, "fs_lock_wait_seconds", "Time spent waiting for locks.", lockWait, []string{"lock_name"}) writeHistogramVecKV(&b, "fs_lock_hold_seconds", "Time locks were held.", lockHold, []string{"lock_name"}) @@ -594,10 +572,6 @@ func (r *Registry) RenderPrometheus() string { if hasResident { writeGauge(&b, "process_resident_memory_bytes", "Resident memory size in bytes.", resident) } - if hasOpenFDs { - writeGauge(&b, "process_open_fds", "Number of open file descriptors.", openFDs) - writeGauge(&b, "fs_open_fds", "Number of open file descriptors.", openFDs) - } return b.String() } @@ -732,8 +706,16 @@ func writeHistogramWithLabelsMap(b *strings.Builder, name string, labels map[str } fmt.Fprintf(b, "%s_bucket{%s} %d\n", name, labelsToString(bucketLabels), cumulative) } - fmt.Fprintf(b, "%s_sum{%s} %.9f\n", name, labelsToString(labels), s.sum) - fmt.Fprintf(b, "%s_count{%s} %d\n", name, labelsToString(labels), s.count) + labelsSuffix := formatLabelsSuffix(labels) + fmt.Fprintf(b, "%s_sum%s %.9f\n", name, labelsSuffix, s.sum) + fmt.Fprintf(b, "%s_count%s %d\n", name, labelsSuffix, s.count) +} + +func formatLabelsSuffix(labels map[string]string) string { + if len(labels) == 0 { + return "" + } + return "{" + labelsToString(labels) + "}" } func formatLabels(keys, values []string) string { @@ -786,14 +768,6 @@ func escapeLabelValue(value string) string { return value } -func readOpenFDs() (float64, bool) { - entries, err := os.ReadDir("/proc/self/fd") - if err != nil { - return 0, false - } - return float64(len(entries)), true -} - func readResidentMemoryBytes() (float64, bool) { data, err := os.ReadFile("/proc/self/statm") if err != nil { diff --git a/metrics/metrics_test.go b/metrics/metrics_test.go new file mode 100644 index 0000000..2fffc4f --- /dev/null +++ b/metrics/metrics_test.go @@ -0,0 +1,26 @@ +package metrics + +import ( + "strings" + "testing" +) + +func TestRenderPrometheusHistogramNoEmptyLabelSet(t 
*testing.T) { + reg := NewRegistry() + reg.ObserveBatchSize(3) + reg.ObserveGC(0, 0, 0, 0, true) + + out := reg.RenderPrometheus() + if strings.Contains(out, "fs_batch_size_histogram_sum{}") { + t.Fatalf("unexpected empty label set for batch sum metric") + } + if strings.Contains(out, "fs_batch_size_histogram_count{}") { + t.Fatalf("unexpected empty label set for batch count metric") + } + if strings.Contains(out, "fs_gc_duration_seconds_sum{}") { + t.Fatalf("unexpected empty label set for gc sum metric") + } + if strings.Contains(out, "fs_gc_duration_seconds_count{}") { + t.Fatalf("unexpected empty label set for gc count metric") + } +} From c03bd3e3a2e3a167ed4ffbcac2b268c545c96065 Mon Sep 17 00:00:00 2001 From: Andrej Mickov Date: Mon, 2 Mar 2026 23:26:57 +0100 Subject: [PATCH 4/4] Minimal fixes for metrics --- api/api.go | 3 +-- metrics/metrics.go | 2 +- metrics/metrics_test.go | 8 ++++++++ service/service.go | 15 +++++++++------ storage/blob.go | 4 +++- 5 files changed, 22 insertions(+), 10 deletions(-) diff --git a/api/api.go b/api/api.go index 250bac2..69ccfa9 100644 --- a/api/api.go +++ b/api/api.go @@ -661,13 +661,12 @@ func (l *limitedListener) Accept() (net.Conn, error) { l.slots <- struct{}{} metrics.Default.DecRequestQueueLength() } - metrics.Default.IncConnectionPoolActive() conn, err := l.Listener.Accept() if err != nil { <-l.slots - metrics.Default.DecConnectionPoolActive() return nil, err } + metrics.Default.IncConnectionPoolActive() return &limitedConn{ Conn: conn, done: func() { diff --git a/metrics/metrics.go b/metrics/metrics.go index 8f11b39..45115df 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -762,7 +762,7 @@ func trimFloat(v float64) string { } func escapeLabelValue(value string) string { - value = strings.ReplaceAll(value, `\\`, `\\\\`) + value = strings.ReplaceAll(value, `\`, `\\`) value = strings.ReplaceAll(value, "\n", `\\n`) value = strings.ReplaceAll(value, `"`, `\\"`) return value diff --git a/metrics/metrics_test.go 
b/metrics/metrics_test.go index 2fffc4f..2f25c23 100644 --- a/metrics/metrics_test.go +++ b/metrics/metrics_test.go @@ -24,3 +24,11 @@ func TestRenderPrometheusHistogramNoEmptyLabelSet(t *testing.T) { t.Fatalf("unexpected empty label set for gc count metric") } } + +func TestEscapeLabelValueEscapesSingleBackslash(t *testing.T) { + got := escapeLabelValue(`a\b`) + want := `a\\b` + if got != want { + t.Fatalf("escapeLabelValue returned %q, want %q", got, want) + } +} diff --git a/service/service.go b/service/service.go index f06dd66..9d7db43 100644 --- a/service/service.go +++ b/service/service.go @@ -106,10 +106,6 @@ func (s *ObjectService) PutObject(bucket, key, contentType string, input io.Read func (s *ObjectService) GetObject(bucket, key string) (io.ReadCloser, *models.ObjectManifest, error) { start := time.Now() - success := false - defer func() { - metrics.Default.ObserveService("get_object", time.Since(start), success) - }() waitStart := time.Now() s.gcMu.RLock() @@ -120,20 +116,27 @@ func (s *ObjectService) GetObject(bucket, key string) (io.ReadCloser, *models.Ob if err != nil { metrics.Default.ObserveLockHold("gc_mu_read", time.Since(holdStart)) s.gcMu.RUnlock() + metrics.Default.ObserveService("get_object", time.Since(start), false) return nil, nil, err } pr, pw := io.Pipe() go func() { + streamOK := false + defer func() { + metrics.Default.ObserveService("get_object", time.Since(start), streamOK) + }() defer metrics.Default.ObserveLockHold("gc_mu_read", time.Since(holdStart)) defer s.gcMu.RUnlock() if err := s.blob.AssembleStream(manifest.Chunks, pw); err != nil { _ = pw.CloseWithError(err) return } - _ = pw.Close() + if err := pw.Close(); err != nil { + return + } + streamOK = true }() - success = true return pr, manifest, nil } diff --git a/storage/blob.go b/storage/blob.go index 41297df..667958f 100644 --- a/storage/blob.go +++ b/storage/blob.go @@ -88,8 +88,9 @@ func (bs *BlobStore) IngestStream(stream io.Reader) ([]string, int64, string, er func (bs 
*BlobStore) saveBlob(chunkID string, data []byte) error { start := time.Now() success := false + writtenBytes := int64(0) defer func() { - metrics.Default.ObserveBlob("write_chunk", time.Since(start), int64(len(data)), success) + metrics.Default.ObserveBlob("write_chunk", time.Since(start), writtenBytes, success) }() if !isValidChunkID(chunkID) { @@ -144,6 +145,7 @@ func (bs *BlobStore) saveBlob(chunkID string, data []byte) error { if err := syncDir(dir); err != nil { return err } + writtenBytes = int64(len(data)) success = true return nil }