mirror of
https://github.com/ferdzo/fs.git
synced 2026-04-04 20:36:25 +00:00
Updated metrics
This commit is contained in:
21
api/api.go
21
api/api.go
@@ -236,6 +236,7 @@ func (h *Handler) handlePostObject(w http.ResponseWriter, r *http.Request) {
|
||||
writeS3Error(w, r, s3ErrMalformedXML, r.URL.Path)
|
||||
return
|
||||
}
|
||||
metrics.Default.ObserveBatchSize(len(req.Parts))
|
||||
|
||||
manifest, err := h.svc.CompleteMultipartUpload(bucket, key, uploadID, req.Parts)
|
||||
if err != nil {
|
||||
@@ -311,6 +312,7 @@ func (h *Handler) handlePutObject(w http.ResponseWriter, r *http.Request) {
|
||||
w.WriteHeader(http.StatusOK)
|
||||
return
|
||||
}
|
||||
metrics.Default.ObserveBatchSize(1)
|
||||
|
||||
if ifNoneMatch := strings.TrimSpace(r.Header.Get("If-None-Match")); ifNoneMatch != "" {
|
||||
manifest, err := h.svc.HeadObject(bucket, key)
|
||||
@@ -523,6 +525,7 @@ func (h *Handler) handlePostBucket(w http.ResponseWriter, r *http.Request) {
|
||||
writeS3Error(w, r, s3ErrTooManyDeleteObjects, r.URL.Path)
|
||||
return
|
||||
}
|
||||
metrics.Default.ObserveBatchSize(len(req.Objects))
|
||||
|
||||
keys := make([]string, 0, len(req.Objects))
|
||||
response := models.DeleteObjectsResult{
|
||||
@@ -641,6 +644,8 @@ func newLimitedListener(inner net.Listener, maxConns int) net.Listener {
|
||||
if maxConns <= 0 {
|
||||
return inner
|
||||
}
|
||||
metrics.Default.SetConnectionPoolMax(maxConns)
|
||||
metrics.Default.SetWorkerPoolSize(maxConns)
|
||||
return &limitedListener{
|
||||
Listener: inner,
|
||||
slots: make(chan struct{}, maxConns),
|
||||
@@ -648,15 +653,27 @@ func newLimitedListener(inner net.Listener, maxConns int) net.Listener {
|
||||
}
|
||||
|
||||
func (l *limitedListener) Accept() (net.Conn, error) {
|
||||
l.slots <- struct{}{}
|
||||
select {
|
||||
case l.slots <- struct{}{}:
|
||||
default:
|
||||
metrics.Default.IncConnectionPoolWait()
|
||||
metrics.Default.IncRequestQueueLength()
|
||||
l.slots <- struct{}{}
|
||||
metrics.Default.DecRequestQueueLength()
|
||||
}
|
||||
metrics.Default.IncConnectionPoolActive()
|
||||
conn, err := l.Listener.Accept()
|
||||
if err != nil {
|
||||
<-l.slots
|
||||
metrics.Default.DecConnectionPoolActive()
|
||||
return nil, err
|
||||
}
|
||||
return &limitedConn{
|
||||
Conn: conn,
|
||||
done: func() { <-l.slots },
|
||||
done: func() {
|
||||
<-l.slots
|
||||
metrics.Default.DecConnectionPoolActive()
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
||||
@@ -5,6 +5,7 @@ import (
|
||||
"errors"
|
||||
"fs/auth"
|
||||
"fs/metadata"
|
||||
"fs/metrics"
|
||||
"fs/models"
|
||||
"fs/service"
|
||||
"net/http"
|
||||
@@ -200,12 +201,19 @@ func mapToS3Error(err error) s3APIError {
|
||||
|
||||
func writeS3Error(w http.ResponseWriter, r *http.Request, apiErr s3APIError, resource string) {
|
||||
requestID := ""
|
||||
op := "other"
|
||||
if r != nil {
|
||||
requestID = middleware.GetReqID(r.Context())
|
||||
isDeletePost := false
|
||||
if r.Method == http.MethodPost {
|
||||
_, isDeletePost = r.URL.Query()["delete"]
|
||||
}
|
||||
op = metrics.NormalizeHTTPOperation(r.Method, isDeletePost)
|
||||
if requestID != "" {
|
||||
w.Header().Set("x-amz-request-id", requestID)
|
||||
}
|
||||
}
|
||||
metrics.Default.ObserveError(op, apiErr.Code)
|
||||
w.Header().Set("Content-Type", "application/xml; charset=utf-8")
|
||||
w.WriteHeader(apiErr.Status)
|
||||
|
||||
|
||||
@@ -88,8 +88,13 @@ func HTTPMiddleware(logger *slog.Logger, cfg Config) func(http.Handler) http.Han
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
start := time.Now()
|
||||
ww := middleware.NewWrapResponseWriter(w, r.ProtoMajor)
|
||||
metrics.Default.IncHTTPInFlight()
|
||||
defer metrics.Default.DecHTTPInFlight()
|
||||
op := metricOperationLabel(r)
|
||||
metrics.Default.IncHTTPInFlightOp(op)
|
||||
metrics.Default.IncWorkerPoolActive()
|
||||
defer func() {
|
||||
metrics.Default.DecHTTPInFlightOp(op)
|
||||
metrics.Default.DecWorkerPoolActive()
|
||||
}()
|
||||
requestID := middleware.GetReqID(r.Context())
|
||||
if requestID != "" {
|
||||
ww.Header().Set("x-amz-request-id", requestID)
|
||||
@@ -103,7 +108,7 @@ func HTTPMiddleware(logger *slog.Logger, cfg Config) func(http.Handler) http.Han
|
||||
status = http.StatusOK
|
||||
}
|
||||
route := metricRouteLabel(r)
|
||||
metrics.Default.ObserveHTTPRequest(r.Method, route, status, elapsed, ww.BytesWritten())
|
||||
metrics.Default.ObserveHTTPRequestDetailed(r.Method, route, op, status, elapsed, ww.BytesWritten())
|
||||
|
||||
if !cfg.Audit && !cfg.DebugMode {
|
||||
return
|
||||
@@ -167,6 +172,17 @@ func metricRouteLabel(r *http.Request) string {
|
||||
return "/{bucket}/*"
|
||||
}
|
||||
|
||||
func metricOperationLabel(r *http.Request) string {
|
||||
if r == nil {
|
||||
return "other"
|
||||
}
|
||||
isDeletePost := false
|
||||
if r.Method == http.MethodPost && r.URL != nil {
|
||||
_, isDeletePost = r.URL.Query()["delete"]
|
||||
}
|
||||
return metrics.NormalizeHTTPOperation(r.Method, isDeletePost)
|
||||
}
|
||||
|
||||
func envBool(key string, defaultValue bool) bool {
|
||||
raw := os.Getenv(key)
|
||||
if raw == "" {
|
||||
|
||||
@@ -2,12 +2,14 @@ package metrics
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
"runtime"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"syscall"
|
||||
"time"
|
||||
)
|
||||
|
||||
@@ -17,6 +19,14 @@ var defaultBuckets = []float64{
|
||||
1, 2.5, 5, 10,
|
||||
}
|
||||
|
||||
var lockBuckets = []float64{
|
||||
0.000001, 0.000005, 0.00001, 0.00005,
|
||||
0.0001, 0.0005, 0.001, 0.005, 0.01,
|
||||
0.025, 0.05, 0.1, 0.25, 0.5, 1,
|
||||
}
|
||||
|
||||
var batchBuckets = []float64{1, 2, 4, 8, 16, 32, 64, 100, 128, 256, 512, 1000, 5000}
|
||||
|
||||
var Default = NewRegistry()
|
||||
|
||||
type histogram struct {
|
||||
@@ -57,13 +67,26 @@ func (h *histogram) snapshot() (bounds []float64, counts []uint64, sum float64,
|
||||
|
||||
type Registry struct {
|
||||
startedAt time.Time
|
||||
inFlight atomic.Int64
|
||||
|
||||
httpInFlight atomic.Int64
|
||||
|
||||
connectionPoolActive atomic.Int64
|
||||
connectionPoolMax atomic.Int64
|
||||
connectionPoolWaits atomic.Uint64
|
||||
|
||||
requestQueueLength atomic.Int64
|
||||
workerPoolActive atomic.Int64
|
||||
workerPoolSize atomic.Int64
|
||||
|
||||
mu sync.Mutex
|
||||
|
||||
httpRequests map[string]uint64
|
||||
httpResponseByte map[string]uint64
|
||||
httpDuration map[string]*histogram
|
||||
httpRequestsRoute map[string]uint64
|
||||
httpResponseBytesRoute map[string]uint64
|
||||
httpDurationRoute map[string]*histogram
|
||||
|
||||
httpRequestsOp map[string]uint64
|
||||
httpDurationOp map[string]*histogram
|
||||
httpInFlightOp map[string]int64
|
||||
|
||||
authRequests map[string]uint64
|
||||
|
||||
@@ -77,6 +100,17 @@ type Registry struct {
|
||||
blobBytes map[string]uint64
|
||||
blobDuration map[string]*histogram
|
||||
|
||||
lockWait map[string]*histogram
|
||||
lockHold map[string]*histogram
|
||||
|
||||
cacheHits map[string]uint64
|
||||
cacheMisses map[string]uint64
|
||||
|
||||
batchSize *histogram
|
||||
|
||||
retries map[string]uint64
|
||||
errors map[string]uint64
|
||||
|
||||
gcRuns map[string]uint64
|
||||
gcDuration *histogram
|
||||
gcDeletedChunks uint64
|
||||
@@ -86,47 +120,136 @@ type Registry struct {
|
||||
|
||||
func NewRegistry() *Registry {
|
||||
return &Registry{
|
||||
startedAt: time.Now(),
|
||||
httpRequests: make(map[string]uint64),
|
||||
httpResponseByte: make(map[string]uint64),
|
||||
httpDuration: make(map[string]*histogram),
|
||||
authRequests: make(map[string]uint64),
|
||||
serviceOps: make(map[string]uint64),
|
||||
serviceDuration: make(map[string]*histogram),
|
||||
dbTxTotal: make(map[string]uint64),
|
||||
dbTxDuration: make(map[string]*histogram),
|
||||
blobOps: make(map[string]uint64),
|
||||
blobBytes: make(map[string]uint64),
|
||||
blobDuration: make(map[string]*histogram),
|
||||
gcRuns: make(map[string]uint64),
|
||||
gcDuration: newHistogram(defaultBuckets),
|
||||
startedAt: time.Now(),
|
||||
httpRequestsRoute: make(map[string]uint64),
|
||||
httpResponseBytesRoute: make(map[string]uint64),
|
||||
httpDurationRoute: make(map[string]*histogram),
|
||||
httpRequestsOp: make(map[string]uint64),
|
||||
httpDurationOp: make(map[string]*histogram),
|
||||
httpInFlightOp: make(map[string]int64),
|
||||
authRequests: make(map[string]uint64),
|
||||
serviceOps: make(map[string]uint64),
|
||||
serviceDuration: make(map[string]*histogram),
|
||||
dbTxTotal: make(map[string]uint64),
|
||||
dbTxDuration: make(map[string]*histogram),
|
||||
blobOps: make(map[string]uint64),
|
||||
blobBytes: make(map[string]uint64),
|
||||
blobDuration: make(map[string]*histogram),
|
||||
lockWait: make(map[string]*histogram),
|
||||
lockHold: make(map[string]*histogram),
|
||||
cacheHits: make(map[string]uint64),
|
||||
cacheMisses: make(map[string]uint64),
|
||||
batchSize: newHistogram(batchBuckets),
|
||||
retries: make(map[string]uint64),
|
||||
errors: make(map[string]uint64),
|
||||
gcRuns: make(map[string]uint64),
|
||||
gcDuration: newHistogram(defaultBuckets),
|
||||
}
|
||||
}
|
||||
|
||||
func NormalizeHTTPOperation(method string, isDeletePost bool) string {
|
||||
switch strings.ToUpper(strings.TrimSpace(method)) {
|
||||
case "GET":
|
||||
return "get"
|
||||
case "PUT":
|
||||
return "put"
|
||||
case "DELETE":
|
||||
return "delete"
|
||||
case "HEAD":
|
||||
return "head"
|
||||
case "POST":
|
||||
if isDeletePost {
|
||||
return "delete"
|
||||
}
|
||||
return "put"
|
||||
default:
|
||||
return "other"
|
||||
}
|
||||
}
|
||||
|
||||
func statusResult(status int) string {
|
||||
if status >= 200 && status < 400 {
|
||||
return "ok"
|
||||
}
|
||||
return "error"
|
||||
}
|
||||
|
||||
func normalizeRoute(route string) string {
|
||||
route = strings.TrimSpace(route)
|
||||
if route == "" {
|
||||
return "/unknown"
|
||||
}
|
||||
return route
|
||||
}
|
||||
|
||||
func normalizeOp(op string) string {
|
||||
op = strings.ToLower(strings.TrimSpace(op))
|
||||
if op == "" {
|
||||
return "other"
|
||||
}
|
||||
return op
|
||||
}
|
||||
|
||||
func (r *Registry) IncHTTPInFlight() {
|
||||
r.inFlight.Add(1)
|
||||
r.httpInFlight.Add(1)
|
||||
}
|
||||
|
||||
func (r *Registry) DecHTTPInFlight() {
|
||||
r.inFlight.Add(-1)
|
||||
r.httpInFlight.Add(-1)
|
||||
}
|
||||
|
||||
func (r *Registry) IncHTTPInFlightOp(op string) {
|
||||
r.httpInFlight.Add(1)
|
||||
op = normalizeOp(op)
|
||||
r.mu.Lock()
|
||||
r.httpInFlightOp[op]++
|
||||
r.mu.Unlock()
|
||||
}
|
||||
|
||||
func (r *Registry) DecHTTPInFlightOp(op string) {
|
||||
r.httpInFlight.Add(-1)
|
||||
op = normalizeOp(op)
|
||||
r.mu.Lock()
|
||||
r.httpInFlightOp[op]--
|
||||
if r.httpInFlightOp[op] < 0 {
|
||||
r.httpInFlightOp[op] = 0
|
||||
}
|
||||
r.mu.Unlock()
|
||||
}
|
||||
|
||||
func (r *Registry) ObserveHTTPRequest(method, route string, status int, d time.Duration, responseBytes int) {
|
||||
op := NormalizeHTTPOperation(method, false)
|
||||
r.ObserveHTTPRequestDetailed(method, route, op, status, d, responseBytes)
|
||||
}
|
||||
|
||||
func (r *Registry) ObserveHTTPRequestDetailed(method, route, op string, status int, d time.Duration, responseBytes int) {
|
||||
route = normalizeRoute(route)
|
||||
key := method + "|" + route + "|" + strconv.Itoa(status)
|
||||
durationKey := method + "|" + route
|
||||
op = normalizeOp(op)
|
||||
result := statusResult(status)
|
||||
|
||||
routeKey := method + "|" + route + "|" + strconv.Itoa(status)
|
||||
routeDurKey := method + "|" + route
|
||||
opKey := op + "|" + result
|
||||
|
||||
r.mu.Lock()
|
||||
r.httpRequests[key]++
|
||||
r.httpRequestsRoute[routeKey]++
|
||||
if responseBytes > 0 {
|
||||
r.httpResponseByte[key] += uint64(responseBytes)
|
||||
r.httpResponseBytesRoute[routeKey] += uint64(responseBytes)
|
||||
}
|
||||
h := r.httpDuration[durationKey]
|
||||
if h == nil {
|
||||
h = newHistogram(defaultBuckets)
|
||||
r.httpDuration[durationKey] = h
|
||||
hRoute := r.httpDurationRoute[routeDurKey]
|
||||
if hRoute == nil {
|
||||
hRoute = newHistogram(defaultBuckets)
|
||||
r.httpDurationRoute[routeDurKey] = hRoute
|
||||
}
|
||||
h.observe(d.Seconds())
|
||||
hRoute.observe(d.Seconds())
|
||||
|
||||
r.httpRequestsOp[opKey]++
|
||||
hOp := r.httpDurationOp[opKey]
|
||||
if hOp == nil {
|
||||
hOp = newHistogram(defaultBuckets)
|
||||
r.httpDurationOp[opKey] = hOp
|
||||
}
|
||||
hOp.observe(d.Seconds())
|
||||
r.mu.Unlock()
|
||||
}
|
||||
|
||||
@@ -179,31 +302,166 @@ func (r *Registry) ObserveMetadataTx(txType string, d time.Duration, ok bool) {
|
||||
r.mu.Unlock()
|
||||
}
|
||||
|
||||
func (r *Registry) ObserveBlob(operation string, d time.Duration, bytes int64, ok bool) {
|
||||
func (r *Registry) ObserveBlob(operation string, d time.Duration, bytes int64, ok bool, backend ...string) {
|
||||
be := "disk"
|
||||
if len(backend) > 0 {
|
||||
candidate := strings.TrimSpace(backend[0])
|
||||
if candidate != "" {
|
||||
be = strings.ToLower(candidate)
|
||||
}
|
||||
}
|
||||
result := "error"
|
||||
if ok {
|
||||
result = "ok"
|
||||
}
|
||||
key := operation + "|" + result
|
||||
op := strings.ToLower(strings.TrimSpace(operation))
|
||||
if op == "" {
|
||||
op = "unknown"
|
||||
}
|
||||
|
||||
histKey := op + "|" + be + "|" + result
|
||||
opsKey := histKey
|
||||
|
||||
r.mu.Lock()
|
||||
r.blobOps[key]++
|
||||
h := r.blobDuration[operation]
|
||||
r.blobOps[opsKey]++
|
||||
h := r.blobDuration[histKey]
|
||||
if h == nil {
|
||||
h = newHistogram(defaultBuckets)
|
||||
r.blobDuration[operation] = h
|
||||
r.blobDuration[histKey] = h
|
||||
}
|
||||
h.observe(d.Seconds())
|
||||
|
||||
if bytes > 0 {
|
||||
switch operation {
|
||||
case "read_chunk":
|
||||
r.blobBytes["read"] += uint64(bytes)
|
||||
case "write_chunk":
|
||||
r.blobBytes["write"] += uint64(bytes)
|
||||
}
|
||||
r.blobBytes[op] += uint64(bytes)
|
||||
}
|
||||
r.mu.Unlock()
|
||||
}
|
||||
|
||||
func (r *Registry) SetConnectionPoolMax(max int) {
|
||||
if max < 0 {
|
||||
max = 0
|
||||
}
|
||||
r.connectionPoolMax.Store(int64(max))
|
||||
}
|
||||
|
||||
func (r *Registry) IncConnectionPoolActive() {
|
||||
r.connectionPoolActive.Add(1)
|
||||
}
|
||||
|
||||
func (r *Registry) DecConnectionPoolActive() {
|
||||
r.connectionPoolActive.Add(-1)
|
||||
}
|
||||
|
||||
func (r *Registry) IncConnectionPoolWait() {
|
||||
r.connectionPoolWaits.Add(1)
|
||||
}
|
||||
|
||||
func (r *Registry) IncRequestQueueLength() {
|
||||
r.requestQueueLength.Add(1)
|
||||
}
|
||||
|
||||
func (r *Registry) DecRequestQueueLength() {
|
||||
r.requestQueueLength.Add(-1)
|
||||
}
|
||||
|
||||
func (r *Registry) SetWorkerPoolSize(size int) {
|
||||
if size < 0 {
|
||||
size = 0
|
||||
}
|
||||
r.workerPoolSize.Store(int64(size))
|
||||
}
|
||||
|
||||
func (r *Registry) IncWorkerPoolActive() {
|
||||
r.workerPoolActive.Add(1)
|
||||
}
|
||||
|
||||
func (r *Registry) DecWorkerPoolActive() {
|
||||
r.workerPoolActive.Add(-1)
|
||||
}
|
||||
|
||||
func (r *Registry) ObserveLockWait(lockName string, d time.Duration) {
|
||||
lockName = strings.TrimSpace(lockName)
|
||||
if lockName == "" {
|
||||
lockName = "unknown"
|
||||
}
|
||||
r.mu.Lock()
|
||||
h := r.lockWait[lockName]
|
||||
if h == nil {
|
||||
h = newHistogram(lockBuckets)
|
||||
r.lockWait[lockName] = h
|
||||
}
|
||||
h.observe(d.Seconds())
|
||||
r.mu.Unlock()
|
||||
}
|
||||
|
||||
func (r *Registry) ObserveLockHold(lockName string, d time.Duration) {
|
||||
lockName = strings.TrimSpace(lockName)
|
||||
if lockName == "" {
|
||||
lockName = "unknown"
|
||||
}
|
||||
r.mu.Lock()
|
||||
h := r.lockHold[lockName]
|
||||
if h == nil {
|
||||
h = newHistogram(lockBuckets)
|
||||
r.lockHold[lockName] = h
|
||||
}
|
||||
h.observe(d.Seconds())
|
||||
r.mu.Unlock()
|
||||
}
|
||||
|
||||
func (r *Registry) ObserveCacheHit(cache string) {
|
||||
cache = strings.TrimSpace(cache)
|
||||
if cache == "" {
|
||||
cache = "unknown"
|
||||
}
|
||||
r.mu.Lock()
|
||||
r.cacheHits[cache]++
|
||||
r.mu.Unlock()
|
||||
}
|
||||
|
||||
func (r *Registry) ObserveCacheMiss(cache string) {
|
||||
cache = strings.TrimSpace(cache)
|
||||
if cache == "" {
|
||||
cache = "unknown"
|
||||
}
|
||||
r.mu.Lock()
|
||||
r.cacheMisses[cache]++
|
||||
r.mu.Unlock()
|
||||
}
|
||||
|
||||
func (r *Registry) ObserveBatchSize(size int) {
|
||||
if size < 0 {
|
||||
size = 0
|
||||
}
|
||||
r.mu.Lock()
|
||||
r.batchSize.observe(float64(size))
|
||||
r.mu.Unlock()
|
||||
}
|
||||
|
||||
func (r *Registry) ObserveRetry(op, reason string) {
|
||||
op = normalizeOp(op)
|
||||
reason = strings.TrimSpace(reason)
|
||||
if reason == "" {
|
||||
reason = "unknown"
|
||||
}
|
||||
key := op + "|" + reason
|
||||
r.mu.Lock()
|
||||
r.retries[key]++
|
||||
r.mu.Unlock()
|
||||
}
|
||||
|
||||
func (r *Registry) ObserveError(op, reason string) {
|
||||
op = normalizeOp(op)
|
||||
reason = strings.TrimSpace(reason)
|
||||
if reason == "" {
|
||||
reason = "unknown"
|
||||
}
|
||||
key := op + "|" + reason
|
||||
r.mu.Lock()
|
||||
r.errors[key]++
|
||||
r.mu.Unlock()
|
||||
}
|
||||
|
||||
func (r *Registry) ObserveGC(d time.Duration, deletedChunks, deleteErrors, cleanedUploads int, ok bool) {
|
||||
result := "error"
|
||||
if ok {
|
||||
@@ -230,9 +488,12 @@ func (r *Registry) RenderPrometheus() string {
|
||||
runtime.ReadMemStats(&mem)
|
||||
|
||||
r.mu.Lock()
|
||||
httpReq := copyCounterMap(r.httpRequests)
|
||||
httpBytes := copyCounterMap(r.httpResponseByte)
|
||||
httpDur := copyHistogramMap(r.httpDuration)
|
||||
httpReqRoute := copyCounterMap(r.httpRequestsRoute)
|
||||
httpRespRoute := copyCounterMap(r.httpResponseBytesRoute)
|
||||
httpDurRoute := copyHistogramMap(r.httpDurationRoute)
|
||||
httpReqOp := copyCounterMap(r.httpRequestsOp)
|
||||
httpDurOp := copyHistogramMap(r.httpDurationOp)
|
||||
httpInFlightOp := copyIntGaugeMap(r.httpInFlightOp)
|
||||
authReq := copyCounterMap(r.authRequests)
|
||||
serviceOps := copyCounterMap(r.serviceOps)
|
||||
serviceDur := copyHistogramMap(r.serviceDuration)
|
||||
@@ -241,6 +502,13 @@ func (r *Registry) RenderPrometheus() string {
|
||||
blobOps := copyCounterMap(r.blobOps)
|
||||
blobBytes := copyCounterMap(r.blobBytes)
|
||||
blobDur := copyHistogramMap(r.blobDuration)
|
||||
lockWait := copyHistogramMap(r.lockWait)
|
||||
lockHold := copyHistogramMap(r.lockHold)
|
||||
cacheHits := copyCounterMap(r.cacheHits)
|
||||
cacheMisses := copyCounterMap(r.cacheMisses)
|
||||
batchBounds, batchCounts, batchSum, batchCount := r.batchSize.snapshot()
|
||||
retries := copyCounterMap(r.retries)
|
||||
errorsTotal := copyCounterMap(r.errors)
|
||||
gcRuns := copyCounterMap(r.gcRuns)
|
||||
gcDurBounds, gcDurCounts, gcDurSum, gcDurCount := r.gcDuration.snapshot()
|
||||
gcDeletedChunks := r.gcDeletedChunks
|
||||
@@ -248,12 +516,27 @@ func (r *Registry) RenderPrometheus() string {
|
||||
gcCleanedUploads := r.gcCleanedUpload
|
||||
r.mu.Unlock()
|
||||
|
||||
connectionActive := float64(r.connectionPoolActive.Load())
|
||||
connectionMax := float64(r.connectionPoolMax.Load())
|
||||
connectionWaits := r.connectionPoolWaits.Load()
|
||||
queueLength := float64(r.requestQueueLength.Load())
|
||||
workerActive := float64(r.workerPoolActive.Load())
|
||||
workerSize := float64(r.workerPoolSize.Load())
|
||||
|
||||
openFDs, hasOpenFDs := readOpenFDs()
|
||||
resident, hasResident := readResidentMemoryBytes()
|
||||
cpuSeconds, hasCPU := readProcessCPUSeconds()
|
||||
|
||||
var b strings.Builder
|
||||
|
||||
writeGauge(&b, "fs_http_inflight_requests", "Current in-flight HTTP requests.", float64(r.inFlight.Load()))
|
||||
writeCounterVecKV(&b, "fs_http_requests_total", "Total HTTP requests handled.", httpReq, []string{"method", "route", "status"})
|
||||
writeCounterVecKV(&b, "fs_http_response_bytes_total", "Total HTTP response bytes written.", httpBytes, []string{"method", "route", "status"})
|
||||
writeHistogramVecKV(&b, "fs_http_request_duration_seconds", "HTTP request latency.", httpDur, []string{"method", "route"})
|
||||
httpInFlightOp["all"] = r.httpInFlight.Load()
|
||||
writeGaugeVecFromInt64(&b, "fs_http_inflight_requests", "Current in-flight HTTP requests by operation.", httpInFlightOp, "op")
|
||||
writeCounterVecKV(&b, "fs_http_requests_total", "Total HTTP requests by operation and result.", httpReqOp, []string{"op", "result"})
|
||||
writeHistogramVecKV(&b, "fs_http_request_duration_seconds", "HTTP request latency by operation and result.", httpDurOp, []string{"op", "result"})
|
||||
|
||||
writeCounterVecKV(&b, "fs_http_requests_by_route_total", "Total HTTP requests by method/route/status.", httpReqRoute, []string{"method", "route", "status"})
|
||||
writeCounterVecKV(&b, "fs_http_response_bytes_total", "Total HTTP response bytes written.", httpRespRoute, []string{"method", "route", "status"})
|
||||
writeHistogramVecKV(&b, "fs_http_request_duration_by_route_seconds", "HTTP request latency by method/route.", httpDurRoute, []string{"method", "route"})
|
||||
|
||||
writeCounterVecKV(&b, "fs_auth_requests_total", "Authentication attempts by result.", authReq, []string{"result", "auth_type", "reason"})
|
||||
|
||||
@@ -263,9 +546,28 @@ func (r *Registry) RenderPrometheus() string {
|
||||
writeCounterVecKV(&b, "fs_metadata_tx_total", "Metadata transaction calls.", dbTx, []string{"type", "result"})
|
||||
writeHistogramVecKV(&b, "fs_metadata_tx_duration_seconds", "Metadata transaction latency.", dbTxDur, []string{"type"})
|
||||
|
||||
writeCounterVecKV(&b, "fs_blob_operations_total", "Blob store operations.", blobOps, []string{"operation", "result"})
|
||||
writeCounterVecKV(&b, "fs_blob_bytes_total", "Blob bytes processed.", blobBytes, []string{"direction"})
|
||||
writeHistogramVecKV(&b, "fs_blob_operation_duration_seconds", "Blob operation latency.", blobDur, []string{"operation"})
|
||||
writeHistogramVecKV(&b, "fs_blob_operation_duration_seconds", "Blob backend operation latency.", blobDur, []string{"op", "backend", "result"})
|
||||
writeCounterVecKV(&b, "fs_blob_operations_total", "Blob store operations.", blobOps, []string{"op", "backend", "result"})
|
||||
writeCounterVecKV(&b, "fs_blob_bytes_total", "Blob bytes processed by operation.", blobBytes, []string{"op"})
|
||||
|
||||
writeGauge(&b, "fs_connection_pool_active", "Active pooled connections.", connectionActive)
|
||||
writeGauge(&b, "fs_connection_pool_max", "Maximum pooled connections.", connectionMax)
|
||||
writeCounter(&b, "fs_connection_pool_waits_total", "Number of waits due to pool saturation.", connectionWaits)
|
||||
|
||||
writeGauge(&b, "fs_request_queue_length", "Requests waiting for an execution slot.", queueLength)
|
||||
writeGauge(&b, "fs_worker_pool_active", "Active workers.", workerActive)
|
||||
writeGauge(&b, "fs_worker_pool_size", "Configured worker pool size.", workerSize)
|
||||
|
||||
writeHistogramVecKV(&b, "fs_lock_wait_seconds", "Time spent waiting for locks.", lockWait, []string{"lock_name"})
|
||||
writeHistogramVecKV(&b, "fs_lock_hold_seconds", "Time locks were held.", lockHold, []string{"lock_name"})
|
||||
|
||||
writeCounterVecKV(&b, "fs_cache_hits_total", "Cache hits by cache name.", cacheHits, []string{"cache"})
|
||||
writeCounterVecKV(&b, "fs_cache_misses_total", "Cache misses by cache name.", cacheMisses, []string{"cache"})
|
||||
|
||||
writeHistogram(&b, "fs_batch_size_histogram", "Observed batch sizes.", nil, batchBounds, batchCounts, batchSum, batchCount)
|
||||
|
||||
writeCounterVecKV(&b, "fs_retries_total", "Retries by operation and reason.", retries, []string{"op", "reason"})
|
||||
writeCounterVecKV(&b, "fs_errors_total", "Errors by operation and reason.", errorsTotal, []string{"op", "reason"})
|
||||
|
||||
writeCounterVecKV(&b, "fs_gc_runs_total", "Garbage collection runs.", gcRuns, []string{"result"})
|
||||
writeHistogram(&b, "fs_gc_duration_seconds", "Garbage collection runtime.", nil, gcDurBounds, gcDurCounts, gcDurSum, gcDurCount)
|
||||
@@ -286,15 +588,18 @@ func (r *Registry) RenderPrometheus() string {
|
||||
writeCounter(&b, "fs_runtime_gc_cycles_total", "Completed GC cycles.", uint64(mem.NumGC))
|
||||
writeCounterFloat(&b, "fs_runtime_gc_pause_seconds_total", "Total GC pause time in seconds.", float64(mem.PauseTotalNs)/1e9)
|
||||
|
||||
return b.String()
|
||||
}
|
||||
|
||||
func normalizeRoute(route string) string {
|
||||
route = strings.TrimSpace(route)
|
||||
if route == "" {
|
||||
return "/unknown"
|
||||
if hasCPU {
|
||||
writeCounterFloat(&b, "process_cpu_seconds_total", "Total user and system CPU time spent in seconds.", cpuSeconds)
|
||||
}
|
||||
return route
|
||||
if hasResident {
|
||||
writeGauge(&b, "process_resident_memory_bytes", "Resident memory size in bytes.", resident)
|
||||
}
|
||||
if hasOpenFDs {
|
||||
writeGauge(&b, "process_open_fds", "Number of open file descriptors.", openFDs)
|
||||
writeGauge(&b, "fs_open_fds", "Number of open file descriptors.", openFDs)
|
||||
}
|
||||
|
||||
return b.String()
|
||||
}
|
||||
|
||||
type histogramSnapshot struct {
|
||||
@@ -312,16 +617,19 @@ func copyCounterMap(src map[string]uint64) map[string]uint64 {
|
||||
return out
|
||||
}
|
||||
|
||||
func copyIntGaugeMap(src map[string]int64) map[string]int64 {
|
||||
out := make(map[string]int64, len(src))
|
||||
for k, v := range src {
|
||||
out[k] = v
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
||||
func copyHistogramMap(src map[string]*histogram) map[string]histogramSnapshot {
|
||||
out := make(map[string]histogramSnapshot, len(src))
|
||||
for k, h := range src {
|
||||
bounds, counts, sum, count := h.snapshot()
|
||||
out[k] = histogramSnapshot{
|
||||
bounds: bounds,
|
||||
counts: counts,
|
||||
sum: sum,
|
||||
count: count,
|
||||
}
|
||||
out[k] = histogramSnapshot{bounds: bounds, counts: counts, sum: sum, count: count}
|
||||
}
|
||||
return out
|
||||
}
|
||||
@@ -357,6 +665,19 @@ func writeGaugeVec(b *strings.Builder, name, help string, values map[string]floa
|
||||
}
|
||||
}
|
||||
|
||||
func writeGaugeVecFromInt64(b *strings.Builder, name, help string, values map[string]int64, labelName string) {
|
||||
fmt.Fprintf(b, "# HELP %s %s\n", name, help)
|
||||
fmt.Fprintf(b, "# TYPE %s gauge\n", name)
|
||||
keys := make([]string, 0, len(values))
|
||||
for k := range values {
|
||||
keys = append(keys, k)
|
||||
}
|
||||
sort.Strings(keys)
|
||||
for _, key := range keys {
|
||||
fmt.Fprintf(b, "%s{%s=\"%s\"} %.9f\n", name, labelName, escapeLabelValue(key), float64(values[key]))
|
||||
}
|
||||
}
|
||||
|
||||
func writeCounterVecKV(b *strings.Builder, name, help string, values map[string]uint64, labels []string) {
|
||||
fmt.Fprintf(b, "# HELP %s %s\n", name, help)
|
||||
fmt.Fprintf(b, "# TYPE %s counter\n", name)
|
||||
@@ -396,12 +717,7 @@ func writeHistogramVecKV(b *strings.Builder, name, help string, values map[strin
|
||||
func writeHistogram(b *strings.Builder, name, help string, labels map[string]string, bounds []float64, counts []uint64, sum float64, count uint64) {
|
||||
fmt.Fprintf(b, "# HELP %s %s\n", name, help)
|
||||
fmt.Fprintf(b, "# TYPE %s histogram\n", name)
|
||||
writeHistogramWithLabelsMap(b, name, labels, histogramSnapshot{
|
||||
bounds: bounds,
|
||||
counts: counts,
|
||||
sum: sum,
|
||||
count: count,
|
||||
})
|
||||
writeHistogramWithLabelsMap(b, name, labels, histogramSnapshot{bounds: bounds, counts: counts, sum: sum, count: count})
|
||||
}
|
||||
|
||||
func writeHistogramWithLabelsMap(b *strings.Builder, name string, labels map[string]string, s histogramSnapshot) {
|
||||
@@ -464,8 +780,42 @@ func trimFloat(v float64) string {
|
||||
}
|
||||
|
||||
func escapeLabelValue(value string) string {
|
||||
value = strings.ReplaceAll(value, `\`, `\\`)
|
||||
value = strings.ReplaceAll(value, "\n", `\n`)
|
||||
value = strings.ReplaceAll(value, `"`, `\"`)
|
||||
value = strings.ReplaceAll(value, `\\`, `\\\\`)
|
||||
value = strings.ReplaceAll(value, "\n", `\\n`)
|
||||
value = strings.ReplaceAll(value, `"`, `\\"`)
|
||||
return value
|
||||
}
|
||||
|
||||
func readOpenFDs() (float64, bool) {
|
||||
entries, err := os.ReadDir("/proc/self/fd")
|
||||
if err != nil {
|
||||
return 0, false
|
||||
}
|
||||
return float64(len(entries)), true
|
||||
}
|
||||
|
||||
func readResidentMemoryBytes() (float64, bool) {
|
||||
data, err := os.ReadFile("/proc/self/statm")
|
||||
if err != nil {
|
||||
return 0, false
|
||||
}
|
||||
fields := strings.Fields(string(data))
|
||||
if len(fields) < 2 {
|
||||
return 0, false
|
||||
}
|
||||
rssPages, err := strconv.ParseInt(fields[1], 10, 64)
|
||||
if err != nil || rssPages < 0 {
|
||||
return 0, false
|
||||
}
|
||||
return float64(rssPages * int64(os.Getpagesize())), true
|
||||
}
|
||||
|
||||
func readProcessCPUSeconds() (float64, bool) {
|
||||
var usage syscall.Rusage
|
||||
if err := syscall.Getrusage(syscall.RUSAGE_SELF, &usage); err != nil {
|
||||
return 0, false
|
||||
}
|
||||
user := float64(usage.Utime.Sec) + float64(usage.Utime.Usec)/1e6
|
||||
sys := float64(usage.Stime.Sec) + float64(usage.Stime.Usec)/1e6
|
||||
return user + sys, true
|
||||
}
|
||||
|
||||
@@ -42,6 +42,28 @@ func NewObjectService(metadataHandler *metadata.MetadataHandler, blobHandler *st
|
||||
}
|
||||
}
|
||||
|
||||
func (s *ObjectService) acquireGCRLock() func() {
|
||||
waitStart := time.Now()
|
||||
s.gcMu.RLock()
|
||||
metrics.Default.ObserveLockWait("gc_mu_read", time.Since(waitStart))
|
||||
holdStart := time.Now()
|
||||
return func() {
|
||||
metrics.Default.ObserveLockHold("gc_mu_read", time.Since(holdStart))
|
||||
s.gcMu.RUnlock()
|
||||
}
|
||||
}
|
||||
|
||||
func (s *ObjectService) acquireGCLock() func() {
|
||||
waitStart := time.Now()
|
||||
s.gcMu.Lock()
|
||||
metrics.Default.ObserveLockWait("gc_mu_write", time.Since(waitStart))
|
||||
holdStart := time.Now()
|
||||
return func() {
|
||||
metrics.Default.ObserveLockHold("gc_mu_write", time.Since(holdStart))
|
||||
s.gcMu.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
func (s *ObjectService) PutObject(bucket, key, contentType string, input io.Reader) (*models.ObjectManifest, error) {
|
||||
start := time.Now()
|
||||
success := false
|
||||
@@ -49,8 +71,8 @@ func (s *ObjectService) PutObject(bucket, key, contentType string, input io.Read
|
||||
metrics.Default.ObserveService("put_object", time.Since(start), success)
|
||||
}()
|
||||
|
||||
s.gcMu.RLock()
|
||||
defer s.gcMu.RUnlock()
|
||||
unlock := s.acquireGCRLock()
|
||||
defer unlock()
|
||||
|
||||
chunks, size, etag, err := s.blob.IngestStream(input)
|
||||
if err != nil {
|
||||
@@ -89,16 +111,21 @@ func (s *ObjectService) GetObject(bucket, key string) (io.ReadCloser, *models.Ob
|
||||
metrics.Default.ObserveService("get_object", time.Since(start), success)
|
||||
}()
|
||||
|
||||
waitStart := time.Now()
|
||||
s.gcMu.RLock()
|
||||
metrics.Default.ObserveLockWait("gc_mu_read", time.Since(waitStart))
|
||||
holdStart := time.Now()
|
||||
|
||||
manifest, err := s.metadata.GetManifest(bucket, key)
|
||||
if err != nil {
|
||||
metrics.Default.ObserveLockHold("gc_mu_read", time.Since(holdStart))
|
||||
s.gcMu.RUnlock()
|
||||
return nil, nil, err
|
||||
}
|
||||
pr, pw := io.Pipe()
|
||||
|
||||
go func() {
|
||||
defer metrics.Default.ObserveLockHold("gc_mu_read", time.Since(holdStart))
|
||||
defer s.gcMu.RUnlock()
|
||||
if err := s.blob.AssembleStream(manifest.Chunks, pw); err != nil {
|
||||
_ = pw.CloseWithError(err)
|
||||
@@ -117,8 +144,8 @@ func (s *ObjectService) HeadObject(bucket, key string) (models.ObjectManifest, e
|
||||
metrics.Default.ObserveService("head_object", time.Since(start), success)
|
||||
}()
|
||||
|
||||
s.gcMu.RLock()
|
||||
defer s.gcMu.RUnlock()
|
||||
unlock := s.acquireGCRLock()
|
||||
defer unlock()
|
||||
|
||||
manifest, err := s.metadata.GetManifest(bucket, key)
|
||||
if err != nil {
|
||||
@@ -135,16 +162,16 @@ func (s *ObjectService) DeleteObject(bucket, key string) error {
|
||||
metrics.Default.ObserveService("delete_object", time.Since(start), success)
|
||||
}()
|
||||
|
||||
s.gcMu.RLock()
|
||||
defer s.gcMu.RUnlock()
|
||||
unlock := s.acquireGCRLock()
|
||||
defer unlock()
|
||||
err := s.metadata.DeleteManifest(bucket, key)
|
||||
success = err == nil
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *ObjectService) ListObjects(bucket, prefix string) ([]*models.ObjectManifest, error) {
|
||||
s.gcMu.RLock()
|
||||
defer s.gcMu.RUnlock()
|
||||
unlock := s.acquireGCRLock()
|
||||
defer unlock()
|
||||
|
||||
return s.metadata.ListObjects(bucket, prefix)
|
||||
}
|
||||
@@ -156,8 +183,8 @@ func (s *ObjectService) ForEachObjectFrom(bucket, startKey string, fn func(*mode
|
||||
metrics.Default.ObserveService("for_each_object_from", time.Since(start), success)
|
||||
}()
|
||||
|
||||
s.gcMu.RLock()
|
||||
defer s.gcMu.RUnlock()
|
||||
unlock := s.acquireGCRLock()
|
||||
defer unlock()
|
||||
|
||||
err := s.metadata.ForEachObjectFrom(bucket, startKey, fn)
|
||||
success = err == nil
|
||||
@@ -171,31 +198,31 @@ func (s *ObjectService) CreateBucket(bucket string) error {
|
||||
metrics.Default.ObserveService("create_bucket", time.Since(start), success)
|
||||
}()
|
||||
|
||||
s.gcMu.RLock()
|
||||
defer s.gcMu.RUnlock()
|
||||
unlock := s.acquireGCRLock()
|
||||
defer unlock()
|
||||
err := s.metadata.CreateBucket(bucket)
|
||||
success = err == nil
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *ObjectService) HeadBucket(bucket string) error {
|
||||
s.gcMu.RLock()
|
||||
defer s.gcMu.RUnlock()
|
||||
unlock := s.acquireGCRLock()
|
||||
defer unlock()
|
||||
|
||||
_, err := s.metadata.GetBucketManifest(bucket)
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *ObjectService) GetBucketManifest(bucket string) (*models.BucketManifest, error) {
|
||||
s.gcMu.RLock()
|
||||
defer s.gcMu.RUnlock()
|
||||
unlock := s.acquireGCRLock()
|
||||
defer unlock()
|
||||
|
||||
return s.metadata.GetBucketManifest(bucket)
|
||||
}
|
||||
|
||||
func (s *ObjectService) DeleteBucket(bucket string) error {
|
||||
s.gcMu.RLock()
|
||||
defer s.gcMu.RUnlock()
|
||||
unlock := s.acquireGCRLock()
|
||||
defer unlock()
|
||||
return s.metadata.DeleteBucket(bucket)
|
||||
}
|
||||
|
||||
@@ -206,8 +233,8 @@ func (s *ObjectService) ListBuckets() ([]string, error) {
|
||||
metrics.Default.ObserveService("list_buckets", time.Since(start), success)
|
||||
}()
|
||||
|
||||
s.gcMu.RLock()
|
||||
defer s.gcMu.RUnlock()
|
||||
unlock := s.acquireGCRLock()
|
||||
defer unlock()
|
||||
|
||||
buckets, err := s.metadata.ListBuckets()
|
||||
success = err == nil
|
||||
@@ -215,14 +242,14 @@ func (s *ObjectService) ListBuckets() ([]string, error) {
|
||||
}
|
||||
|
||||
func (s *ObjectService) DeleteObjects(bucket string, keys []string) ([]string, error) {
|
||||
s.gcMu.RLock()
|
||||
defer s.gcMu.RUnlock()
|
||||
unlock := s.acquireGCRLock()
|
||||
defer unlock()
|
||||
return s.metadata.DeleteManifests(bucket, keys)
|
||||
}
|
||||
|
||||
func (s *ObjectService) CreateMultipartUpload(bucket, key string) (*models.MultipartUpload, error) {
|
||||
s.gcMu.RLock()
|
||||
defer s.gcMu.RUnlock()
|
||||
unlock := s.acquireGCRLock()
|
||||
defer unlock()
|
||||
return s.metadata.CreateMultipartUpload(bucket, key)
|
||||
}
|
||||
|
||||
@@ -233,8 +260,8 @@ func (s *ObjectService) UploadPart(bucket, key, uploadId string, partNumber int,
|
||||
metrics.Default.ObserveService("upload_part", time.Since(start), success)
|
||||
}()
|
||||
|
||||
s.gcMu.RLock()
|
||||
defer s.gcMu.RUnlock()
|
||||
unlock := s.acquireGCRLock()
|
||||
defer unlock()
|
||||
|
||||
if partNumber < 1 || partNumber > 10000 {
|
||||
return "", ErrInvalidPart
|
||||
@@ -269,8 +296,8 @@ func (s *ObjectService) UploadPart(bucket, key, uploadId string, partNumber int,
|
||||
}
|
||||
|
||||
func (s *ObjectService) ListMultipartParts(bucket, key, uploadID string) ([]models.UploadedPart, error) {
|
||||
s.gcMu.RLock()
|
||||
defer s.gcMu.RUnlock()
|
||||
unlock := s.acquireGCRLock()
|
||||
defer unlock()
|
||||
|
||||
upload, err := s.metadata.GetMultipartUpload(uploadID)
|
||||
if err != nil {
|
||||
@@ -289,8 +316,8 @@ func (s *ObjectService) CompleteMultipartUpload(bucket, key, uploadID string, co
|
||||
metrics.Default.ObserveService("complete_multipart_upload", time.Since(start), success)
|
||||
}()
|
||||
|
||||
s.gcMu.RLock()
|
||||
defer s.gcMu.RUnlock()
|
||||
unlock := s.acquireGCRLock()
|
||||
defer unlock()
|
||||
|
||||
if len(completed) == 0 {
|
||||
return nil, ErrInvalidCompleteRequest
|
||||
@@ -360,8 +387,8 @@ func (s *ObjectService) CompleteMultipartUpload(bucket, key, uploadID string, co
|
||||
}
|
||||
|
||||
func (s *ObjectService) AbortMultipartUpload(bucket, key, uploadID string) error {
|
||||
s.gcMu.RLock()
|
||||
defer s.gcMu.RUnlock()
|
||||
unlock := s.acquireGCRLock()
|
||||
defer unlock()
|
||||
|
||||
upload, err := s.metadata.GetMultipartUpload(uploadID)
|
||||
if err != nil {
|
||||
@@ -404,8 +431,8 @@ func (s *ObjectService) GarbageCollect() error {
|
||||
metrics.Default.ObserveGC(time.Since(start), deletedChunks, deleteErrors, cleanedUploads, success)
|
||||
}()
|
||||
|
||||
s.gcMu.Lock()
|
||||
defer s.gcMu.Unlock()
|
||||
unlock := s.acquireGCLock()
|
||||
defer unlock()
|
||||
|
||||
referencedChunkSet, err := s.metadata.GetReferencedChunkSet()
|
||||
if err != nil {
|
||||
|
||||
Reference in New Issue
Block a user