niki/vendor/github.com/redis/go-redis/v9/internal/otel/metrics.go

280 lines
13 KiB
Go

package otel
import (
"context"
"crypto/rand"
"encoding/hex"
"sync"
"time"
"github.com/redis/go-redis/v9/internal/pool"
)
// generateUniqueID generates a short unique identifier for pool names.
func generateUniqueID() string {
b := make([]byte, 4)
if _, err := rand.Read(b); err != nil {
return ""
}
return hex.EncodeToString(b)
}
// Cmder is a minimal interface for command information needed for metrics.
// This avoids circular dependencies with the main redis package.
type Cmder interface {
Name() string
FullName() string
Args() []interface{}
Err() error
}
// Recorder is the interface for recording metrics.
type Recorder interface {
// RecordOperationDuration records the total operation duration (including all retries)
// dbIndex is the Redis database index (0-15)
RecordOperationDuration(ctx context.Context, duration time.Duration, cmd Cmder, attempts int, err error, cn *pool.Conn, dbIndex int)
// RecordPipelineOperationDuration records the total pipeline/transaction duration.
// operationName should be "PIPELINE" for regular pipelines or "MULTI" for transactions.
// cmdCount is the number of commands in the pipeline.
// err is the error from the pipeline execution (can be nil).
// dbIndex is the Redis database index (0-15)
RecordPipelineOperationDuration(ctx context.Context, duration time.Duration, operationName string, cmdCount int, attempts int, err error, cn *pool.Conn, dbIndex int)
// RecordConnectionCreateTime records the time it took to create a new connection
RecordConnectionCreateTime(ctx context.Context, duration time.Duration, cn *pool.Conn)
// RecordConnectionRelaxedTimeout records when connection timeout is relaxed/unrelaxed
// delta: +1 for relaxed, -1 for unrelaxed
// poolName: name of the connection pool (e.g., "main", "pubsub")
// notificationType: the notification type that triggered the timeout relaxation (e.g., "MOVING")
RecordConnectionRelaxedTimeout(ctx context.Context, delta int, cn *pool.Conn, poolName, notificationType string)
// RecordConnectionHandoff records when a connection is handed off to another node
// poolName: name of the connection pool (e.g., "main", "pubsub")
RecordConnectionHandoff(ctx context.Context, cn *pool.Conn, poolName string)
// RecordError records client errors (ASK, MOVED, handshake failures, etc.)
// errorType: type of error (e.g., "ASK", "MOVED", "HANDSHAKE_FAILED")
// statusCode: Redis response status code if available (e.g., "MOVED", "ASK")
// isInternal: whether this is an internal error
// retryAttempts: number of retry attempts made
RecordError(ctx context.Context, errorType string, cn *pool.Conn, statusCode string, isInternal bool, retryAttempts int)
// RecordMaintenanceNotification records when a maintenance notification is received
// notificationType: the type of notification (e.g., "MOVING", "MIGRATING", etc.)
RecordMaintenanceNotification(ctx context.Context, cn *pool.Conn, notificationType string)
// RecordConnectionWaitTime records the time spent waiting for a connection from the pool
RecordConnectionWaitTime(ctx context.Context, duration time.Duration, cn *pool.Conn)
// RecordConnectionClosed records when a connection is closed
// reason: reason for closing (e.g., "idle", "max_lifetime", "error", "pool_closed")
// err: the error that caused the close (nil for non-error closures)
RecordConnectionClosed(ctx context.Context, cn *pool.Conn, reason string, err error)
// RecordPubSubMessage records a Pub/Sub message
// direction: "sent" or "received"
// channel: channel name (may be hidden for cardinality reduction)
// sharded: true for sharded pub/sub (SPUBLISH/SSUBSCRIBE)
RecordPubSubMessage(ctx context.Context, cn *pool.Conn, direction, channel string, sharded bool)
// RecordStreamLag records the lag for stream consumer group processing
// lag: time difference between message creation and consumption
// streamName: name of the stream (may be hidden for cardinality reduction)
// consumerGroup: name of the consumer group
// consumerName: name of the consumer
RecordStreamLag(ctx context.Context, lag time.Duration, cn *pool.Conn, streamName, consumerGroup, consumerName string)
}
type PubSubPooler interface {
Stats() *pool.PubSubStats
}
type PoolRegistrar interface {
// RegisterPool is called when a new client is created with its connection pools.
// poolName: identifier for the pool (e.g., "main_abc123")
// pool: the connection pool
RegisterPool(poolName string, pool pool.Pooler)
// UnregisterPool is called when a client is closed to remove its pool from the registry.
// pool: the connection pool to unregister
UnregisterPool(pool pool.Pooler)
// RegisterPubSubPool is called when a new client is created with a PubSub pool.
// poolName: identifier for the pool (e.g., "main_abc123_pubsub")
// pool: the PubSub connection pool
RegisterPubSubPool(poolName string, pool PubSubPooler)
// UnregisterPubSubPool is called when a PubSub client is closed to remove its pool.
// pool: the PubSub connection pool to unregister
UnregisterPubSubPool(pool PubSubPooler)
}
var (
// recorderMu protects globalRecorder and operation duration callbacks
recorderMu sync.RWMutex
// Global recorder instance (initialized by extra/redisotel-native)
globalRecorder Recorder = noopRecorder{}
// Callbacks for operation duration metrics
operationDurationCallback func(ctx context.Context, duration time.Duration, cmd Cmder, attempts int, err error, cn *pool.Conn, dbIndex int)
pipelineOperationDurationCallback func(ctx context.Context, duration time.Duration, operationName string, cmdCount int, attempts int, err error, cn *pool.Conn, dbIndex int)
)
// GetOperationDurationCallback returns the callback for operation duration.
func GetOperationDurationCallback() func(ctx context.Context, duration time.Duration, cmd Cmder, attempts int, err error, cn *pool.Conn, dbIndex int) {
recorderMu.RLock()
cb := operationDurationCallback
recorderMu.RUnlock()
return cb
}
// GetPipelineOperationDurationCallback returns the callback for pipeline operation duration.
func GetPipelineOperationDurationCallback() func(ctx context.Context, duration time.Duration, operationName string, cmdCount int, attempts int, err error, cn *pool.Conn, dbIndex int) {
recorderMu.RLock()
cb := pipelineOperationDurationCallback
recorderMu.RUnlock()
return cb
}
// getRecorder returns the current global recorder under a read lock.
func getRecorder() Recorder {
recorderMu.RLock()
r := globalRecorder
recorderMu.RUnlock()
return r
}
// SetGlobalRecorder sets the global recorder (called by Init() in extra/redisotel-native)
func SetGlobalRecorder(r Recorder) {
recorderMu.Lock()
if r == nil {
globalRecorder = noopRecorder{}
operationDurationCallback = nil
pipelineOperationDurationCallback = nil
recorderMu.Unlock()
// Unregister all pool metric callbacks atomically
pool.SetAllMetricCallbacks(nil)
return
}
globalRecorder = r
// Register operation duration callbacks
// These capture r directly since we want them to use the specific recorder
// that was set at this point in time
operationDurationCallback = func(ctx context.Context, duration time.Duration, cmd Cmder, attempts int, err error, cn *pool.Conn, dbIndex int) {
getRecorder().RecordOperationDuration(ctx, duration, cmd, attempts, err, cn, dbIndex)
}
pipelineOperationDurationCallback = func(ctx context.Context, duration time.Duration, operationName string, cmdCount int, attempts int, err error, cn *pool.Conn, dbIndex int) {
getRecorder().RecordPipelineOperationDuration(ctx, duration, operationName, cmdCount, attempts, err, cn, dbIndex)
}
recorderMu.Unlock()
// Register all pool metric callbacks atomically
// These use getRecorder() to safely access the current recorder
pool.SetAllMetricCallbacks(&pool.MetricCallbacks{
ConnectionCreateTime: func(ctx context.Context, duration time.Duration, cn *pool.Conn) {
getRecorder().RecordConnectionCreateTime(ctx, duration, cn)
},
ConnectionRelaxedTimeout: func(ctx context.Context, delta int, cn *pool.Conn, poolName, notificationType string) {
getRecorder().RecordConnectionRelaxedTimeout(ctx, delta, cn, poolName, notificationType)
},
ConnectionHandoff: func(ctx context.Context, cn *pool.Conn, poolName string) {
getRecorder().RecordConnectionHandoff(ctx, cn, poolName)
},
Error: func(ctx context.Context, errorType string, cn *pool.Conn, statusCode string, isInternal bool, retryAttempts int) {
getRecorder().RecordError(ctx, errorType, cn, statusCode, isInternal, retryAttempts)
},
MaintenanceNotification: func(ctx context.Context, cn *pool.Conn, notificationType string) {
getRecorder().RecordMaintenanceNotification(ctx, cn, notificationType)
},
ConnectionWaitTime: func(ctx context.Context, duration time.Duration, cn *pool.Conn) {
getRecorder().RecordConnectionWaitTime(ctx, duration, cn)
},
ConnectionClosed: func(ctx context.Context, cn *pool.Conn, reason string, err error) {
getRecorder().RecordConnectionClosed(ctx, cn, reason, err)
},
})
}
// RecordOperationDuration records the total operation duration.
// dbIndex is the Redis database index (0-15).
func RecordOperationDuration(ctx context.Context, duration time.Duration, cmd Cmder, attempts int, err error, cn *pool.Conn, dbIndex int) {
getRecorder().RecordOperationDuration(ctx, duration, cmd, attempts, err, cn, dbIndex)
}
// RecordPipelineOperationDuration records the total pipeline/transaction duration.
// This is called from redis.go after pipeline/transaction execution completes.
// operationName should be "PIPELINE" for regular pipelines or "MULTI" for transactions.
// err is the error from the pipeline execution (can be nil).
// dbIndex is the Redis database index (0-15).
func RecordPipelineOperationDuration(ctx context.Context, duration time.Duration, operationName string, cmdCount int, attempts int, err error, cn *pool.Conn, dbIndex int) {
getRecorder().RecordPipelineOperationDuration(ctx, duration, operationName, cmdCount, attempts, err, cn, dbIndex)
}
// RecordConnectionCreateTime records the time it took to create a new connection.
func RecordConnectionCreateTime(ctx context.Context, duration time.Duration, cn *pool.Conn) {
getRecorder().RecordConnectionCreateTime(ctx, duration, cn)
}
// RecordPubSubMessage records a Pub/Sub message sent or received.
func RecordPubSubMessage(ctx context.Context, cn *pool.Conn, direction, channel string, sharded bool) {
getRecorder().RecordPubSubMessage(ctx, cn, direction, channel, sharded)
}
// RecordStreamLag records the lag between message creation and consumption in a stream.
func RecordStreamLag(ctx context.Context, lag time.Duration, cn *pool.Conn, streamName, consumerGroup, consumerName string) {
getRecorder().RecordStreamLag(ctx, lag, cn, streamName, consumerGroup, consumerName)
}
type noopRecorder struct{}
func (noopRecorder) RecordOperationDuration(context.Context, time.Duration, Cmder, int, error, *pool.Conn, int) {
}
func (noopRecorder) RecordPipelineOperationDuration(context.Context, time.Duration, string, int, int, error, *pool.Conn, int) {
}
func (noopRecorder) RecordConnectionCreateTime(context.Context, time.Duration, *pool.Conn) {}
func (noopRecorder) RecordConnectionRelaxedTimeout(context.Context, int, *pool.Conn, string, string) {
}
func (noopRecorder) RecordConnectionHandoff(context.Context, *pool.Conn, string) {}
func (noopRecorder) RecordError(context.Context, string, *pool.Conn, string, bool, int) {}
func (noopRecorder) RecordMaintenanceNotification(context.Context, *pool.Conn, string) {}
func (noopRecorder) RecordConnectionWaitTime(context.Context, time.Duration, *pool.Conn) {}
func (noopRecorder) RecordConnectionClosed(context.Context, *pool.Conn, string, error) {}
func (noopRecorder) RecordPubSubMessage(context.Context, *pool.Conn, string, string, bool) {}
func (noopRecorder) RecordStreamLag(context.Context, time.Duration, *pool.Conn, string, string, string) {
}
// RegisterPools registers connection pools with the global recorder.
func RegisterPools(connPool pool.Pooler, pubSubPool PubSubPooler, addr string) {
// Check if the global recorder implements PoolRegistrar
if registrar, ok := globalRecorder.(PoolRegistrar); ok {
// Generate a unique ID for this client's pools
uniqueID := generateUniqueID()
if connPool != nil {
poolName := addr + "_" + uniqueID
registrar.RegisterPool(poolName, connPool)
}
if pubSubPool != nil {
poolName := addr + "_" + uniqueID + "_pubsub"
registrar.RegisterPubSubPool(poolName, pubSubPool)
}
}
}
// UnregisterPools removes connection pools from the global recorder
func UnregisterPools(connPool pool.Pooler, pubSubPool PubSubPooler) {
// Check if the global recorder implements PoolRegistrar
if registrar, ok := globalRecorder.(PoolRegistrar); ok {
if connPool != nil {
registrar.UnregisterPool(connPool)
}
if pubSubPool != nil {
registrar.UnregisterPubSubPool(pubSubPool)
}
}
}