mirror of
https://github.com/ferdzo/fs.git
synced 2026-04-05 14:26:26 +00:00
Initial metrics endpoint added in Prometheus style
This commit is contained in:
@@ -45,6 +45,8 @@ Reference: `auth/README.md`
|
|||||||
Health:
|
Health:
|
||||||
- `GET /healthz`
|
- `GET /healthz`
|
||||||
- `HEAD /healthz`
|
- `HEAD /healthz`
|
||||||
|
- `GET /metrics` (Prometheus exposition format)
|
||||||
|
- `HEAD /metrics`
|
||||||
|
|
||||||
## Limitations
|
## Limitations
|
||||||
|
|
||||||
|
|||||||
14
api/api.go
14
api/api.go
@@ -10,6 +10,7 @@ import (
|
|||||||
"fs/auth"
|
"fs/auth"
|
||||||
"fs/logging"
|
"fs/logging"
|
||||||
"fs/metadata"
|
"fs/metadata"
|
||||||
|
"fs/metrics"
|
||||||
"fs/models"
|
"fs/models"
|
||||||
"fs/service"
|
"fs/service"
|
||||||
"io"
|
"io"
|
||||||
@@ -70,6 +71,8 @@ func (h *Handler) setupRoutes() {
|
|||||||
|
|
||||||
h.router.Get("/healthz", h.handleHealth)
|
h.router.Get("/healthz", h.handleHealth)
|
||||||
h.router.Head("/healthz", h.handleHealth)
|
h.router.Head("/healthz", h.handleHealth)
|
||||||
|
h.router.Get("/metrics", h.handleMetrics)
|
||||||
|
h.router.Head("/metrics", h.handleMetrics)
|
||||||
h.router.Get("/", h.handleGetBuckets)
|
h.router.Get("/", h.handleGetBuckets)
|
||||||
|
|
||||||
h.router.Get("/{bucket}/", h.handleGetBucket)
|
h.router.Get("/{bucket}/", h.handleGetBucket)
|
||||||
@@ -106,6 +109,17 @@ func (h *Handler) handleHealth(w http.ResponseWriter, r *http.Request) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (h *Handler) handleMetrics(w http.ResponseWriter, r *http.Request) {
|
||||||
|
payload := metrics.Default.RenderPrometheus()
|
||||||
|
w.Header().Set("Content-Type", "text/plain; version=0.0.4; charset=utf-8")
|
||||||
|
w.Header().Set("Content-Length", strconv.Itoa(len(payload)))
|
||||||
|
w.WriteHeader(http.StatusOK)
|
||||||
|
if r.Method == http.MethodHead {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
_, _ = w.Write([]byte(payload))
|
||||||
|
}
|
||||||
|
|
||||||
func validateObjectKey(key string) *s3APIError {
|
func validateObjectKey(key string) *s3APIError {
|
||||||
if key == "" {
|
if key == "" {
|
||||||
err := s3ErrInvalidObjectKey
|
err := s3ErrInvalidObjectKey
|
||||||
|
|||||||
@@ -1,6 +1,8 @@
|
|||||||
package auth
|
package auth
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"errors"
|
||||||
|
"fs/metrics"
|
||||||
"log/slog"
|
"log/slog"
|
||||||
"net"
|
"net"
|
||||||
"net/http"
|
"net/http"
|
||||||
@@ -18,17 +20,20 @@ func Middleware(
|
|||||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
authCtx := RequestContext{Authenticated: false, AuthType: "none"}
|
authCtx := RequestContext{Authenticated: false, AuthType: "none"}
|
||||||
if svc == nil || !svc.Config().Enabled {
|
if svc == nil || !svc.Config().Enabled {
|
||||||
|
metrics.Default.ObserveAuth("bypass", "disabled", "auth_disabled")
|
||||||
next.ServeHTTP(w, r.WithContext(WithRequestContext(r.Context(), authCtx)))
|
next.ServeHTTP(w, r.WithContext(WithRequestContext(r.Context(), authCtx)))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if r.URL.Path == "/healthz" {
|
if r.URL.Path == "/healthz" || r.URL.Path == "/metrics" {
|
||||||
|
metrics.Default.ObserveAuth("bypass", "none", "public_endpoint")
|
||||||
next.ServeHTTP(w, r.WithContext(WithRequestContext(r.Context(), authCtx)))
|
next.ServeHTTP(w, r.WithContext(WithRequestContext(r.Context(), authCtx)))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
resolvedCtx, err := svc.AuthenticateRequest(r)
|
resolvedCtx, err := svc.AuthenticateRequest(r)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
metrics.Default.ObserveAuth("error", "sigv4", authErrorClass(err))
|
||||||
if auditEnabled && logger != nil {
|
if auditEnabled && logger != nil {
|
||||||
requestID := middleware.GetReqID(r.Context())
|
requestID := middleware.GetReqID(r.Context())
|
||||||
attrs := []any{
|
attrs := []any{
|
||||||
@@ -50,6 +55,7 @@ func Middleware(
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
metrics.Default.ObserveAuth("ok", resolvedCtx.AuthType, "none")
|
||||||
if auditEnabled && logger != nil {
|
if auditEnabled && logger != nil {
|
||||||
requestID := middleware.GetReqID(r.Context())
|
requestID := middleware.GetReqID(r.Context())
|
||||||
attrs := []any{
|
attrs := []any{
|
||||||
@@ -69,6 +75,33 @@ func Middleware(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func authErrorClass(err error) string {
|
||||||
|
switch {
|
||||||
|
case errors.Is(err, ErrInvalidAccessKeyID):
|
||||||
|
return "invalid_access_key"
|
||||||
|
case errors.Is(err, ErrSignatureDoesNotMatch):
|
||||||
|
return "signature_mismatch"
|
||||||
|
case errors.Is(err, ErrAuthorizationHeaderMalformed):
|
||||||
|
return "auth_header_malformed"
|
||||||
|
case errors.Is(err, ErrRequestTimeTooSkewed):
|
||||||
|
return "time_skew"
|
||||||
|
case errors.Is(err, ErrExpiredToken):
|
||||||
|
return "expired_token"
|
||||||
|
case errors.Is(err, ErrNoAuthCredentials):
|
||||||
|
return "missing_credentials"
|
||||||
|
case errors.Is(err, ErrUnsupportedAuthScheme):
|
||||||
|
return "unsupported_auth_scheme"
|
||||||
|
case errors.Is(err, ErrInvalidPresign):
|
||||||
|
return "invalid_presign"
|
||||||
|
case errors.Is(err, ErrCredentialDisabled):
|
||||||
|
return "credential_disabled"
|
||||||
|
case errors.Is(err, ErrAccessDenied):
|
||||||
|
return "access_denied"
|
||||||
|
default:
|
||||||
|
return "other"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func clientIP(remoteAddr string) string {
|
func clientIP(remoteAddr string) string {
|
||||||
host, _, err := net.SplitHostPort(remoteAddr)
|
host, _, err := net.SplitHostPort(remoteAddr)
|
||||||
if err == nil && host != "" {
|
if err == nil && host != "" {
|
||||||
|
|||||||
@@ -1,6 +1,7 @@
|
|||||||
package logging
|
package logging
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"fs/metrics"
|
||||||
"log/slog"
|
"log/slog"
|
||||||
"net/http"
|
"net/http"
|
||||||
"os"
|
"os"
|
||||||
@@ -9,6 +10,7 @@ import (
|
|||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/go-chi/chi/v5"
|
||||||
"github.com/go-chi/chi/v5/middleware"
|
"github.com/go-chi/chi/v5/middleware"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -86,6 +88,8 @@ func HTTPMiddleware(logger *slog.Logger, cfg Config) func(http.Handler) http.Han
|
|||||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
ww := middleware.NewWrapResponseWriter(w, r.ProtoMajor)
|
ww := middleware.NewWrapResponseWriter(w, r.ProtoMajor)
|
||||||
|
metrics.Default.IncHTTPInFlight()
|
||||||
|
defer metrics.Default.DecHTTPInFlight()
|
||||||
requestID := middleware.GetReqID(r.Context())
|
requestID := middleware.GetReqID(r.Context())
|
||||||
if requestID != "" {
|
if requestID != "" {
|
||||||
ww.Header().Set("x-amz-request-id", requestID)
|
ww.Header().Set("x-amz-request-id", requestID)
|
||||||
@@ -93,15 +97,18 @@ func HTTPMiddleware(logger *slog.Logger, cfg Config) func(http.Handler) http.Han
|
|||||||
|
|
||||||
next.ServeHTTP(ww, r)
|
next.ServeHTTP(ww, r)
|
||||||
|
|
||||||
if !cfg.Audit && !cfg.DebugMode {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
elapsed := time.Since(start)
|
elapsed := time.Since(start)
|
||||||
status := ww.Status()
|
status := ww.Status()
|
||||||
if status == 0 {
|
if status == 0 {
|
||||||
status = http.StatusOK
|
status = http.StatusOK
|
||||||
}
|
}
|
||||||
|
route := metricRouteLabel(r)
|
||||||
|
metrics.Default.ObserveHTTPRequest(r.Method, route, status, elapsed, ww.BytesWritten())
|
||||||
|
|
||||||
|
if !cfg.Audit && !cfg.DebugMode {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
attrs := []any{
|
attrs := []any{
|
||||||
"method", r.Method,
|
"method", r.Method,
|
||||||
"path", r.URL.Path,
|
"path", r.URL.Path,
|
||||||
@@ -131,6 +138,35 @@ func HTTPMiddleware(logger *slog.Logger, cfg Config) func(http.Handler) http.Han
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func metricRouteLabel(r *http.Request) string {
|
||||||
|
if r == nil || r.URL == nil {
|
||||||
|
return "/unknown"
|
||||||
|
}
|
||||||
|
|
||||||
|
if routeCtx := chi.RouteContext(r.Context()); routeCtx != nil {
|
||||||
|
if pattern := strings.TrimSpace(routeCtx.RoutePattern()); pattern != "" {
|
||||||
|
return pattern
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
path := strings.TrimSpace(r.URL.Path)
|
||||||
|
if path == "" || path == "/" {
|
||||||
|
return "/"
|
||||||
|
}
|
||||||
|
if path == "/healthz" || path == "/metrics" {
|
||||||
|
return path
|
||||||
|
}
|
||||||
|
|
||||||
|
trimmed := strings.Trim(path, "/")
|
||||||
|
if trimmed == "" {
|
||||||
|
return "/"
|
||||||
|
}
|
||||||
|
if !strings.Contains(trimmed, "/") {
|
||||||
|
return "/{bucket}"
|
||||||
|
}
|
||||||
|
return "/{bucket}/*"
|
||||||
|
}
|
||||||
|
|
||||||
func envBool(key string, defaultValue bool) bool {
|
func envBool(key string, defaultValue bool) bool {
|
||||||
raw := os.Getenv(key)
|
raw := os.Getenv(key)
|
||||||
if raw == "" {
|
if raw == "" {
|
||||||
|
|||||||
30
logging/logging_metrics_test.go
Normal file
30
logging/logging_metrics_test.go
Normal file
@@ -0,0 +1,30 @@
|
|||||||
|
package logging
|
||||||
|
|
||||||
|
import (
|
||||||
|
"net/http/httptest"
|
||||||
|
"testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestMetricRouteLabelFallbacks(t *testing.T) {
|
||||||
|
testCases := []struct {
|
||||||
|
name string
|
||||||
|
path string
|
||||||
|
want string
|
||||||
|
}{
|
||||||
|
{name: "root", path: "/", want: "/"},
|
||||||
|
{name: "health", path: "/healthz", want: "/healthz"},
|
||||||
|
{name: "metrics", path: "/metrics", want: "/metrics"},
|
||||||
|
{name: "bucket", path: "/some-bucket", want: "/{bucket}"},
|
||||||
|
{name: "object", path: "/some-bucket/private/path/file.jpg", want: "/{bucket}/*"},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tc := range testCases {
|
||||||
|
t.Run(tc.name, func(t *testing.T) {
|
||||||
|
req := httptest.NewRequest("GET", tc.path, nil)
|
||||||
|
got := metricRouteLabel(req)
|
||||||
|
if got != tc.want {
|
||||||
|
t.Fatalf("metricRouteLabel(%q) = %q, want %q", tc.path, got, tc.want)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -4,6 +4,7 @@ import (
|
|||||||
"encoding/json"
|
"encoding/json"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"fs/metrics"
|
||||||
"fs/models"
|
"fs/models"
|
||||||
"net"
|
"net"
|
||||||
"regexp"
|
"regexp"
|
||||||
@@ -47,7 +48,7 @@ func NewMetadataHandler(dbPath string) (*MetadataHandler, error) {
|
|||||||
}
|
}
|
||||||
h := &MetadataHandler{db: db}
|
h := &MetadataHandler{db: db}
|
||||||
|
|
||||||
err = h.db.Update(func(tx *bbolt.Tx) error {
|
err = h.update(func(tx *bbolt.Tx) error {
|
||||||
_, err := tx.CreateBucketIfNotExists(systemIndex)
|
_, err := tx.CreateBucketIfNotExists(systemIndex)
|
||||||
return err
|
return err
|
||||||
})
|
})
|
||||||
@@ -55,7 +56,7 @@ func NewMetadataHandler(dbPath string) (*MetadataHandler, error) {
|
|||||||
_ = db.Close()
|
_ = db.Close()
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
err = h.db.Update(func(tx *bbolt.Tx) error {
|
err = h.update(func(tx *bbolt.Tx) error {
|
||||||
_, err := tx.CreateBucketIfNotExists(multipartUploadIndex)
|
_, err := tx.CreateBucketIfNotExists(multipartUploadIndex)
|
||||||
return err
|
return err
|
||||||
})
|
})
|
||||||
@@ -63,7 +64,7 @@ func NewMetadataHandler(dbPath string) (*MetadataHandler, error) {
|
|||||||
_ = db.Close()
|
_ = db.Close()
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
err = h.db.Update(func(tx *bbolt.Tx) error {
|
err = h.update(func(tx *bbolt.Tx) error {
|
||||||
_, err := tx.CreateBucketIfNotExists(multipartUploadPartsIndex)
|
_, err := tx.CreateBucketIfNotExists(multipartUploadPartsIndex)
|
||||||
return err
|
return err
|
||||||
})
|
})
|
||||||
@@ -71,7 +72,7 @@ func NewMetadataHandler(dbPath string) (*MetadataHandler, error) {
|
|||||||
_ = db.Close()
|
_ = db.Close()
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
err = h.db.Update(func(tx *bbolt.Tx) error {
|
err = h.update(func(tx *bbolt.Tx) error {
|
||||||
_, err := tx.CreateBucketIfNotExists(authIdentitiesIndex)
|
_, err := tx.CreateBucketIfNotExists(authIdentitiesIndex)
|
||||||
return err
|
return err
|
||||||
})
|
})
|
||||||
@@ -79,7 +80,7 @@ func NewMetadataHandler(dbPath string) (*MetadataHandler, error) {
|
|||||||
_ = db.Close()
|
_ = db.Close()
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
err = h.db.Update(func(tx *bbolt.Tx) error {
|
err = h.update(func(tx *bbolt.Tx) error {
|
||||||
_, err := tx.CreateBucketIfNotExists(authPoliciesIndex)
|
_, err := tx.CreateBucketIfNotExists(authPoliciesIndex)
|
||||||
return err
|
return err
|
||||||
})
|
})
|
||||||
@@ -119,6 +120,20 @@ func (h *MetadataHandler) Close() error {
|
|||||||
return h.db.Close()
|
return h.db.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (h *MetadataHandler) view(fn func(tx *bbolt.Tx) error) error {
|
||||||
|
start := time.Now()
|
||||||
|
err := h.db.View(fn)
|
||||||
|
metrics.Default.ObserveMetadataTx("view", time.Since(start), err == nil)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *MetadataHandler) update(fn func(tx *bbolt.Tx) error) error {
|
||||||
|
start := time.Now()
|
||||||
|
err := h.db.Update(fn)
|
||||||
|
metrics.Default.ObserveMetadataTx("update", time.Since(start), err == nil)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
func (h *MetadataHandler) PutAuthIdentity(identity *models.AuthIdentity) error {
|
func (h *MetadataHandler) PutAuthIdentity(identity *models.AuthIdentity) error {
|
||||||
if identity == nil {
|
if identity == nil {
|
||||||
return errors.New("auth identity is required")
|
return errors.New("auth identity is required")
|
||||||
@@ -126,7 +141,7 @@ func (h *MetadataHandler) PutAuthIdentity(identity *models.AuthIdentity) error {
|
|||||||
if strings.TrimSpace(identity.AccessKeyID) == "" {
|
if strings.TrimSpace(identity.AccessKeyID) == "" {
|
||||||
return errors.New("access key id is required")
|
return errors.New("access key id is required")
|
||||||
}
|
}
|
||||||
return h.db.Update(func(tx *bbolt.Tx) error {
|
return h.update(func(tx *bbolt.Tx) error {
|
||||||
bucket := tx.Bucket(authIdentitiesIndex)
|
bucket := tx.Bucket(authIdentitiesIndex)
|
||||||
if bucket == nil {
|
if bucket == nil {
|
||||||
return errors.New("auth identities index not found")
|
return errors.New("auth identities index not found")
|
||||||
@@ -146,7 +161,7 @@ func (h *MetadataHandler) GetAuthIdentity(accessKeyID string) (*models.AuthIdent
|
|||||||
}
|
}
|
||||||
|
|
||||||
var identity *models.AuthIdentity
|
var identity *models.AuthIdentity
|
||||||
err := h.db.View(func(tx *bbolt.Tx) error {
|
err := h.view(func(tx *bbolt.Tx) error {
|
||||||
bucket := tx.Bucket(authIdentitiesIndex)
|
bucket := tx.Bucket(authIdentitiesIndex)
|
||||||
if bucket == nil {
|
if bucket == nil {
|
||||||
return errors.New("auth identities index not found")
|
return errors.New("auth identities index not found")
|
||||||
@@ -177,7 +192,7 @@ func (h *MetadataHandler) PutAuthPolicy(policy *models.AuthPolicy) error {
|
|||||||
return errors.New("auth policy principal is required")
|
return errors.New("auth policy principal is required")
|
||||||
}
|
}
|
||||||
policy.Principal = principal
|
policy.Principal = principal
|
||||||
return h.db.Update(func(tx *bbolt.Tx) error {
|
return h.update(func(tx *bbolt.Tx) error {
|
||||||
bucket := tx.Bucket(authPoliciesIndex)
|
bucket := tx.Bucket(authPoliciesIndex)
|
||||||
if bucket == nil {
|
if bucket == nil {
|
||||||
return errors.New("auth policies index not found")
|
return errors.New("auth policies index not found")
|
||||||
@@ -197,7 +212,7 @@ func (h *MetadataHandler) GetAuthPolicy(accessKeyID string) (*models.AuthPolicy,
|
|||||||
}
|
}
|
||||||
|
|
||||||
var policy *models.AuthPolicy
|
var policy *models.AuthPolicy
|
||||||
err := h.db.View(func(tx *bbolt.Tx) error {
|
err := h.view(func(tx *bbolt.Tx) error {
|
||||||
bucket := tx.Bucket(authPoliciesIndex)
|
bucket := tx.Bucket(authPoliciesIndex)
|
||||||
if bucket == nil {
|
if bucket == nil {
|
||||||
return errors.New("auth policies index not found")
|
return errors.New("auth policies index not found")
|
||||||
@@ -224,7 +239,7 @@ func (h *MetadataHandler) CreateBucket(bucketName string) error {
|
|||||||
return fmt.Errorf("%w: %s", ErrInvalidBucketName, bucketName)
|
return fmt.Errorf("%w: %s", ErrInvalidBucketName, bucketName)
|
||||||
}
|
}
|
||||||
|
|
||||||
err := h.db.Update(func(tx *bbolt.Tx) error {
|
err := h.update(func(tx *bbolt.Tx) error {
|
||||||
indexBucket, err := tx.CreateBucketIfNotExists([]byte(systemIndex))
|
indexBucket, err := tx.CreateBucketIfNotExists([]byte(systemIndex))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
@@ -256,7 +271,7 @@ func (h *MetadataHandler) DeleteBucket(bucketName string) error {
|
|||||||
return fmt.Errorf("%w: %s", ErrInvalidBucketName, bucketName)
|
return fmt.Errorf("%w: %s", ErrInvalidBucketName, bucketName)
|
||||||
}
|
}
|
||||||
|
|
||||||
err := h.db.Update(func(tx *bbolt.Tx) error {
|
err := h.update(func(tx *bbolt.Tx) error {
|
||||||
indexBucket, err := tx.CreateBucketIfNotExists([]byte(systemIndex))
|
indexBucket, err := tx.CreateBucketIfNotExists([]byte(systemIndex))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
@@ -303,7 +318,7 @@ func (h *MetadataHandler) DeleteBucket(bucketName string) error {
|
|||||||
|
|
||||||
func (h *MetadataHandler) ListBuckets() ([]string, error) {
|
func (h *MetadataHandler) ListBuckets() ([]string, error) {
|
||||||
buckets := []string{}
|
buckets := []string{}
|
||||||
err := h.db.View(func(tx *bbolt.Tx) error {
|
err := h.view(func(tx *bbolt.Tx) error {
|
||||||
systemIndexBucket := tx.Bucket([]byte(systemIndex))
|
systemIndexBucket := tx.Bucket([]byte(systemIndex))
|
||||||
if systemIndexBucket == nil {
|
if systemIndexBucket == nil {
|
||||||
return errors.New("system index not found")
|
return errors.New("system index not found")
|
||||||
@@ -323,7 +338,7 @@ func (h *MetadataHandler) ListBuckets() ([]string, error) {
|
|||||||
func (h *MetadataHandler) GetBucketManifest(bucketName string) (*models.BucketManifest, error) {
|
func (h *MetadataHandler) GetBucketManifest(bucketName string) (*models.BucketManifest, error) {
|
||||||
var manifest *models.BucketManifest
|
var manifest *models.BucketManifest
|
||||||
|
|
||||||
err := h.db.View(func(tx *bbolt.Tx) error {
|
err := h.view(func(tx *bbolt.Tx) error {
|
||||||
systemIndexBucket := tx.Bucket([]byte(systemIndex))
|
systemIndexBucket := tx.Bucket([]byte(systemIndex))
|
||||||
if systemIndexBucket == nil {
|
if systemIndexBucket == nil {
|
||||||
return errors.New("system index not found")
|
return errors.New("system index not found")
|
||||||
@@ -353,7 +368,7 @@ func (h *MetadataHandler) PutManifest(manifest *models.ObjectManifest) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
err := h.db.Update(func(tx *bbolt.Tx) error {
|
err := h.update(func(tx *bbolt.Tx) error {
|
||||||
data, err := json.Marshal(manifest)
|
data, err := json.Marshal(manifest)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
@@ -373,7 +388,7 @@ func (h *MetadataHandler) PutManifest(manifest *models.ObjectManifest) error {
|
|||||||
func (h *MetadataHandler) GetManifest(bucket, key string) (*models.ObjectManifest, error) {
|
func (h *MetadataHandler) GetManifest(bucket, key string) (*models.ObjectManifest, error) {
|
||||||
var manifest *models.ObjectManifest
|
var manifest *models.ObjectManifest
|
||||||
|
|
||||||
err := h.db.View(func(tx *bbolt.Tx) error {
|
err := h.view(func(tx *bbolt.Tx) error {
|
||||||
metadataBucket := tx.Bucket([]byte(bucket))
|
metadataBucket := tx.Bucket([]byte(bucket))
|
||||||
if metadataBucket == nil {
|
if metadataBucket == nil {
|
||||||
return fmt.Errorf("%w: %s", ErrBucketNotFound, bucket)
|
return fmt.Errorf("%w: %s", ErrBucketNotFound, bucket)
|
||||||
@@ -400,7 +415,7 @@ func (h *MetadataHandler) ListObjects(bucket, prefix string) ([]*models.ObjectMa
|
|||||||
|
|
||||||
var objects []*models.ObjectManifest
|
var objects []*models.ObjectManifest
|
||||||
|
|
||||||
err := h.db.View(func(tx *bbolt.Tx) error {
|
err := h.view(func(tx *bbolt.Tx) error {
|
||||||
systemIndexBucket := tx.Bucket([]byte(systemIndex))
|
systemIndexBucket := tx.Bucket([]byte(systemIndex))
|
||||||
if systemIndexBucket == nil {
|
if systemIndexBucket == nil {
|
||||||
return errors.New("system index not found")
|
return errors.New("system index not found")
|
||||||
@@ -440,7 +455,7 @@ func (h *MetadataHandler) ForEachObjectFrom(bucket, startKey string, fn func(*mo
|
|||||||
return errors.New("object callback is required")
|
return errors.New("object callback is required")
|
||||||
}
|
}
|
||||||
|
|
||||||
return h.db.View(func(tx *bbolt.Tx) error {
|
return h.view(func(tx *bbolt.Tx) error {
|
||||||
systemIndexBucket := tx.Bucket([]byte(systemIndex))
|
systemIndexBucket := tx.Bucket([]byte(systemIndex))
|
||||||
if systemIndexBucket == nil {
|
if systemIndexBucket == nil {
|
||||||
return errors.New("system index not found")
|
return errors.New("system index not found")
|
||||||
@@ -480,7 +495,7 @@ func (h *MetadataHandler) DeleteManifest(bucket, key string) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
err := h.db.Update(func(tx *bbolt.Tx) error {
|
err := h.update(func(tx *bbolt.Tx) error {
|
||||||
metadataBucket := tx.Bucket([]byte(bucket))
|
metadataBucket := tx.Bucket([]byte(bucket))
|
||||||
if metadataBucket == nil {
|
if metadataBucket == nil {
|
||||||
return fmt.Errorf("%w: %s", ErrBucketNotFound, bucket)
|
return fmt.Errorf("%w: %s", ErrBucketNotFound, bucket)
|
||||||
@@ -497,7 +512,7 @@ func (h *MetadataHandler) DeleteManifest(bucket, key string) error {
|
|||||||
func (h *MetadataHandler) DeleteManifests(bucket string, keys []string) ([]string, error) {
|
func (h *MetadataHandler) DeleteManifests(bucket string, keys []string) ([]string, error) {
|
||||||
deleted := make([]string, 0, len(keys))
|
deleted := make([]string, 0, len(keys))
|
||||||
|
|
||||||
err := h.db.Update(func(tx *bbolt.Tx) error {
|
err := h.update(func(tx *bbolt.Tx) error {
|
||||||
metadataBucket := tx.Bucket([]byte(bucket))
|
metadataBucket := tx.Bucket([]byte(bucket))
|
||||||
if metadataBucket == nil {
|
if metadataBucket == nil {
|
||||||
return fmt.Errorf("%w: %s", ErrBucketNotFound, bucket)
|
return fmt.Errorf("%w: %s", ErrBucketNotFound, bucket)
|
||||||
@@ -525,7 +540,7 @@ func (h *MetadataHandler) DeleteManifests(bucket string, keys []string) ([]strin
|
|||||||
func (h *MetadataHandler) CreateMultipartUpload(bucket, key string) (*models.MultipartUpload, error) {
|
func (h *MetadataHandler) CreateMultipartUpload(bucket, key string) (*models.MultipartUpload, error) {
|
||||||
var upload *models.MultipartUpload
|
var upload *models.MultipartUpload
|
||||||
|
|
||||||
err := h.db.View(func(tx *bbolt.Tx) error {
|
err := h.view(func(tx *bbolt.Tx) error {
|
||||||
systemIndexBucket := tx.Bucket([]byte(systemIndex))
|
systemIndexBucket := tx.Bucket([]byte(systemIndex))
|
||||||
if systemIndexBucket == nil {
|
if systemIndexBucket == nil {
|
||||||
return errors.New("system index not found")
|
return errors.New("system index not found")
|
||||||
@@ -548,7 +563,7 @@ func (h *MetadataHandler) CreateMultipartUpload(bucket, key string) (*models.Mul
|
|||||||
State: "pending",
|
State: "pending",
|
||||||
}
|
}
|
||||||
|
|
||||||
err = h.db.Update(func(tx *bbolt.Tx) error {
|
err = h.update(func(tx *bbolt.Tx) error {
|
||||||
multipartUploadBucket := tx.Bucket([]byte(multipartUploadIndex))
|
multipartUploadBucket := tx.Bucket([]byte(multipartUploadIndex))
|
||||||
if multipartUploadBucket == nil {
|
if multipartUploadBucket == nil {
|
||||||
return errors.New("multipart upload index not found")
|
return errors.New("multipart upload index not found")
|
||||||
@@ -643,7 +658,7 @@ func deleteMultipartPartsByUploadID(tx *bbolt.Tx, uploadID string) error {
|
|||||||
|
|
||||||
func (h *MetadataHandler) GetMultipartUpload(uploadID string) (*models.MultipartUpload, error) {
|
func (h *MetadataHandler) GetMultipartUpload(uploadID string) (*models.MultipartUpload, error) {
|
||||||
var upload *models.MultipartUpload
|
var upload *models.MultipartUpload
|
||||||
err := h.db.View(func(tx *bbolt.Tx) error {
|
err := h.view(func(tx *bbolt.Tx) error {
|
||||||
var err error
|
var err error
|
||||||
upload, _, err = getMultipartUploadFromTx(tx, uploadID)
|
upload, _, err = getMultipartUploadFromTx(tx, uploadID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -661,7 +676,7 @@ func (h *MetadataHandler) PutMultipartPart(uploadID string, part models.Uploaded
|
|||||||
return fmt.Errorf("invalid part number: %d", part.PartNumber)
|
return fmt.Errorf("invalid part number: %d", part.PartNumber)
|
||||||
}
|
}
|
||||||
|
|
||||||
err := h.db.Update(func(tx *bbolt.Tx) error {
|
err := h.update(func(tx *bbolt.Tx) error {
|
||||||
upload, _, err := getMultipartUploadFromTx(tx, uploadID)
|
upload, _, err := getMultipartUploadFromTx(tx, uploadID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
@@ -690,7 +705,7 @@ func (h *MetadataHandler) PutMultipartPart(uploadID string, part models.Uploaded
|
|||||||
func (h *MetadataHandler) ListMultipartParts(uploadID string) ([]models.UploadedPart, error) {
|
func (h *MetadataHandler) ListMultipartParts(uploadID string) ([]models.UploadedPart, error) {
|
||||||
parts := make([]models.UploadedPart, 0)
|
parts := make([]models.UploadedPart, 0)
|
||||||
|
|
||||||
err := h.db.View(func(tx *bbolt.Tx) error {
|
err := h.view(func(tx *bbolt.Tx) error {
|
||||||
if _, _, err := getMultipartUploadFromTx(tx, uploadID); err != nil {
|
if _, _, err := getMultipartUploadFromTx(tx, uploadID); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@@ -724,7 +739,7 @@ func (h *MetadataHandler) CompleteMultipartUpload(uploadID string, final *models
|
|||||||
return errors.New("final object manifest is required")
|
return errors.New("final object manifest is required")
|
||||||
}
|
}
|
||||||
|
|
||||||
err := h.db.Update(func(tx *bbolt.Tx) error {
|
err := h.update(func(tx *bbolt.Tx) error {
|
||||||
upload, multipartUploadBucket, err := getMultipartUploadFromTx(tx, uploadID)
|
upload, multipartUploadBucket, err := getMultipartUploadFromTx(tx, uploadID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
@@ -763,7 +778,7 @@ func (h *MetadataHandler) CompleteMultipartUpload(uploadID string, final *models
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
func (h *MetadataHandler) AbortMultipartUpload(uploadID string) error {
|
func (h *MetadataHandler) AbortMultipartUpload(uploadID string) error {
|
||||||
err := h.db.Update(func(tx *bbolt.Tx) error {
|
err := h.update(func(tx *bbolt.Tx) error {
|
||||||
upload, multipartUploadBucket, err := getMultipartUploadFromTx(tx, uploadID)
|
upload, multipartUploadBucket, err := getMultipartUploadFromTx(tx, uploadID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
@@ -793,7 +808,7 @@ func (h *MetadataHandler) CleanupMultipartUploads(retention time.Duration) (int,
|
|||||||
}
|
}
|
||||||
|
|
||||||
cleaned := 0
|
cleaned := 0
|
||||||
err := h.db.Update(func(tx *bbolt.Tx) error {
|
err := h.update(func(tx *bbolt.Tx) error {
|
||||||
uploadsBucket, err := getMultipartUploadBucket(tx)
|
uploadsBucket, err := getMultipartUploadBucket(tx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
@@ -843,7 +858,7 @@ func (h *MetadataHandler) GetReferencedChunkSet() (map[string]struct{}, error) {
|
|||||||
chunkSet := make(map[string]struct{})
|
chunkSet := make(map[string]struct{})
|
||||||
pendingUploadSet := make(map[string]struct{})
|
pendingUploadSet := make(map[string]struct{})
|
||||||
|
|
||||||
err := h.db.View(func(tx *bbolt.Tx) error {
|
err := h.view(func(tx *bbolt.Tx) error {
|
||||||
systemIndexBucket := tx.Bucket([]byte(systemIndex))
|
systemIndexBucket := tx.Bucket([]byte(systemIndex))
|
||||||
if systemIndexBucket == nil {
|
if systemIndexBucket == nil {
|
||||||
return errors.New("system index not found")
|
return errors.New("system index not found")
|
||||||
|
|||||||
471
metrics/metrics.go
Normal file
471
metrics/metrics.go
Normal file
@@ -0,0 +1,471 @@
|
|||||||
|
package metrics
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"runtime"
|
||||||
|
"sort"
|
||||||
|
"strconv"
|
||||||
|
"strings"
|
||||||
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
var defaultBuckets = []float64{
|
||||||
|
0.0005, 0.001, 0.0025, 0.005, 0.01,
|
||||||
|
0.025, 0.05, 0.1, 0.25, 0.5,
|
||||||
|
1, 2.5, 5, 10,
|
||||||
|
}
|
||||||
|
|
||||||
|
var Default = NewRegistry()
|
||||||
|
|
||||||
|
type histogram struct {
|
||||||
|
bounds []float64
|
||||||
|
counts []uint64
|
||||||
|
sum float64
|
||||||
|
count uint64
|
||||||
|
}
|
||||||
|
|
||||||
|
func newHistogram(bounds []float64) *histogram {
|
||||||
|
cloned := make([]float64, len(bounds))
|
||||||
|
copy(cloned, bounds)
|
||||||
|
return &histogram{
|
||||||
|
bounds: cloned,
|
||||||
|
counts: make([]uint64, len(bounds)+1),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *histogram) observe(v float64) {
|
||||||
|
h.count++
|
||||||
|
h.sum += v
|
||||||
|
for i, bound := range h.bounds {
|
||||||
|
if v <= bound {
|
||||||
|
h.counts[i]++
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
h.counts[len(h.counts)-1]++
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *histogram) snapshot() (bounds []float64, counts []uint64, sum float64, count uint64) {
|
||||||
|
bounds = make([]float64, len(h.bounds))
|
||||||
|
copy(bounds, h.bounds)
|
||||||
|
counts = make([]uint64, len(h.counts))
|
||||||
|
copy(counts, h.counts)
|
||||||
|
return bounds, counts, h.sum, h.count
|
||||||
|
}
|
||||||
|
|
||||||
|
type Registry struct {
|
||||||
|
startedAt time.Time
|
||||||
|
inFlight atomic.Int64
|
||||||
|
|
||||||
|
mu sync.Mutex
|
||||||
|
|
||||||
|
httpRequests map[string]uint64
|
||||||
|
httpResponseByte map[string]uint64
|
||||||
|
httpDuration map[string]*histogram
|
||||||
|
|
||||||
|
authRequests map[string]uint64
|
||||||
|
|
||||||
|
serviceOps map[string]uint64
|
||||||
|
serviceDuration map[string]*histogram
|
||||||
|
|
||||||
|
dbTxTotal map[string]uint64
|
||||||
|
dbTxDuration map[string]*histogram
|
||||||
|
|
||||||
|
blobOps map[string]uint64
|
||||||
|
blobBytes map[string]uint64
|
||||||
|
blobDuration map[string]*histogram
|
||||||
|
|
||||||
|
gcRuns map[string]uint64
|
||||||
|
gcDuration *histogram
|
||||||
|
gcDeletedChunks uint64
|
||||||
|
gcDeleteErrors uint64
|
||||||
|
gcCleanedUpload uint64
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewRegistry() *Registry {
|
||||||
|
return &Registry{
|
||||||
|
startedAt: time.Now(),
|
||||||
|
httpRequests: make(map[string]uint64),
|
||||||
|
httpResponseByte: make(map[string]uint64),
|
||||||
|
httpDuration: make(map[string]*histogram),
|
||||||
|
authRequests: make(map[string]uint64),
|
||||||
|
serviceOps: make(map[string]uint64),
|
||||||
|
serviceDuration: make(map[string]*histogram),
|
||||||
|
dbTxTotal: make(map[string]uint64),
|
||||||
|
dbTxDuration: make(map[string]*histogram),
|
||||||
|
blobOps: make(map[string]uint64),
|
||||||
|
blobBytes: make(map[string]uint64),
|
||||||
|
blobDuration: make(map[string]*histogram),
|
||||||
|
gcRuns: make(map[string]uint64),
|
||||||
|
gcDuration: newHistogram(defaultBuckets),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *Registry) IncHTTPInFlight() {
|
||||||
|
r.inFlight.Add(1)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *Registry) DecHTTPInFlight() {
|
||||||
|
r.inFlight.Add(-1)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *Registry) ObserveHTTPRequest(method, route string, status int, d time.Duration, responseBytes int) {
|
||||||
|
route = normalizeRoute(route)
|
||||||
|
key := method + "|" + route + "|" + strconv.Itoa(status)
|
||||||
|
durationKey := method + "|" + route
|
||||||
|
|
||||||
|
r.mu.Lock()
|
||||||
|
r.httpRequests[key]++
|
||||||
|
if responseBytes > 0 {
|
||||||
|
r.httpResponseByte[key] += uint64(responseBytes)
|
||||||
|
}
|
||||||
|
h := r.httpDuration[durationKey]
|
||||||
|
if h == nil {
|
||||||
|
h = newHistogram(defaultBuckets)
|
||||||
|
r.httpDuration[durationKey] = h
|
||||||
|
}
|
||||||
|
h.observe(d.Seconds())
|
||||||
|
r.mu.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *Registry) ObserveAuth(result, authType, reason string) {
|
||||||
|
authType = strings.TrimSpace(authType)
|
||||||
|
if authType == "" {
|
||||||
|
authType = "unknown"
|
||||||
|
}
|
||||||
|
reason = strings.TrimSpace(reason)
|
||||||
|
if reason == "" {
|
||||||
|
reason = "none"
|
||||||
|
}
|
||||||
|
key := result + "|" + authType + "|" + reason
|
||||||
|
r.mu.Lock()
|
||||||
|
r.authRequests[key]++
|
||||||
|
r.mu.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *Registry) ObserveService(operation string, d time.Duration, ok bool) {
|
||||||
|
result := "error"
|
||||||
|
if ok {
|
||||||
|
result = "ok"
|
||||||
|
}
|
||||||
|
key := operation + "|" + result
|
||||||
|
r.mu.Lock()
|
||||||
|
r.serviceOps[key]++
|
||||||
|
h := r.serviceDuration[operation]
|
||||||
|
if h == nil {
|
||||||
|
h = newHistogram(defaultBuckets)
|
||||||
|
r.serviceDuration[operation] = h
|
||||||
|
}
|
||||||
|
h.observe(d.Seconds())
|
||||||
|
r.mu.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *Registry) ObserveMetadataTx(txType string, d time.Duration, ok bool) {
|
||||||
|
result := "error"
|
||||||
|
if ok {
|
||||||
|
result = "ok"
|
||||||
|
}
|
||||||
|
key := txType + "|" + result
|
||||||
|
r.mu.Lock()
|
||||||
|
r.dbTxTotal[key]++
|
||||||
|
h := r.dbTxDuration[txType]
|
||||||
|
if h == nil {
|
||||||
|
h = newHistogram(defaultBuckets)
|
||||||
|
r.dbTxDuration[txType] = h
|
||||||
|
}
|
||||||
|
h.observe(d.Seconds())
|
||||||
|
r.mu.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *Registry) ObserveBlob(operation string, d time.Duration, bytes int64, ok bool) {
|
||||||
|
result := "error"
|
||||||
|
if ok {
|
||||||
|
result = "ok"
|
||||||
|
}
|
||||||
|
key := operation + "|" + result
|
||||||
|
r.mu.Lock()
|
||||||
|
r.blobOps[key]++
|
||||||
|
h := r.blobDuration[operation]
|
||||||
|
if h == nil {
|
||||||
|
h = newHistogram(defaultBuckets)
|
||||||
|
r.blobDuration[operation] = h
|
||||||
|
}
|
||||||
|
h.observe(d.Seconds())
|
||||||
|
if bytes > 0 {
|
||||||
|
switch operation {
|
||||||
|
case "read_chunk":
|
||||||
|
r.blobBytes["read"] += uint64(bytes)
|
||||||
|
case "write_chunk":
|
||||||
|
r.blobBytes["write"] += uint64(bytes)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
r.mu.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *Registry) ObserveGC(d time.Duration, deletedChunks, deleteErrors, cleanedUploads int, ok bool) {
|
||||||
|
result := "error"
|
||||||
|
if ok {
|
||||||
|
result = "ok"
|
||||||
|
}
|
||||||
|
r.mu.Lock()
|
||||||
|
r.gcRuns[result]++
|
||||||
|
r.gcDuration.observe(d.Seconds())
|
||||||
|
if deletedChunks > 0 {
|
||||||
|
r.gcDeletedChunks += uint64(deletedChunks)
|
||||||
|
}
|
||||||
|
if deleteErrors > 0 {
|
||||||
|
r.gcDeleteErrors += uint64(deleteErrors)
|
||||||
|
}
|
||||||
|
if cleanedUploads > 0 {
|
||||||
|
r.gcCleanedUpload += uint64(cleanedUploads)
|
||||||
|
}
|
||||||
|
r.mu.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *Registry) RenderPrometheus() string {
|
||||||
|
now := time.Now()
|
||||||
|
var mem runtime.MemStats
|
||||||
|
runtime.ReadMemStats(&mem)
|
||||||
|
|
||||||
|
r.mu.Lock()
|
||||||
|
httpReq := copyCounterMap(r.httpRequests)
|
||||||
|
httpBytes := copyCounterMap(r.httpResponseByte)
|
||||||
|
httpDur := copyHistogramMap(r.httpDuration)
|
||||||
|
authReq := copyCounterMap(r.authRequests)
|
||||||
|
serviceOps := copyCounterMap(r.serviceOps)
|
||||||
|
serviceDur := copyHistogramMap(r.serviceDuration)
|
||||||
|
dbTx := copyCounterMap(r.dbTxTotal)
|
||||||
|
dbTxDur := copyHistogramMap(r.dbTxDuration)
|
||||||
|
blobOps := copyCounterMap(r.blobOps)
|
||||||
|
blobBytes := copyCounterMap(r.blobBytes)
|
||||||
|
blobDur := copyHistogramMap(r.blobDuration)
|
||||||
|
gcRuns := copyCounterMap(r.gcRuns)
|
||||||
|
gcDurBounds, gcDurCounts, gcDurSum, gcDurCount := r.gcDuration.snapshot()
|
||||||
|
gcDeletedChunks := r.gcDeletedChunks
|
||||||
|
gcDeleteErrors := r.gcDeleteErrors
|
||||||
|
gcCleanedUploads := r.gcCleanedUpload
|
||||||
|
r.mu.Unlock()
|
||||||
|
|
||||||
|
var b strings.Builder
|
||||||
|
|
||||||
|
writeGauge(&b, "fs_http_inflight_requests", "Current in-flight HTTP requests.", float64(r.inFlight.Load()))
|
||||||
|
writeCounterVecKV(&b, "fs_http_requests_total", "Total HTTP requests handled.", httpReq, []string{"method", "route", "status"})
|
||||||
|
writeCounterVecKV(&b, "fs_http_response_bytes_total", "Total HTTP response bytes written.", httpBytes, []string{"method", "route", "status"})
|
||||||
|
writeHistogramVecKV(&b, "fs_http_request_duration_seconds", "HTTP request latency.", httpDur, []string{"method", "route"})
|
||||||
|
|
||||||
|
writeCounterVecKV(&b, "fs_auth_requests_total", "Authentication attempts by result.", authReq, []string{"result", "auth_type", "reason"})
|
||||||
|
|
||||||
|
writeCounterVecKV(&b, "fs_service_operations_total", "Service-level operation calls.", serviceOps, []string{"operation", "result"})
|
||||||
|
writeHistogramVecKV(&b, "fs_service_operation_duration_seconds", "Service-level operation latency.", serviceDur, []string{"operation"})
|
||||||
|
|
||||||
|
writeCounterVecKV(&b, "fs_metadata_tx_total", "Metadata transaction calls.", dbTx, []string{"type", "result"})
|
||||||
|
writeHistogramVecKV(&b, "fs_metadata_tx_duration_seconds", "Metadata transaction latency.", dbTxDur, []string{"type"})
|
||||||
|
|
||||||
|
writeCounterVecKV(&b, "fs_blob_operations_total", "Blob store operations.", blobOps, []string{"operation", "result"})
|
||||||
|
writeCounterVecKV(&b, "fs_blob_bytes_total", "Blob bytes processed.", blobBytes, []string{"direction"})
|
||||||
|
writeHistogramVecKV(&b, "fs_blob_operation_duration_seconds", "Blob operation latency.", blobDur, []string{"operation"})
|
||||||
|
|
||||||
|
writeCounterVecKV(&b, "fs_gc_runs_total", "Garbage collection runs.", gcRuns, []string{"result"})
|
||||||
|
writeHistogram(&b, "fs_gc_duration_seconds", "Garbage collection runtime.", nil, gcDurBounds, gcDurCounts, gcDurSum, gcDurCount)
|
||||||
|
writeCounter(&b, "fs_gc_deleted_chunks_total", "Deleted chunks during GC.", gcDeletedChunks)
|
||||||
|
writeCounter(&b, "fs_gc_delete_errors_total", "Chunk delete errors during GC.", gcDeleteErrors)
|
||||||
|
writeCounter(&b, "fs_gc_cleaned_uploads_total", "Cleaned multipart uploads during GC.", gcCleanedUploads)
|
||||||
|
|
||||||
|
writeGauge(&b, "fs_uptime_seconds", "Process uptime in seconds.", now.Sub(r.startedAt).Seconds())
|
||||||
|
writeGauge(&b, "fs_runtime_goroutines", "Number of goroutines.", float64(runtime.NumGoroutine()))
|
||||||
|
writeGaugeVec(&b, "fs_runtime_memory_bytes", "Runtime memory in bytes.", map[string]float64{
|
||||||
|
"alloc": float64(mem.Alloc),
|
||||||
|
"total": float64(mem.TotalAlloc),
|
||||||
|
"sys": float64(mem.Sys),
|
||||||
|
"heap_alloc": float64(mem.HeapAlloc),
|
||||||
|
"heap_sys": float64(mem.HeapSys),
|
||||||
|
"stack_sys": float64(mem.StackSys),
|
||||||
|
}, "type")
|
||||||
|
writeCounter(&b, "fs_runtime_gc_cycles_total", "Completed GC cycles.", uint64(mem.NumGC))
|
||||||
|
writeCounterFloat(&b, "fs_runtime_gc_pause_seconds_total", "Total GC pause time in seconds.", float64(mem.PauseTotalNs)/1e9)
|
||||||
|
|
||||||
|
return b.String()
|
||||||
|
}
|
||||||
|
|
||||||
|
func normalizeRoute(route string) string {
|
||||||
|
route = strings.TrimSpace(route)
|
||||||
|
if route == "" {
|
||||||
|
return "/unknown"
|
||||||
|
}
|
||||||
|
return route
|
||||||
|
}
|
||||||
|
|
||||||
|
type histogramSnapshot struct {
|
||||||
|
bounds []float64
|
||||||
|
counts []uint64
|
||||||
|
sum float64
|
||||||
|
count uint64
|
||||||
|
}
|
||||||
|
|
||||||
|
func copyCounterMap(src map[string]uint64) map[string]uint64 {
|
||||||
|
out := make(map[string]uint64, len(src))
|
||||||
|
for k, v := range src {
|
||||||
|
out[k] = v
|
||||||
|
}
|
||||||
|
return out
|
||||||
|
}
|
||||||
|
|
||||||
|
func copyHistogramMap(src map[string]*histogram) map[string]histogramSnapshot {
|
||||||
|
out := make(map[string]histogramSnapshot, len(src))
|
||||||
|
for k, h := range src {
|
||||||
|
bounds, counts, sum, count := h.snapshot()
|
||||||
|
out[k] = histogramSnapshot{
|
||||||
|
bounds: bounds,
|
||||||
|
counts: counts,
|
||||||
|
sum: sum,
|
||||||
|
count: count,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return out
|
||||||
|
}
|
||||||
|
|
||||||
|
func writeCounter(b *strings.Builder, name, help string, value uint64) {
|
||||||
|
fmt.Fprintf(b, "# HELP %s %s\n", name, help)
|
||||||
|
fmt.Fprintf(b, "# TYPE %s counter\n", name)
|
||||||
|
fmt.Fprintf(b, "%s %d\n", name, value)
|
||||||
|
}
|
||||||
|
|
||||||
|
func writeCounterFloat(b *strings.Builder, name, help string, value float64) {
|
||||||
|
fmt.Fprintf(b, "# HELP %s %s\n", name, help)
|
||||||
|
fmt.Fprintf(b, "# TYPE %s counter\n", name)
|
||||||
|
fmt.Fprintf(b, "%s %.9f\n", name, value)
|
||||||
|
}
|
||||||
|
|
||||||
|
func writeGauge(b *strings.Builder, name, help string, value float64) {
|
||||||
|
fmt.Fprintf(b, "# HELP %s %s\n", name, help)
|
||||||
|
fmt.Fprintf(b, "# TYPE %s gauge\n", name)
|
||||||
|
fmt.Fprintf(b, "%s %.9f\n", name, value)
|
||||||
|
}
|
||||||
|
|
||||||
|
func writeGaugeVec(b *strings.Builder, name, help string, values map[string]float64, labelName string) {
|
||||||
|
fmt.Fprintf(b, "# HELP %s %s\n", name, help)
|
||||||
|
fmt.Fprintf(b, "# TYPE %s gauge\n", name)
|
||||||
|
keys := make([]string, 0, len(values))
|
||||||
|
for k := range values {
|
||||||
|
keys = append(keys, k)
|
||||||
|
}
|
||||||
|
sort.Strings(keys)
|
||||||
|
for _, key := range keys {
|
||||||
|
fmt.Fprintf(b, "%s{%s=\"%s\"} %.9f\n", name, labelName, escapeLabelValue(key), values[key])
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func writeCounterVecKV(b *strings.Builder, name, help string, values map[string]uint64, labels []string) {
|
||||||
|
fmt.Fprintf(b, "# HELP %s %s\n", name, help)
|
||||||
|
fmt.Fprintf(b, "# TYPE %s counter\n", name)
|
||||||
|
keys := make([]string, 0, len(values))
|
||||||
|
for k := range values {
|
||||||
|
keys = append(keys, k)
|
||||||
|
}
|
||||||
|
sort.Strings(keys)
|
||||||
|
for _, key := range keys {
|
||||||
|
parts := strings.Split(key, "|")
|
||||||
|
fmt.Fprintf(b, "%s{%s} %d\n", name, formatLabels(labels, parts), values[key])
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func writeHistogramVecKV(b *strings.Builder, name, help string, values map[string]histogramSnapshot, labels []string) {
|
||||||
|
fmt.Fprintf(b, "# HELP %s %s\n", name, help)
|
||||||
|
fmt.Fprintf(b, "# TYPE %s histogram\n", name)
|
||||||
|
keys := make([]string, 0, len(values))
|
||||||
|
for k := range values {
|
||||||
|
keys = append(keys, k)
|
||||||
|
}
|
||||||
|
sort.Strings(keys)
|
||||||
|
for _, key := range keys {
|
||||||
|
parts := strings.Split(key, "|")
|
||||||
|
labelsMap := make(map[string]string, len(labels))
|
||||||
|
for i, label := range labels {
|
||||||
|
if i < len(parts) {
|
||||||
|
labelsMap[label] = parts[i]
|
||||||
|
} else {
|
||||||
|
labelsMap[label] = ""
|
||||||
|
}
|
||||||
|
}
|
||||||
|
writeHistogramWithLabelsMap(b, name, labelsMap, values[key])
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func writeHistogram(b *strings.Builder, name, help string, labels map[string]string, bounds []float64, counts []uint64, sum float64, count uint64) {
|
||||||
|
fmt.Fprintf(b, "# HELP %s %s\n", name, help)
|
||||||
|
fmt.Fprintf(b, "# TYPE %s histogram\n", name)
|
||||||
|
writeHistogramWithLabelsMap(b, name, labels, histogramSnapshot{
|
||||||
|
bounds: bounds,
|
||||||
|
counts: counts,
|
||||||
|
sum: sum,
|
||||||
|
count: count,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func writeHistogramWithLabelsMap(b *strings.Builder, name string, labels map[string]string, s histogramSnapshot) {
|
||||||
|
var cumulative uint64
|
||||||
|
for i, bucketCount := range s.counts {
|
||||||
|
cumulative += bucketCount
|
||||||
|
bucketLabels := cloneLabels(labels)
|
||||||
|
if i < len(s.bounds) {
|
||||||
|
bucketLabels["le"] = trimFloat(s.bounds[i])
|
||||||
|
} else {
|
||||||
|
bucketLabels["le"] = "+Inf"
|
||||||
|
}
|
||||||
|
fmt.Fprintf(b, "%s_bucket{%s} %d\n", name, labelsToString(bucketLabels), cumulative)
|
||||||
|
}
|
||||||
|
fmt.Fprintf(b, "%s_sum{%s} %.9f\n", name, labelsToString(labels), s.sum)
|
||||||
|
fmt.Fprintf(b, "%s_count{%s} %d\n", name, labelsToString(labels), s.count)
|
||||||
|
}
|
||||||
|
|
||||||
|
func formatLabels(keys, values []string) string {
|
||||||
|
parts := make([]string, 0, len(keys))
|
||||||
|
for i, key := range keys {
|
||||||
|
value := ""
|
||||||
|
if i < len(values) {
|
||||||
|
value = values[i]
|
||||||
|
}
|
||||||
|
parts = append(parts, fmt.Sprintf("%s=\"%s\"", key, escapeLabelValue(value)))
|
||||||
|
}
|
||||||
|
return strings.Join(parts, ",")
|
||||||
|
}
|
||||||
|
|
||||||
|
func labelsToString(labels map[string]string) string {
|
||||||
|
if len(labels) == 0 {
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
keys := make([]string, 0, len(labels))
|
||||||
|
for k := range labels {
|
||||||
|
keys = append(keys, k)
|
||||||
|
}
|
||||||
|
sort.Strings(keys)
|
||||||
|
parts := make([]string, 0, len(keys))
|
||||||
|
for _, key := range keys {
|
||||||
|
parts = append(parts, fmt.Sprintf("%s=\"%s\"", key, escapeLabelValue(labels[key])))
|
||||||
|
}
|
||||||
|
return strings.Join(parts, ",")
|
||||||
|
}
|
||||||
|
|
||||||
|
func cloneLabels(in map[string]string) map[string]string {
|
||||||
|
if len(in) == 0 {
|
||||||
|
return map[string]string{}
|
||||||
|
}
|
||||||
|
out := make(map[string]string, len(in)+1)
|
||||||
|
for k, v := range in {
|
||||||
|
out[k] = v
|
||||||
|
}
|
||||||
|
return out
|
||||||
|
}
|
||||||
|
|
||||||
|
func trimFloat(v float64) string {
|
||||||
|
return strconv.FormatFloat(v, 'f', -1, 64)
|
||||||
|
}
|
||||||
|
|
||||||
|
func escapeLabelValue(value string) string {
|
||||||
|
value = strings.ReplaceAll(value, `\`, `\\`)
|
||||||
|
value = strings.ReplaceAll(value, "\n", `\n`)
|
||||||
|
value = strings.ReplaceAll(value, `"`, `\"`)
|
||||||
|
return value
|
||||||
|
}
|
||||||
@@ -7,6 +7,7 @@ import (
|
|||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"fs/metadata"
|
"fs/metadata"
|
||||||
|
"fs/metrics"
|
||||||
"fs/models"
|
"fs/models"
|
||||||
"fs/storage"
|
"fs/storage"
|
||||||
"io"
|
"io"
|
||||||
@@ -42,6 +43,12 @@ func NewObjectService(metadataHandler *metadata.MetadataHandler, blobHandler *st
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *ObjectService) PutObject(bucket, key, contentType string, input io.Reader) (*models.ObjectManifest, error) {
|
func (s *ObjectService) PutObject(bucket, key, contentType string, input io.Reader) (*models.ObjectManifest, error) {
|
||||||
|
start := time.Now()
|
||||||
|
success := false
|
||||||
|
defer func() {
|
||||||
|
metrics.Default.ObserveService("put_object", time.Since(start), success)
|
||||||
|
}()
|
||||||
|
|
||||||
s.gcMu.RLock()
|
s.gcMu.RLock()
|
||||||
defer s.gcMu.RUnlock()
|
defer s.gcMu.RUnlock()
|
||||||
|
|
||||||
@@ -71,10 +78,17 @@ func (s *ObjectService) PutObject(bucket, key, contentType string, input io.Read
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
success = true
|
||||||
return manifest, nil
|
return manifest, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *ObjectService) GetObject(bucket, key string) (io.ReadCloser, *models.ObjectManifest, error) {
|
func (s *ObjectService) GetObject(bucket, key string) (io.ReadCloser, *models.ObjectManifest, error) {
|
||||||
|
start := time.Now()
|
||||||
|
success := false
|
||||||
|
defer func() {
|
||||||
|
metrics.Default.ObserveService("get_object", time.Since(start), success)
|
||||||
|
}()
|
||||||
|
|
||||||
s.gcMu.RLock()
|
s.gcMu.RLock()
|
||||||
|
|
||||||
manifest, err := s.metadata.GetManifest(bucket, key)
|
manifest, err := s.metadata.GetManifest(bucket, key)
|
||||||
@@ -92,10 +106,17 @@ func (s *ObjectService) GetObject(bucket, key string) (io.ReadCloser, *models.Ob
|
|||||||
}
|
}
|
||||||
_ = pw.Close()
|
_ = pw.Close()
|
||||||
}()
|
}()
|
||||||
|
success = true
|
||||||
return pr, manifest, nil
|
return pr, manifest, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *ObjectService) HeadObject(bucket, key string) (models.ObjectManifest, error) {
|
func (s *ObjectService) HeadObject(bucket, key string) (models.ObjectManifest, error) {
|
||||||
|
start := time.Now()
|
||||||
|
success := false
|
||||||
|
defer func() {
|
||||||
|
metrics.Default.ObserveService("head_object", time.Since(start), success)
|
||||||
|
}()
|
||||||
|
|
||||||
s.gcMu.RLock()
|
s.gcMu.RLock()
|
||||||
defer s.gcMu.RUnlock()
|
defer s.gcMu.RUnlock()
|
||||||
|
|
||||||
@@ -103,13 +124,22 @@ func (s *ObjectService) HeadObject(bucket, key string) (models.ObjectManifest, e
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return models.ObjectManifest{}, err
|
return models.ObjectManifest{}, err
|
||||||
}
|
}
|
||||||
|
success = true
|
||||||
return *manifest, nil
|
return *manifest, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *ObjectService) DeleteObject(bucket, key string) error {
|
func (s *ObjectService) DeleteObject(bucket, key string) error {
|
||||||
|
start := time.Now()
|
||||||
|
success := false
|
||||||
|
defer func() {
|
||||||
|
metrics.Default.ObserveService("delete_object", time.Since(start), success)
|
||||||
|
}()
|
||||||
|
|
||||||
s.gcMu.RLock()
|
s.gcMu.RLock()
|
||||||
defer s.gcMu.RUnlock()
|
defer s.gcMu.RUnlock()
|
||||||
return s.metadata.DeleteManifest(bucket, key)
|
err := s.metadata.DeleteManifest(bucket, key)
|
||||||
|
success = err == nil
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *ObjectService) ListObjects(bucket, prefix string) ([]*models.ObjectManifest, error) {
|
func (s *ObjectService) ListObjects(bucket, prefix string) ([]*models.ObjectManifest, error) {
|
||||||
@@ -120,16 +150,32 @@ func (s *ObjectService) ListObjects(bucket, prefix string) ([]*models.ObjectMani
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *ObjectService) ForEachObjectFrom(bucket, startKey string, fn func(*models.ObjectManifest) error) error {
|
func (s *ObjectService) ForEachObjectFrom(bucket, startKey string, fn func(*models.ObjectManifest) error) error {
|
||||||
|
start := time.Now()
|
||||||
|
success := false
|
||||||
|
defer func() {
|
||||||
|
metrics.Default.ObserveService("for_each_object_from", time.Since(start), success)
|
||||||
|
}()
|
||||||
|
|
||||||
s.gcMu.RLock()
|
s.gcMu.RLock()
|
||||||
defer s.gcMu.RUnlock()
|
defer s.gcMu.RUnlock()
|
||||||
|
|
||||||
return s.metadata.ForEachObjectFrom(bucket, startKey, fn)
|
err := s.metadata.ForEachObjectFrom(bucket, startKey, fn)
|
||||||
|
success = err == nil
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *ObjectService) CreateBucket(bucket string) error {
|
func (s *ObjectService) CreateBucket(bucket string) error {
|
||||||
|
start := time.Now()
|
||||||
|
success := false
|
||||||
|
defer func() {
|
||||||
|
metrics.Default.ObserveService("create_bucket", time.Since(start), success)
|
||||||
|
}()
|
||||||
|
|
||||||
s.gcMu.RLock()
|
s.gcMu.RLock()
|
||||||
defer s.gcMu.RUnlock()
|
defer s.gcMu.RUnlock()
|
||||||
return s.metadata.CreateBucket(bucket)
|
err := s.metadata.CreateBucket(bucket)
|
||||||
|
success = err == nil
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *ObjectService) HeadBucket(bucket string) error {
|
func (s *ObjectService) HeadBucket(bucket string) error {
|
||||||
@@ -154,10 +200,18 @@ func (s *ObjectService) DeleteBucket(bucket string) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *ObjectService) ListBuckets() ([]string, error) {
|
func (s *ObjectService) ListBuckets() ([]string, error) {
|
||||||
|
start := time.Now()
|
||||||
|
success := false
|
||||||
|
defer func() {
|
||||||
|
metrics.Default.ObserveService("list_buckets", time.Since(start), success)
|
||||||
|
}()
|
||||||
|
|
||||||
s.gcMu.RLock()
|
s.gcMu.RLock()
|
||||||
defer s.gcMu.RUnlock()
|
defer s.gcMu.RUnlock()
|
||||||
|
|
||||||
return s.metadata.ListBuckets()
|
buckets, err := s.metadata.ListBuckets()
|
||||||
|
success = err == nil
|
||||||
|
return buckets, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *ObjectService) DeleteObjects(bucket string, keys []string) ([]string, error) {
|
func (s *ObjectService) DeleteObjects(bucket string, keys []string) ([]string, error) {
|
||||||
@@ -173,6 +227,12 @@ func (s *ObjectService) CreateMultipartUpload(bucket, key string) (*models.Multi
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *ObjectService) UploadPart(bucket, key, uploadId string, partNumber int, input io.Reader) (string, error) {
|
func (s *ObjectService) UploadPart(bucket, key, uploadId string, partNumber int, input io.Reader) (string, error) {
|
||||||
|
start := time.Now()
|
||||||
|
success := false
|
||||||
|
defer func() {
|
||||||
|
metrics.Default.ObserveService("upload_part", time.Since(start), success)
|
||||||
|
}()
|
||||||
|
|
||||||
s.gcMu.RLock()
|
s.gcMu.RLock()
|
||||||
defer s.gcMu.RUnlock()
|
defer s.gcMu.RUnlock()
|
||||||
|
|
||||||
@@ -204,6 +264,7 @@ func (s *ObjectService) UploadPart(bucket, key, uploadId string, partNumber int,
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
|
success = true
|
||||||
return etag, nil
|
return etag, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -222,6 +283,12 @@ func (s *ObjectService) ListMultipartParts(bucket, key, uploadID string) ([]mode
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *ObjectService) CompleteMultipartUpload(bucket, key, uploadID string, completed []models.CompletedPart) (*models.ObjectManifest, error) {
|
func (s *ObjectService) CompleteMultipartUpload(bucket, key, uploadID string, completed []models.CompletedPart) (*models.ObjectManifest, error) {
|
||||||
|
start := time.Now()
|
||||||
|
success := false
|
||||||
|
defer func() {
|
||||||
|
metrics.Default.ObserveService("complete_multipart_upload", time.Since(start), success)
|
||||||
|
}()
|
||||||
|
|
||||||
s.gcMu.RLock()
|
s.gcMu.RLock()
|
||||||
defer s.gcMu.RUnlock()
|
defer s.gcMu.RUnlock()
|
||||||
|
|
||||||
@@ -288,6 +355,7 @@ func (s *ObjectService) CompleteMultipartUpload(bucket, key, uploadID string, co
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
success = true
|
||||||
return manifest, nil
|
return manifest, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -327,6 +395,15 @@ func (s *ObjectService) Close() error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *ObjectService) GarbageCollect() error {
|
func (s *ObjectService) GarbageCollect() error {
|
||||||
|
start := time.Now()
|
||||||
|
success := false
|
||||||
|
deletedChunks := 0
|
||||||
|
deleteErrors := 0
|
||||||
|
cleanedUploads := 0
|
||||||
|
defer func() {
|
||||||
|
metrics.Default.ObserveGC(time.Since(start), deletedChunks, deleteErrors, cleanedUploads, success)
|
||||||
|
}()
|
||||||
|
|
||||||
s.gcMu.Lock()
|
s.gcMu.Lock()
|
||||||
defer s.gcMu.Unlock()
|
defer s.gcMu.Unlock()
|
||||||
|
|
||||||
@@ -336,9 +413,6 @@ func (s *ObjectService) GarbageCollect() error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
totalChunks := 0
|
totalChunks := 0
|
||||||
deletedChunks := 0
|
|
||||||
deleteErrors := 0
|
|
||||||
cleanedUploads := 0
|
|
||||||
|
|
||||||
if err := s.blob.ForEachChunk(func(chunkID string) error {
|
if err := s.blob.ForEachChunk(func(chunkID string) error {
|
||||||
totalChunks++
|
totalChunks++
|
||||||
@@ -368,6 +442,7 @@ func (s *ObjectService) GarbageCollect() error {
|
|||||||
"delete_errors", deleteErrors,
|
"delete_errors", deleteErrors,
|
||||||
"cleaned_uploads", cleanedUploads,
|
"cleaned_uploads", cleanedUploads,
|
||||||
)
|
)
|
||||||
|
success = true
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -6,10 +6,12 @@ import (
|
|||||||
"encoding/hex"
|
"encoding/hex"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"fs/metrics"
|
||||||
"io"
|
"io"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"strings"
|
"strings"
|
||||||
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
const blobRoot = "blobs"
|
const blobRoot = "blobs"
|
||||||
@@ -37,11 +39,16 @@ func NewBlobStore(root string, chunkSize int) (*BlobStore, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (bs *BlobStore) IngestStream(stream io.Reader) ([]string, int64, string, error) {
|
func (bs *BlobStore) IngestStream(stream io.Reader) ([]string, int64, string, error) {
|
||||||
|
start := time.Now()
|
||||||
fullFileHasher := md5.New()
|
fullFileHasher := md5.New()
|
||||||
|
|
||||||
buffer := make([]byte, bs.chunkSize)
|
buffer := make([]byte, bs.chunkSize)
|
||||||
var totalSize int64
|
var totalSize int64
|
||||||
var chunkIDs []string
|
var chunkIDs []string
|
||||||
|
success := false
|
||||||
|
defer func() {
|
||||||
|
metrics.Default.ObserveBlob("ingest_stream", time.Since(start), 0, success)
|
||||||
|
}()
|
||||||
|
|
||||||
for {
|
for {
|
||||||
bytesRead, err := io.ReadFull(stream, buffer)
|
bytesRead, err := io.ReadFull(stream, buffer)
|
||||||
@@ -74,10 +81,17 @@ func (bs *BlobStore) IngestStream(stream io.Reader) ([]string, int64, string, er
|
|||||||
}
|
}
|
||||||
|
|
||||||
etag := hex.EncodeToString(fullFileHasher.Sum(nil))
|
etag := hex.EncodeToString(fullFileHasher.Sum(nil))
|
||||||
|
success = true
|
||||||
return chunkIDs, totalSize, etag, nil
|
return chunkIDs, totalSize, etag, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (bs *BlobStore) saveBlob(chunkID string, data []byte) error {
|
func (bs *BlobStore) saveBlob(chunkID string, data []byte) error {
|
||||||
|
start := time.Now()
|
||||||
|
success := false
|
||||||
|
defer func() {
|
||||||
|
metrics.Default.ObserveBlob("write_chunk", time.Since(start), int64(len(data)), success)
|
||||||
|
}()
|
||||||
|
|
||||||
if !isValidChunkID(chunkID) {
|
if !isValidChunkID(chunkID) {
|
||||||
return fmt.Errorf("invalid chunk id: %q", chunkID)
|
return fmt.Errorf("invalid chunk id: %q", chunkID)
|
||||||
}
|
}
|
||||||
@@ -88,6 +102,7 @@ func (bs *BlobStore) saveBlob(chunkID string, data []byte) error {
|
|||||||
|
|
||||||
fullPath := filepath.Join(dir, chunkID)
|
fullPath := filepath.Join(dir, chunkID)
|
||||||
if _, err := os.Stat(fullPath); err == nil {
|
if _, err := os.Stat(fullPath); err == nil {
|
||||||
|
success = true
|
||||||
return nil
|
return nil
|
||||||
} else if !os.IsNotExist(err) {
|
} else if !os.IsNotExist(err) {
|
||||||
return err
|
return err
|
||||||
@@ -119,6 +134,7 @@ func (bs *BlobStore) saveBlob(chunkID string, data []byte) error {
|
|||||||
|
|
||||||
if err := os.Rename(tmpPath, fullPath); err != nil {
|
if err := os.Rename(tmpPath, fullPath); err != nil {
|
||||||
if _, statErr := os.Stat(fullPath); statErr == nil {
|
if _, statErr := os.Stat(fullPath); statErr == nil {
|
||||||
|
success = true
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
@@ -128,10 +144,17 @@ func (bs *BlobStore) saveBlob(chunkID string, data []byte) error {
|
|||||||
if err := syncDir(dir); err != nil {
|
if err := syncDir(dir); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
success = true
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (bs *BlobStore) AssembleStream(chunkIDs []string, w *io.PipeWriter) error {
|
func (bs *BlobStore) AssembleStream(chunkIDs []string, w *io.PipeWriter) error {
|
||||||
|
start := time.Now()
|
||||||
|
success := false
|
||||||
|
defer func() {
|
||||||
|
metrics.Default.ObserveBlob("assemble_stream", time.Since(start), 0, success)
|
||||||
|
}()
|
||||||
|
|
||||||
for _, chunkID := range chunkIDs {
|
for _, chunkID := range chunkIDs {
|
||||||
chunkData, err := bs.GetBlob(chunkID)
|
chunkData, err := bs.GetBlob(chunkID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -141,14 +164,28 @@ func (bs *BlobStore) AssembleStream(chunkIDs []string, w *io.PipeWriter) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
success = true
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (bs *BlobStore) GetBlob(chunkID string) ([]byte, error) {
|
func (bs *BlobStore) GetBlob(chunkID string) ([]byte, error) {
|
||||||
|
start := time.Now()
|
||||||
|
success := false
|
||||||
|
var size int64
|
||||||
|
defer func() {
|
||||||
|
metrics.Default.ObserveBlob("read_chunk", time.Since(start), size, success)
|
||||||
|
}()
|
||||||
|
|
||||||
if !isValidChunkID(chunkID) {
|
if !isValidChunkID(chunkID) {
|
||||||
return nil, fmt.Errorf("invalid chunk id: %q", chunkID)
|
return nil, fmt.Errorf("invalid chunk id: %q", chunkID)
|
||||||
}
|
}
|
||||||
return os.ReadFile(filepath.Join(bs.dataRoot, blobRoot, chunkID[:2], chunkID[2:4], chunkID))
|
data, err := os.ReadFile(filepath.Join(bs.dataRoot, blobRoot, chunkID[:2], chunkID[2:4], chunkID))
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
size = int64(len(data))
|
||||||
|
success = true
|
||||||
|
return data, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (bs *BlobStore) DeleteBlob(chunkID string) error {
|
func (bs *BlobStore) DeleteBlob(chunkID string) error {
|
||||||
|
|||||||
Reference in New Issue
Block a user