niki/vendor/github.com/redis/go-redis/v9/otel.go

205 lines
8.6 KiB
Go

package redis
import (
"context"
"net"
"time"
"github.com/redis/go-redis/v9/internal/otel"
"github.com/redis/go-redis/v9/internal/pool"
)
// ConnInfo provides information about a Redis connection for metrics.
type ConnInfo interface {
RemoteAddr() net.Addr
PoolName() string
}
type Pooler interface {
PoolStats() *pool.Stats
}
type PubSubPooler interface {
Stats() *pool.PubSubStats
}
// OTelRecorder is the interface for recording OpenTelemetry metrics.
type OTelRecorder interface {
// RecordOperationDuration records the total operation duration (including all retries)
RecordOperationDuration(ctx context.Context, duration time.Duration, cmd Cmder, attempts int, err error, cn ConnInfo, dbIndex int)
// RecordPipelineOperationDuration records the total pipeline/transaction duration.
// operationName should be "PIPELINE" for regular pipelines or "MULTI" for transactions.
RecordPipelineOperationDuration(ctx context.Context, duration time.Duration, operationName string, cmdCount int, attempts int, err error, cn ConnInfo, dbIndex int)
// RecordConnectionCreateTime records the time it took to create a new connection
RecordConnectionCreateTime(ctx context.Context, duration time.Duration, cn ConnInfo)
// RecordConnectionRelaxedTimeout records when connection timeout is relaxed/unrelaxed
// delta: +1 for relaxed, -1 for unrelaxed
// poolName: name of the connection pool (e.g., "main", "pubsub")
// notificationType: the notification type that triggered the timeout relaxation (e.g., "MOVING", "HANDOFF")
RecordConnectionRelaxedTimeout(ctx context.Context, delta int, cn ConnInfo, poolName, notificationType string)
// RecordConnectionHandoff records when a connection is handed off to another node
// poolName: name of the connection pool (e.g., "main", "pubsub")
RecordConnectionHandoff(ctx context.Context, cn ConnInfo, poolName string)
// RecordError records client errors (ASK, MOVED, handshake failures, etc.)
// errorType: type of error (e.g., "ASK", "MOVED", "HANDSHAKE_FAILED")
// statusCode: Redis response status code if available (e.g., "MOVED", "ASK")
// isInternal: whether this is an internal error
// retryAttempts: number of retry attempts made
RecordError(ctx context.Context, errorType string, cn ConnInfo, statusCode string, isInternal bool, retryAttempts int)
// RecordMaintenanceNotification records when a maintenance notification is received
// notificationType: the type of notification (e.g., "MOVING", "MIGRATING", etc.)
RecordMaintenanceNotification(ctx context.Context, cn ConnInfo, notificationType string)
// RecordConnectionWaitTime records the time spent waiting for a connection from the pool
RecordConnectionWaitTime(ctx context.Context, duration time.Duration, cn ConnInfo)
// RecordConnectionClosed records when a connection is closed
// reason: reason for closing (e.g., "idle", "max_lifetime", "error", "pool_closed")
// err: the error that caused the close (nil for non-error closures)
RecordConnectionClosed(ctx context.Context, cn ConnInfo, reason string, err error)
// RecordPubSubMessage records a Pub/Sub message
// direction: "sent" or "received"
// channel: channel name (may be hidden for cardinality reduction)
// sharded: true for sharded pub/sub (SPUBLISH/SSUBSCRIBE)
RecordPubSubMessage(ctx context.Context, cn ConnInfo, direction, channel string, sharded bool)
// RecordStreamLag records the lag for stream consumer group processing
// lag: time difference between message creation and consumption
// streamName: name of the stream (may be hidden for cardinality reduction)
// consumerGroup: name of the consumer group
// consumerName: name of the consumer
RecordStreamLag(ctx context.Context, lag time.Duration, cn ConnInfo, streamName, consumerGroup, consumerName string)
}
// This is used for async gauge metrics that need to pull stats from pools periodically.
type OTelPoolRegistrar interface {
// RegisterPool is called when a new client is created with its main connection pool.
// poolName: unique identifier for the pool (e.g., "main_abc123")
RegisterPool(poolName string, pool Pooler)
// UnregisterPool is called when a client is closed to remove its pool from the registry.
UnregisterPool(pool Pooler)
// RegisterPubSubPool is called when a new client is created with a PubSub pool.
// poolName: unique identifier for the pool (e.g., "main_abc123_pubsub")
RegisterPubSubPool(poolName string, pool PubSubPooler)
// UnregisterPubSubPool is called when a PubSub client is closed to remove its pool.
UnregisterPubSubPool(pool PubSubPooler)
}
// SetOTelRecorder sets the global OpenTelemetry recorder.
func SetOTelRecorder(r OTelRecorder) {
if r == nil {
otel.SetGlobalRecorder(nil)
return
}
otel.SetGlobalRecorder(&otelRecorderAdapter{r})
}
type otelRecorderAdapter struct {
recorder OTelRecorder
}
// toConnInfo converts *pool.Conn to ConnInfo interface properly.
// This ensures that a nil *pool.Conn becomes a true nil interface,
// not a non-nil interface containing a nil pointer.
func toConnInfo(cn *pool.Conn) ConnInfo {
if cn == nil {
return nil
}
return cn
}
func (a *otelRecorderAdapter) RecordOperationDuration(ctx context.Context, duration time.Duration, cmd otel.Cmder, attempts int, err error, cn *pool.Conn, dbIndex int) {
// Convert internal Cmder to public Cmder
if publicCmd, ok := cmd.(Cmder); ok {
a.recorder.RecordOperationDuration(ctx, duration, publicCmd, attempts, err, toConnInfo(cn), dbIndex)
}
}
func (a *otelRecorderAdapter) RecordPipelineOperationDuration(ctx context.Context, duration time.Duration, operationName string, cmdCount int, attempts int, err error, cn *pool.Conn, dbIndex int) {
a.recorder.RecordPipelineOperationDuration(ctx, duration, operationName, cmdCount, attempts, err, toConnInfo(cn), dbIndex)
}
func (a *otelRecorderAdapter) RecordConnectionCreateTime(ctx context.Context, duration time.Duration, cn *pool.Conn) {
a.recorder.RecordConnectionCreateTime(ctx, duration, toConnInfo(cn))
}
func (a *otelRecorderAdapter) RecordConnectionRelaxedTimeout(ctx context.Context, delta int, cn *pool.Conn, poolName, notificationType string) {
a.recorder.RecordConnectionRelaxedTimeout(ctx, delta, toConnInfo(cn), poolName, notificationType)
}
func (a *otelRecorderAdapter) RecordConnectionHandoff(ctx context.Context, cn *pool.Conn, poolName string) {
a.recorder.RecordConnectionHandoff(ctx, toConnInfo(cn), poolName)
}
func (a *otelRecorderAdapter) RecordError(ctx context.Context, errorType string, cn *pool.Conn, statusCode string, isInternal bool, retryAttempts int) {
a.recorder.RecordError(ctx, errorType, toConnInfo(cn), statusCode, isInternal, retryAttempts)
}
func (a *otelRecorderAdapter) RecordMaintenanceNotification(ctx context.Context, cn *pool.Conn, notificationType string) {
a.recorder.RecordMaintenanceNotification(ctx, toConnInfo(cn), notificationType)
}
func (a *otelRecorderAdapter) RecordConnectionWaitTime(ctx context.Context, duration time.Duration, cn *pool.Conn) {
a.recorder.RecordConnectionWaitTime(ctx, duration, toConnInfo(cn))
}
func (a *otelRecorderAdapter) RecordConnectionClosed(ctx context.Context, cn *pool.Conn, reason string, err error) {
a.recorder.RecordConnectionClosed(ctx, toConnInfo(cn), reason, err)
}
func (a *otelRecorderAdapter) RecordPubSubMessage(ctx context.Context, cn *pool.Conn, direction, channel string, sharded bool) {
a.recorder.RecordPubSubMessage(ctx, toConnInfo(cn), direction, channel, sharded)
}
func (a *otelRecorderAdapter) RecordStreamLag(ctx context.Context, lag time.Duration, cn *pool.Conn, streamName, consumerGroup, consumerName string) {
a.recorder.RecordStreamLag(ctx, lag, toConnInfo(cn), streamName, consumerGroup, consumerName)
}
func (a *otelRecorderAdapter) RegisterPool(poolName string, p pool.Pooler) {
if registrar, ok := a.recorder.(OTelPoolRegistrar); ok {
registrar.RegisterPool(poolName, &poolerAdapter{p})
}
}
func (a *otelRecorderAdapter) UnregisterPool(p pool.Pooler) {
if registrar, ok := a.recorder.(OTelPoolRegistrar); ok {
registrar.UnregisterPool(&poolerAdapter{p})
}
}
func (a *otelRecorderAdapter) RegisterPubSubPool(poolName string, p otel.PubSubPooler) {
if registrar, ok := a.recorder.(OTelPoolRegistrar); ok {
registrar.RegisterPubSubPool(poolName, &pubSubPoolerAdapter{p})
}
}
func (a *otelRecorderAdapter) UnregisterPubSubPool(p otel.PubSubPooler) {
if registrar, ok := a.recorder.(OTelPoolRegistrar); ok {
registrar.UnregisterPubSubPool(&pubSubPoolerAdapter{p})
}
}
type poolerAdapter struct {
p pool.Pooler
}
func (a *poolerAdapter) PoolStats() *pool.Stats {
return a.p.Stats()
}
type pubSubPoolerAdapter struct {
p otel.PubSubPooler
}
func (a *pubSubPoolerAdapter) Stats() *pool.PubSubStats {
return a.p.Stats()
}