forked from ebhomengo/niki
1
0
Fork 0
niki/vendor/github.com/redis/go-redis/v9/redis.go

1415 lines
21 KiB
Go
Raw Normal View History

2024-02-18 10:42:21 +00:00
package redis
import (
"context"
"errors"
"fmt"
"net"
"sync"
"sync/atomic"
"time"
"github.com/redis/go-redis/v9/internal"
"github.com/redis/go-redis/v9/internal/hscan"
"github.com/redis/go-redis/v9/internal/pool"
"github.com/redis/go-redis/v9/internal/proto"
)
// Scanner internal/hscan.Scanner exposed interface.
2024-02-18 10:42:21 +00:00
type Scanner = hscan.Scanner
// Nil reply returned by Redis when key does not exist.
2024-02-18 10:42:21 +00:00
const Nil = proto.Nil
// SetLogger set custom log
2024-02-18 10:42:21 +00:00
func SetLogger(logger internal.Logging) {
2024-02-18 10:42:21 +00:00
internal.Logger = logger
2024-02-18 10:42:21 +00:00
}
//------------------------------------------------------------------------------
type Hook interface {
DialHook(next DialHook) DialHook
2024-02-18 10:42:21 +00:00
ProcessHook(next ProcessHook) ProcessHook
2024-02-18 10:42:21 +00:00
ProcessPipelineHook(next ProcessPipelineHook) ProcessPipelineHook
}
type (
DialHook func(ctx context.Context, network, addr string) (net.Conn, error)
ProcessHook func(ctx context.Context, cmd Cmder) error
2024-02-18 10:42:21 +00:00
ProcessPipelineHook func(ctx context.Context, cmds []Cmder) error
)
type hooksMixin struct {
hooksMu *sync.Mutex
slice []Hook
2024-02-18 10:42:21 +00:00
initial hooks
2024-02-18 10:42:21 +00:00
current hooks
}
func (hs *hooksMixin) initHooks(hooks hooks) {
2024-02-18 10:42:21 +00:00
hs.hooksMu = new(sync.Mutex)
2024-02-18 10:42:21 +00:00
hs.initial = hooks
2024-02-18 10:42:21 +00:00
hs.chain()
2024-02-18 10:42:21 +00:00
}
type hooks struct {
dial DialHook
process ProcessHook
pipeline ProcessPipelineHook
2024-02-18 10:42:21 +00:00
txPipeline ProcessPipelineHook
}
func (h *hooks) setDefaults() {
2024-02-18 10:42:21 +00:00
if h.dial == nil {
2024-02-18 10:42:21 +00:00
h.dial = func(ctx context.Context, network, addr string) (net.Conn, error) { return nil, nil }
2024-02-18 10:42:21 +00:00
}
2024-02-18 10:42:21 +00:00
if h.process == nil {
2024-02-18 10:42:21 +00:00
h.process = func(ctx context.Context, cmd Cmder) error { return nil }
2024-02-18 10:42:21 +00:00
}
2024-02-18 10:42:21 +00:00
if h.pipeline == nil {
2024-02-18 10:42:21 +00:00
h.pipeline = func(ctx context.Context, cmds []Cmder) error { return nil }
2024-02-18 10:42:21 +00:00
}
2024-02-18 10:42:21 +00:00
if h.txPipeline == nil {
2024-02-18 10:42:21 +00:00
h.txPipeline = func(ctx context.Context, cmds []Cmder) error { return nil }
2024-02-18 10:42:21 +00:00
}
2024-02-18 10:42:21 +00:00
}
// AddHook is to add a hook to the queue.
2024-02-18 10:42:21 +00:00
// Hook is a function executed during network connection, command execution, and pipeline,
2024-02-18 10:42:21 +00:00
// it is a first-in-first-out stack queue (FIFO).
2024-02-18 10:42:21 +00:00
// You need to execute the next hook in each hook, unless you want to terminate the execution of the command.
2024-02-18 10:42:21 +00:00
// For example, you added hook-1, hook-2:
2024-02-18 10:42:21 +00:00
//
2024-02-18 10:42:21 +00:00
// client.AddHook(hook-1, hook-2)
2024-02-18 10:42:21 +00:00
//
2024-02-18 10:42:21 +00:00
// hook-1:
2024-02-18 10:42:21 +00:00
//
2024-02-18 10:42:21 +00:00
// func (Hook1) ProcessHook(next redis.ProcessHook) redis.ProcessHook {
2024-02-18 10:42:21 +00:00
// return func(ctx context.Context, cmd Cmder) error {
2024-02-18 10:42:21 +00:00
// print("hook-1 start")
2024-02-18 10:42:21 +00:00
// next(ctx, cmd)
2024-02-18 10:42:21 +00:00
// print("hook-1 end")
2024-02-18 10:42:21 +00:00
// return nil
2024-02-18 10:42:21 +00:00
// }
2024-02-18 10:42:21 +00:00
// }
2024-02-18 10:42:21 +00:00
//
2024-02-18 10:42:21 +00:00
// hook-2:
2024-02-18 10:42:21 +00:00
//
2024-02-18 10:42:21 +00:00
// func (Hook2) ProcessHook(next redis.ProcessHook) redis.ProcessHook {
2024-02-18 10:42:21 +00:00
// return func(ctx context.Context, cmd redis.Cmder) error {
2024-02-18 10:42:21 +00:00
// print("hook-2 start")
2024-02-18 10:42:21 +00:00
// next(ctx, cmd)
2024-02-18 10:42:21 +00:00
// print("hook-2 end")
2024-02-18 10:42:21 +00:00
// return nil
2024-02-18 10:42:21 +00:00
// }
2024-02-18 10:42:21 +00:00
// }
2024-02-18 10:42:21 +00:00
//
2024-02-18 10:42:21 +00:00
// The execution sequence is:
2024-02-18 10:42:21 +00:00
//
2024-02-18 10:42:21 +00:00
// hook-1 start -> hook-2 start -> exec redis cmd -> hook-2 end -> hook-1 end
2024-02-18 10:42:21 +00:00
//
2024-02-18 10:42:21 +00:00
// Please note: "next(ctx, cmd)" is very important, it will call the next hook,
2024-02-18 10:42:21 +00:00
// if "next(ctx, cmd)" is not executed, the redis command will not be executed.
2024-02-18 10:42:21 +00:00
func (hs *hooksMixin) AddHook(hook Hook) {
2024-02-18 10:42:21 +00:00
hs.slice = append(hs.slice, hook)
2024-02-18 10:42:21 +00:00
hs.chain()
2024-02-18 10:42:21 +00:00
}
func (hs *hooksMixin) chain() {
2024-02-18 10:42:21 +00:00
hs.initial.setDefaults()
hs.hooksMu.Lock()
2024-02-18 10:42:21 +00:00
defer hs.hooksMu.Unlock()
hs.current.dial = hs.initial.dial
2024-02-18 10:42:21 +00:00
hs.current.process = hs.initial.process
2024-02-18 10:42:21 +00:00
hs.current.pipeline = hs.initial.pipeline
2024-02-18 10:42:21 +00:00
hs.current.txPipeline = hs.initial.txPipeline
for i := len(hs.slice) - 1; i >= 0; i-- {
2024-02-18 10:42:21 +00:00
if wrapped := hs.slice[i].DialHook(hs.current.dial); wrapped != nil {
2024-02-18 10:42:21 +00:00
hs.current.dial = wrapped
2024-02-18 10:42:21 +00:00
}
2024-02-18 10:42:21 +00:00
if wrapped := hs.slice[i].ProcessHook(hs.current.process); wrapped != nil {
2024-02-18 10:42:21 +00:00
hs.current.process = wrapped
2024-02-18 10:42:21 +00:00
}
2024-02-18 10:42:21 +00:00
if wrapped := hs.slice[i].ProcessPipelineHook(hs.current.pipeline); wrapped != nil {
2024-02-18 10:42:21 +00:00
hs.current.pipeline = wrapped
2024-02-18 10:42:21 +00:00
}
2024-02-18 10:42:21 +00:00
if wrapped := hs.slice[i].ProcessPipelineHook(hs.current.txPipeline); wrapped != nil {
2024-02-18 10:42:21 +00:00
hs.current.txPipeline = wrapped
2024-02-18 10:42:21 +00:00
}
2024-02-18 10:42:21 +00:00
}
2024-02-18 10:42:21 +00:00
}
func (hs *hooksMixin) clone() hooksMixin {
2024-02-18 10:42:21 +00:00
hs.hooksMu.Lock()
2024-02-18 10:42:21 +00:00
defer hs.hooksMu.Unlock()
clone := *hs
2024-02-18 10:42:21 +00:00
l := len(clone.slice)
2024-02-18 10:42:21 +00:00
clone.slice = clone.slice[:l:l]
2024-02-18 10:42:21 +00:00
clone.hooksMu = new(sync.Mutex)
2024-02-18 10:42:21 +00:00
return clone
2024-02-18 10:42:21 +00:00
}
func (hs *hooksMixin) withProcessHook(ctx context.Context, cmd Cmder, hook ProcessHook) error {
2024-02-18 10:42:21 +00:00
for i := len(hs.slice) - 1; i >= 0; i-- {
2024-02-18 10:42:21 +00:00
if wrapped := hs.slice[i].ProcessHook(hook); wrapped != nil {
2024-02-18 10:42:21 +00:00
hook = wrapped
2024-02-18 10:42:21 +00:00
}
2024-02-18 10:42:21 +00:00
}
2024-02-18 10:42:21 +00:00
return hook(ctx, cmd)
2024-02-18 10:42:21 +00:00
}
func (hs *hooksMixin) withProcessPipelineHook(
2024-02-18 10:42:21 +00:00
ctx context.Context, cmds []Cmder, hook ProcessPipelineHook,
2024-02-18 10:42:21 +00:00
) error {
2024-02-18 10:42:21 +00:00
for i := len(hs.slice) - 1; i >= 0; i-- {
2024-02-18 10:42:21 +00:00
if wrapped := hs.slice[i].ProcessPipelineHook(hook); wrapped != nil {
2024-02-18 10:42:21 +00:00
hook = wrapped
2024-02-18 10:42:21 +00:00
}
2024-02-18 10:42:21 +00:00
}
2024-02-18 10:42:21 +00:00
return hook(ctx, cmds)
2024-02-18 10:42:21 +00:00
}
func (hs *hooksMixin) dialHook(ctx context.Context, network, addr string) (net.Conn, error) {
2024-02-18 10:42:21 +00:00
hs.hooksMu.Lock()
2024-02-18 10:42:21 +00:00
defer hs.hooksMu.Unlock()
2024-02-18 10:42:21 +00:00
return hs.current.dial(ctx, network, addr)
2024-02-18 10:42:21 +00:00
}
func (hs *hooksMixin) processHook(ctx context.Context, cmd Cmder) error {
2024-02-18 10:42:21 +00:00
return hs.current.process(ctx, cmd)
2024-02-18 10:42:21 +00:00
}
func (hs *hooksMixin) processPipelineHook(ctx context.Context, cmds []Cmder) error {
2024-02-18 10:42:21 +00:00
return hs.current.pipeline(ctx, cmds)
2024-02-18 10:42:21 +00:00
}
func (hs *hooksMixin) processTxPipelineHook(ctx context.Context, cmds []Cmder) error {
2024-02-18 10:42:21 +00:00
return hs.current.txPipeline(ctx, cmds)
2024-02-18 10:42:21 +00:00
}
//------------------------------------------------------------------------------
type baseClient struct {
opt *Options
2024-02-18 10:42:21 +00:00
connPool pool.Pooler
onClose func() error // hook called when client is closed
2024-02-18 10:42:21 +00:00
}
func (c *baseClient) clone() *baseClient {
2024-02-18 10:42:21 +00:00
clone := *c
2024-02-18 10:42:21 +00:00
return &clone
2024-02-18 10:42:21 +00:00
}
func (c *baseClient) withTimeout(timeout time.Duration) *baseClient {
2024-02-18 10:42:21 +00:00
opt := c.opt.clone()
2024-02-18 10:42:21 +00:00
opt.ReadTimeout = timeout
2024-02-18 10:42:21 +00:00
opt.WriteTimeout = timeout
clone := c.clone()
2024-02-18 10:42:21 +00:00
clone.opt = opt
return clone
2024-02-18 10:42:21 +00:00
}
func (c *baseClient) String() string {
2024-02-18 10:42:21 +00:00
return fmt.Sprintf("Redis<%s db:%d>", c.getAddr(), c.opt.DB)
2024-02-18 10:42:21 +00:00
}
func (c *baseClient) newConn(ctx context.Context) (*pool.Conn, error) {
2024-02-18 10:42:21 +00:00
cn, err := c.connPool.NewConn(ctx)
2024-02-18 10:42:21 +00:00
if err != nil {
2024-02-18 10:42:21 +00:00
return nil, err
2024-02-18 10:42:21 +00:00
}
err = c.initConn(ctx, cn)
2024-02-18 10:42:21 +00:00
if err != nil {
2024-02-18 10:42:21 +00:00
_ = c.connPool.CloseConn(cn)
2024-02-18 10:42:21 +00:00
return nil, err
2024-02-18 10:42:21 +00:00
}
return cn, nil
2024-02-18 10:42:21 +00:00
}
func (c *baseClient) getConn(ctx context.Context) (*pool.Conn, error) {
2024-02-18 10:42:21 +00:00
if c.opt.Limiter != nil {
2024-02-18 10:42:21 +00:00
err := c.opt.Limiter.Allow()
2024-02-18 10:42:21 +00:00
if err != nil {
2024-02-18 10:42:21 +00:00
return nil, err
2024-02-18 10:42:21 +00:00
}
2024-02-18 10:42:21 +00:00
}
cn, err := c._getConn(ctx)
2024-02-18 10:42:21 +00:00
if err != nil {
2024-02-18 10:42:21 +00:00
if c.opt.Limiter != nil {
2024-02-18 10:42:21 +00:00
c.opt.Limiter.ReportResult(err)
2024-02-18 10:42:21 +00:00
}
2024-02-18 10:42:21 +00:00
return nil, err
2024-02-18 10:42:21 +00:00
}
return cn, nil
2024-02-18 10:42:21 +00:00
}
func (c *baseClient) _getConn(ctx context.Context) (*pool.Conn, error) {
2024-02-18 10:42:21 +00:00
cn, err := c.connPool.Get(ctx)
2024-02-18 10:42:21 +00:00
if err != nil {
2024-02-18 10:42:21 +00:00
return nil, err
2024-02-18 10:42:21 +00:00
}
if cn.Inited {
2024-02-18 10:42:21 +00:00
return cn, nil
2024-02-18 10:42:21 +00:00
}
if err := c.initConn(ctx, cn); err != nil {
2024-02-18 10:42:21 +00:00
c.connPool.Remove(ctx, cn, err)
2024-02-18 10:42:21 +00:00
if err := errors.Unwrap(err); err != nil {
2024-02-18 10:42:21 +00:00
return nil, err
2024-02-18 10:42:21 +00:00
}
2024-02-18 10:42:21 +00:00
return nil, err
2024-02-18 10:42:21 +00:00
}
return cn, nil
2024-02-18 10:42:21 +00:00
}
func (c *baseClient) initConn(ctx context.Context, cn *pool.Conn) error {
2024-02-18 10:42:21 +00:00
if cn.Inited {
2024-02-18 10:42:21 +00:00
return nil
2024-02-18 10:42:21 +00:00
}
2024-02-18 10:42:21 +00:00
cn.Inited = true
username, password := c.opt.Username, c.opt.Password
2024-02-18 10:42:21 +00:00
if c.opt.CredentialsProvider != nil {
2024-02-18 10:42:21 +00:00
username, password = c.opt.CredentialsProvider()
2024-02-18 10:42:21 +00:00
}
connPool := pool.NewSingleConnPool(c.connPool, cn)
2024-02-18 10:42:21 +00:00
conn := newConn(c.opt, connPool)
var auth bool
2024-02-18 10:42:21 +00:00
protocol := c.opt.Protocol
2024-02-18 10:42:21 +00:00
// By default, use RESP3 in current version.
2024-02-18 10:42:21 +00:00
if protocol < 2 {
2024-02-18 10:42:21 +00:00
protocol = 3
2024-02-18 10:42:21 +00:00
}
// for redis-server versions that do not support the HELLO command,
2024-02-18 10:42:21 +00:00
// RESP2 will continue to be used.
2024-02-18 10:42:21 +00:00
if err := conn.Hello(ctx, protocol, username, password, "").Err(); err == nil {
2024-02-18 10:42:21 +00:00
auth = true
2024-02-18 10:42:21 +00:00
} else if !isRedisError(err) {
2024-02-18 10:42:21 +00:00
// When the server responds with the RESP protocol and the result is not a normal
2024-02-18 10:42:21 +00:00
// execution result of the HELLO command, we consider it to be an indication that
2024-02-18 10:42:21 +00:00
// the server does not support the HELLO command.
2024-02-18 10:42:21 +00:00
// The server may be a redis-server that does not support the HELLO command,
2024-02-18 10:42:21 +00:00
// or it could be DragonflyDB or a third-party redis-proxy. They all respond
2024-02-18 10:42:21 +00:00
// with different error string results for unsupported commands, making it
2024-02-18 10:42:21 +00:00
// difficult to rely on error strings to determine all results.
2024-02-18 10:42:21 +00:00
return err
2024-02-18 10:42:21 +00:00
}
2024-02-18 10:42:21 +00:00
if !c.opt.DisableIndentity {
2024-02-18 10:42:21 +00:00
libName := ""
2024-02-18 10:42:21 +00:00
libVer := Version()
2024-02-18 10:42:21 +00:00
if c.opt.IdentitySuffix != "" {
2024-02-18 10:42:21 +00:00
libName = c.opt.IdentitySuffix
2024-02-18 10:42:21 +00:00
}
2024-02-18 10:42:21 +00:00
libInfo := LibraryInfo{LibName: &libName}
2024-02-18 10:42:21 +00:00
conn.ClientSetInfo(ctx, libInfo)
2024-02-18 10:42:21 +00:00
libInfo = LibraryInfo{LibVer: &libVer}
2024-02-18 10:42:21 +00:00
conn.ClientSetInfo(ctx, libInfo)
2024-02-18 10:42:21 +00:00
}
2024-02-18 10:42:21 +00:00
_, err := conn.Pipelined(ctx, func(pipe Pipeliner) error {
2024-02-18 10:42:21 +00:00
if !auth && password != "" {
2024-02-18 10:42:21 +00:00
if username != "" {
2024-02-18 10:42:21 +00:00
pipe.AuthACL(ctx, username, password)
2024-02-18 10:42:21 +00:00
} else {
2024-02-18 10:42:21 +00:00
pipe.Auth(ctx, password)
2024-02-18 10:42:21 +00:00
}
2024-02-18 10:42:21 +00:00
}
if c.opt.DB > 0 {
2024-02-18 10:42:21 +00:00
pipe.Select(ctx, c.opt.DB)
2024-02-18 10:42:21 +00:00
}
if c.opt.readOnly {
2024-02-18 10:42:21 +00:00
pipe.ReadOnly(ctx)
2024-02-18 10:42:21 +00:00
}
if c.opt.ClientName != "" {
2024-02-18 10:42:21 +00:00
pipe.ClientSetName(ctx, c.opt.ClientName)
2024-02-18 10:42:21 +00:00
}
return nil
2024-02-18 10:42:21 +00:00
})
2024-02-18 10:42:21 +00:00
if err != nil {
2024-02-18 10:42:21 +00:00
return err
2024-02-18 10:42:21 +00:00
}
if c.opt.OnConnect != nil {
2024-02-18 10:42:21 +00:00
return c.opt.OnConnect(ctx, conn)
2024-02-18 10:42:21 +00:00
}
2024-02-18 10:42:21 +00:00
return nil
2024-02-18 10:42:21 +00:00
}
func (c *baseClient) releaseConn(ctx context.Context, cn *pool.Conn, err error) {
2024-02-18 10:42:21 +00:00
if c.opt.Limiter != nil {
2024-02-18 10:42:21 +00:00
c.opt.Limiter.ReportResult(err)
2024-02-18 10:42:21 +00:00
}
if isBadConn(err, false, c.opt.Addr) {
2024-02-18 10:42:21 +00:00
c.connPool.Remove(ctx, cn, err)
2024-02-18 10:42:21 +00:00
} else {
2024-02-18 10:42:21 +00:00
c.connPool.Put(ctx, cn)
2024-02-18 10:42:21 +00:00
}
2024-02-18 10:42:21 +00:00
}
func (c *baseClient) withConn(
2024-02-18 10:42:21 +00:00
ctx context.Context, fn func(context.Context, *pool.Conn) error,
2024-02-18 10:42:21 +00:00
) error {
2024-02-18 10:42:21 +00:00
cn, err := c.getConn(ctx)
2024-02-18 10:42:21 +00:00
if err != nil {
2024-02-18 10:42:21 +00:00
return err
2024-02-18 10:42:21 +00:00
}
var fnErr error
2024-02-18 10:42:21 +00:00
defer func() {
2024-02-18 10:42:21 +00:00
c.releaseConn(ctx, cn, fnErr)
2024-02-18 10:42:21 +00:00
}()
fnErr = fn(ctx, cn)
return fnErr
2024-02-18 10:42:21 +00:00
}
func (c *baseClient) dial(ctx context.Context, network, addr string) (net.Conn, error) {
2024-02-18 10:42:21 +00:00
return c.opt.Dialer(ctx, network, addr)
2024-02-18 10:42:21 +00:00
}
func (c *baseClient) process(ctx context.Context, cmd Cmder) error {
2024-02-18 10:42:21 +00:00
var lastErr error
2024-02-18 10:42:21 +00:00
for attempt := 0; attempt <= c.opt.MaxRetries; attempt++ {
2024-02-18 10:42:21 +00:00
attempt := attempt
retry, err := c._process(ctx, cmd, attempt)
2024-02-18 10:42:21 +00:00
if err == nil || !retry {
2024-02-18 10:42:21 +00:00
return err
2024-02-18 10:42:21 +00:00
}
lastErr = err
2024-02-18 10:42:21 +00:00
}
2024-02-18 10:42:21 +00:00
return lastErr
2024-02-18 10:42:21 +00:00
}
func (c *baseClient) _process(ctx context.Context, cmd Cmder, attempt int) (bool, error) {
2024-02-18 10:42:21 +00:00
if attempt > 0 {
2024-02-18 10:42:21 +00:00
if err := internal.Sleep(ctx, c.retryBackoff(attempt)); err != nil {
2024-02-18 10:42:21 +00:00
return false, err
2024-02-18 10:42:21 +00:00
}
2024-02-18 10:42:21 +00:00
}
retryTimeout := uint32(0)
2024-02-18 10:42:21 +00:00
if err := c.withConn(ctx, func(ctx context.Context, cn *pool.Conn) error {
2024-02-18 10:42:21 +00:00
if err := cn.WithWriter(c.context(ctx), c.opt.WriteTimeout, func(wr *proto.Writer) error {
2024-02-18 10:42:21 +00:00
return writeCmd(wr, cmd)
2024-02-18 10:42:21 +00:00
}); err != nil {
2024-02-18 10:42:21 +00:00
atomic.StoreUint32(&retryTimeout, 1)
2024-02-18 10:42:21 +00:00
return err
2024-02-18 10:42:21 +00:00
}
if err := cn.WithReader(c.context(ctx), c.cmdTimeout(cmd), cmd.readReply); err != nil {
2024-02-18 10:42:21 +00:00
if cmd.readTimeout() == nil {
2024-02-18 10:42:21 +00:00
atomic.StoreUint32(&retryTimeout, 1)
2024-02-18 10:42:21 +00:00
} else {
2024-02-18 10:42:21 +00:00
atomic.StoreUint32(&retryTimeout, 0)
2024-02-18 10:42:21 +00:00
}
2024-02-18 10:42:21 +00:00
return err
2024-02-18 10:42:21 +00:00
}
return nil
2024-02-18 10:42:21 +00:00
}); err != nil {
2024-02-18 10:42:21 +00:00
retry := shouldRetry(err, atomic.LoadUint32(&retryTimeout) == 1)
2024-02-18 10:42:21 +00:00
return retry, err
2024-02-18 10:42:21 +00:00
}
return false, nil
2024-02-18 10:42:21 +00:00
}
func (c *baseClient) retryBackoff(attempt int) time.Duration {
2024-02-18 10:42:21 +00:00
return internal.RetryBackoff(attempt, c.opt.MinRetryBackoff, c.opt.MaxRetryBackoff)
2024-02-18 10:42:21 +00:00
}
func (c *baseClient) cmdTimeout(cmd Cmder) time.Duration {
2024-02-18 10:42:21 +00:00
if timeout := cmd.readTimeout(); timeout != nil {
2024-02-18 10:42:21 +00:00
t := *timeout
2024-02-18 10:42:21 +00:00
if t == 0 {
2024-02-18 10:42:21 +00:00
return 0
2024-02-18 10:42:21 +00:00
}
2024-02-18 10:42:21 +00:00
return t + 10*time.Second
2024-02-18 10:42:21 +00:00
}
2024-02-18 10:42:21 +00:00
return c.opt.ReadTimeout
2024-02-18 10:42:21 +00:00
}
// Close closes the client, releasing any open resources.
2024-02-18 10:42:21 +00:00
//
2024-02-18 10:42:21 +00:00
// It is rare to Close a Client, as the Client is meant to be
2024-02-18 10:42:21 +00:00
// long-lived and shared between many goroutines.
2024-02-18 10:42:21 +00:00
func (c *baseClient) Close() error {
2024-02-18 10:42:21 +00:00
var firstErr error
2024-02-18 10:42:21 +00:00
if c.onClose != nil {
2024-02-18 10:42:21 +00:00
if err := c.onClose(); err != nil {
2024-02-18 10:42:21 +00:00
firstErr = err
2024-02-18 10:42:21 +00:00
}
2024-02-18 10:42:21 +00:00
}
2024-02-18 10:42:21 +00:00
if err := c.connPool.Close(); err != nil && firstErr == nil {
2024-02-18 10:42:21 +00:00
firstErr = err
2024-02-18 10:42:21 +00:00
}
2024-02-18 10:42:21 +00:00
return firstErr
2024-02-18 10:42:21 +00:00
}
func (c *baseClient) getAddr() string {
2024-02-18 10:42:21 +00:00
return c.opt.Addr
2024-02-18 10:42:21 +00:00
}
func (c *baseClient) processPipeline(ctx context.Context, cmds []Cmder) error {
2024-02-18 10:42:21 +00:00
if err := c.generalProcessPipeline(ctx, cmds, c.pipelineProcessCmds); err != nil {
2024-02-18 10:42:21 +00:00
return err
2024-02-18 10:42:21 +00:00
}
2024-02-18 10:42:21 +00:00
return cmdsFirstErr(cmds)
2024-02-18 10:42:21 +00:00
}
func (c *baseClient) processTxPipeline(ctx context.Context, cmds []Cmder) error {
2024-02-18 10:42:21 +00:00
if err := c.generalProcessPipeline(ctx, cmds, c.txPipelineProcessCmds); err != nil {
2024-02-18 10:42:21 +00:00
return err
2024-02-18 10:42:21 +00:00
}
2024-02-18 10:42:21 +00:00
return cmdsFirstErr(cmds)
2024-02-18 10:42:21 +00:00
}
// pipelineProcessor sends cmds over cn and reads their replies. The boolean
// result tells generalProcessPipeline whether a failed batch may be retried.
type pipelineProcessor func(ctx context.Context, cn *pool.Conn, cmds []Cmder) (canRetry bool, err error)
func (c *baseClient) generalProcessPipeline(
2024-02-18 10:42:21 +00:00
ctx context.Context, cmds []Cmder, p pipelineProcessor,
2024-02-18 10:42:21 +00:00
) error {
2024-02-18 10:42:21 +00:00
var lastErr error
2024-02-18 10:42:21 +00:00
for attempt := 0; attempt <= c.opt.MaxRetries; attempt++ {
2024-02-18 10:42:21 +00:00
if attempt > 0 {
2024-02-18 10:42:21 +00:00
if err := internal.Sleep(ctx, c.retryBackoff(attempt)); err != nil {
2024-02-18 10:42:21 +00:00
setCmdsErr(cmds, err)
2024-02-18 10:42:21 +00:00
return err
2024-02-18 10:42:21 +00:00
}
2024-02-18 10:42:21 +00:00
}
// Enable retries by default to retry dial errors returned by withConn.
2024-02-18 10:42:21 +00:00
canRetry := true
2024-02-18 10:42:21 +00:00
lastErr = c.withConn(ctx, func(ctx context.Context, cn *pool.Conn) error {
2024-02-18 10:42:21 +00:00
var err error
2024-02-18 10:42:21 +00:00
canRetry, err = p(ctx, cn, cmds)
2024-02-18 10:42:21 +00:00
return err
2024-02-18 10:42:21 +00:00
})
2024-02-18 10:42:21 +00:00
if lastErr == nil || !canRetry || !shouldRetry(lastErr, true) {
2024-02-18 10:42:21 +00:00
return lastErr
2024-02-18 10:42:21 +00:00
}
2024-02-18 10:42:21 +00:00
}
2024-02-18 10:42:21 +00:00
return lastErr
2024-02-18 10:42:21 +00:00
}
func (c *baseClient) pipelineProcessCmds(
2024-02-18 10:42:21 +00:00
ctx context.Context, cn *pool.Conn, cmds []Cmder,
2024-02-18 10:42:21 +00:00
) (bool, error) {
2024-02-18 10:42:21 +00:00
if err := cn.WithWriter(c.context(ctx), c.opt.WriteTimeout, func(wr *proto.Writer) error {
2024-02-18 10:42:21 +00:00
return writeCmds(wr, cmds)
2024-02-18 10:42:21 +00:00
}); err != nil {
2024-02-18 10:42:21 +00:00
setCmdsErr(cmds, err)
2024-02-18 10:42:21 +00:00
return true, err
2024-02-18 10:42:21 +00:00
}
if err := cn.WithReader(c.context(ctx), c.opt.ReadTimeout, func(rd *proto.Reader) error {
2024-02-18 10:42:21 +00:00
return pipelineReadCmds(rd, cmds)
2024-02-18 10:42:21 +00:00
}); err != nil {
2024-02-18 10:42:21 +00:00
return true, err
2024-02-18 10:42:21 +00:00
}
return false, nil
2024-02-18 10:42:21 +00:00
}
func pipelineReadCmds(rd *proto.Reader, cmds []Cmder) error {
2024-02-18 10:42:21 +00:00
for i, cmd := range cmds {
2024-02-18 10:42:21 +00:00
err := cmd.readReply(rd)
2024-02-18 10:42:21 +00:00
cmd.SetErr(err)
2024-02-18 10:42:21 +00:00
if err != nil && !isRedisError(err) {
2024-02-18 10:42:21 +00:00
setCmdsErr(cmds[i+1:], err)
2024-02-18 10:42:21 +00:00
return err
2024-02-18 10:42:21 +00:00
}
2024-02-18 10:42:21 +00:00
}
2024-02-18 10:42:21 +00:00
// Retry errors like "LOADING redis is loading the dataset in memory".
2024-02-18 10:42:21 +00:00
return cmds[0].Err()
2024-02-18 10:42:21 +00:00
}
func (c *baseClient) txPipelineProcessCmds(
2024-02-18 10:42:21 +00:00
ctx context.Context, cn *pool.Conn, cmds []Cmder,
2024-02-18 10:42:21 +00:00
) (bool, error) {
2024-02-18 10:42:21 +00:00
if err := cn.WithWriter(c.context(ctx), c.opt.WriteTimeout, func(wr *proto.Writer) error {
2024-02-18 10:42:21 +00:00
return writeCmds(wr, cmds)
2024-02-18 10:42:21 +00:00
}); err != nil {
2024-02-18 10:42:21 +00:00
setCmdsErr(cmds, err)
2024-02-18 10:42:21 +00:00
return true, err
2024-02-18 10:42:21 +00:00
}
if err := cn.WithReader(c.context(ctx), c.opt.ReadTimeout, func(rd *proto.Reader) error {
2024-02-18 10:42:21 +00:00
statusCmd := cmds[0].(*StatusCmd)
2024-02-18 10:42:21 +00:00
// Trim multi and exec.
2024-02-18 10:42:21 +00:00
trimmedCmds := cmds[1 : len(cmds)-1]
if err := txPipelineReadQueued(rd, statusCmd, trimmedCmds); err != nil {
2024-02-18 10:42:21 +00:00
setCmdsErr(cmds, err)
2024-02-18 10:42:21 +00:00
return err
2024-02-18 10:42:21 +00:00
}
return pipelineReadCmds(rd, trimmedCmds)
2024-02-18 10:42:21 +00:00
}); err != nil {
2024-02-18 10:42:21 +00:00
return false, err
2024-02-18 10:42:21 +00:00
}
return false, nil
2024-02-18 10:42:21 +00:00
}
func txPipelineReadQueued(rd *proto.Reader, statusCmd *StatusCmd, cmds []Cmder) error {
2024-02-18 10:42:21 +00:00
// Parse +OK.
2024-02-18 10:42:21 +00:00
if err := statusCmd.readReply(rd); err != nil {
2024-02-18 10:42:21 +00:00
return err
2024-02-18 10:42:21 +00:00
}
// Parse +QUEUED.
2024-02-18 10:42:21 +00:00
for range cmds {
2024-02-18 10:42:21 +00:00
if err := statusCmd.readReply(rd); err != nil && !isRedisError(err) {
2024-02-18 10:42:21 +00:00
return err
2024-02-18 10:42:21 +00:00
}
2024-02-18 10:42:21 +00:00
}
// Parse number of replies.
2024-02-18 10:42:21 +00:00
line, err := rd.ReadLine()
2024-02-18 10:42:21 +00:00
if err != nil {
2024-02-18 10:42:21 +00:00
if err == Nil {
2024-02-18 10:42:21 +00:00
err = TxFailedErr
2024-02-18 10:42:21 +00:00
}
2024-02-18 10:42:21 +00:00
return err
2024-02-18 10:42:21 +00:00
}
if line[0] != proto.RespArray {
2024-02-18 10:42:21 +00:00
return fmt.Errorf("redis: expected '*', but got line %q", line)
2024-02-18 10:42:21 +00:00
}
return nil
2024-02-18 10:42:21 +00:00
}
func (c *baseClient) context(ctx context.Context) context.Context {
2024-02-18 10:42:21 +00:00
if c.opt.ContextTimeoutEnabled {
2024-02-18 10:42:21 +00:00
return ctx
2024-02-18 10:42:21 +00:00
}
2024-02-18 10:42:21 +00:00
return context.Background()
2024-02-18 10:42:21 +00:00
}
//------------------------------------------------------------------------------
// Client is a Redis client representing a pool of zero or more underlying connections.
2024-02-18 10:42:21 +00:00
// It's safe for concurrent use by multiple goroutines.
2024-02-18 10:42:21 +00:00
//
2024-02-18 10:42:21 +00:00
// Client creates and frees connections automatically; it also maintains a free pool
2024-02-18 10:42:21 +00:00
// of idle connections. You can control the pool size with Config.PoolSize option.
2024-02-18 10:42:21 +00:00
type Client struct {
*baseClient
2024-02-18 10:42:21 +00:00
cmdable
2024-02-18 10:42:21 +00:00
hooksMixin
}
// NewClient returns a client to the Redis Server specified by Options.
2024-02-18 10:42:21 +00:00
func NewClient(opt *Options) *Client {
2024-02-18 10:42:21 +00:00
opt.init()
c := Client{
2024-02-18 10:42:21 +00:00
baseClient: &baseClient{
2024-02-18 10:42:21 +00:00
opt: opt,
},
}
2024-02-18 10:42:21 +00:00
c.init()
2024-02-18 10:42:21 +00:00
c.connPool = newConnPool(opt, c.dialHook)
return &c
2024-02-18 10:42:21 +00:00
}
func (c *Client) init() {
2024-02-18 10:42:21 +00:00
c.cmdable = c.Process
2024-02-18 10:42:21 +00:00
c.initHooks(hooks{
dial: c.baseClient.dial,
process: c.baseClient.process,
pipeline: c.baseClient.processPipeline,
2024-02-18 10:42:21 +00:00
txPipeline: c.baseClient.processTxPipeline,
})
2024-02-18 10:42:21 +00:00
}
func (c *Client) WithTimeout(timeout time.Duration) *Client {
2024-02-18 10:42:21 +00:00
clone := *c
2024-02-18 10:42:21 +00:00
clone.baseClient = c.baseClient.withTimeout(timeout)
2024-02-18 10:42:21 +00:00
clone.init()
2024-02-18 10:42:21 +00:00
return &clone
2024-02-18 10:42:21 +00:00
}
func (c *Client) Conn() *Conn {
2024-02-18 10:42:21 +00:00
return newConn(c.opt, pool.NewStickyConnPool(c.connPool))
2024-02-18 10:42:21 +00:00
}
// Do create a Cmd from the args and processes the cmd.
2024-02-18 10:42:21 +00:00
func (c *Client) Do(ctx context.Context, args ...interface{}) *Cmd {
2024-02-18 10:42:21 +00:00
cmd := NewCmd(ctx, args...)
2024-02-18 10:42:21 +00:00
_ = c.Process(ctx, cmd)
2024-02-18 10:42:21 +00:00
return cmd
2024-02-18 10:42:21 +00:00
}
func (c *Client) Process(ctx context.Context, cmd Cmder) error {
2024-02-18 10:42:21 +00:00
err := c.processHook(ctx, cmd)
2024-02-18 10:42:21 +00:00
cmd.SetErr(err)
2024-02-18 10:42:21 +00:00
return err
2024-02-18 10:42:21 +00:00
}
// Options returns read-only Options that were used to create the client.
2024-02-18 10:42:21 +00:00
func (c *Client) Options() *Options {
2024-02-18 10:42:21 +00:00
return c.opt
2024-02-18 10:42:21 +00:00
}
type PoolStats pool.Stats
// PoolStats returns connection pool stats.
2024-02-18 10:42:21 +00:00
func (c *Client) PoolStats() *PoolStats {
2024-02-18 10:42:21 +00:00
stats := c.connPool.Stats()
2024-02-18 10:42:21 +00:00
return (*PoolStats)(stats)
2024-02-18 10:42:21 +00:00
}
func (c *Client) Pipelined(ctx context.Context, fn func(Pipeliner) error) ([]Cmder, error) {
2024-02-18 10:42:21 +00:00
return c.Pipeline().Pipelined(ctx, fn)
2024-02-18 10:42:21 +00:00
}
func (c *Client) Pipeline() Pipeliner {
2024-02-18 10:42:21 +00:00
pipe := Pipeline{
2024-02-18 10:42:21 +00:00
exec: pipelineExecer(c.processPipelineHook),
}
2024-02-18 10:42:21 +00:00
pipe.init()
2024-02-18 10:42:21 +00:00
return &pipe
2024-02-18 10:42:21 +00:00
}
func (c *Client) TxPipelined(ctx context.Context, fn func(Pipeliner) error) ([]Cmder, error) {
2024-02-18 10:42:21 +00:00
return c.TxPipeline().Pipelined(ctx, fn)
2024-02-18 10:42:21 +00:00
}
// TxPipeline acts like Pipeline, but wraps queued commands with MULTI/EXEC.
2024-02-18 10:42:21 +00:00
func (c *Client) TxPipeline() Pipeliner {
2024-02-18 10:42:21 +00:00
pipe := Pipeline{
2024-02-18 10:42:21 +00:00
exec: func(ctx context.Context, cmds []Cmder) error {
2024-02-18 10:42:21 +00:00
cmds = wrapMultiExec(ctx, cmds)
2024-02-18 10:42:21 +00:00
return c.processTxPipelineHook(ctx, cmds)
2024-02-18 10:42:21 +00:00
},
}
2024-02-18 10:42:21 +00:00
pipe.init()
2024-02-18 10:42:21 +00:00
return &pipe
2024-02-18 10:42:21 +00:00
}
func (c *Client) pubSub() *PubSub {
2024-02-18 10:42:21 +00:00
pubsub := &PubSub{
2024-02-18 10:42:21 +00:00
opt: c.opt,
newConn: func(ctx context.Context, channels []string) (*pool.Conn, error) {
2024-02-18 10:42:21 +00:00
return c.newConn(ctx)
2024-02-18 10:42:21 +00:00
},
2024-02-18 10:42:21 +00:00
closeConn: c.connPool.CloseConn,
}
2024-02-18 10:42:21 +00:00
pubsub.init()
2024-02-18 10:42:21 +00:00
return pubsub
2024-02-18 10:42:21 +00:00
}
// Subscribe subscribes the client to the specified channels.
2024-02-18 10:42:21 +00:00
// Channels can be omitted to create empty subscription.
2024-02-18 10:42:21 +00:00
// Note that this method does not wait on a response from Redis, so the
2024-02-18 10:42:21 +00:00
// subscription may not be active immediately. To force the connection to wait,
2024-02-18 10:42:21 +00:00
// you may call the Receive() method on the returned *PubSub like so:
2024-02-18 10:42:21 +00:00
//
2024-02-18 10:42:21 +00:00
// sub := client.Subscribe(queryResp)
2024-02-18 10:42:21 +00:00
// iface, err := sub.Receive()
2024-02-18 10:42:21 +00:00
// if err != nil {
2024-02-18 10:42:21 +00:00
// // handle error
2024-02-18 10:42:21 +00:00
// }
2024-02-18 10:42:21 +00:00
//
2024-02-18 10:42:21 +00:00
// // Should be *Subscription, but others are possible if other actions have been
2024-02-18 10:42:21 +00:00
// // taken on sub since it was created.
2024-02-18 10:42:21 +00:00
// switch iface.(type) {
2024-02-18 10:42:21 +00:00
// case *Subscription:
2024-02-18 10:42:21 +00:00
// // subscribe succeeded
2024-02-18 10:42:21 +00:00
// case *Message:
2024-02-18 10:42:21 +00:00
// // received first message
2024-02-18 10:42:21 +00:00
// case *Pong:
2024-02-18 10:42:21 +00:00
// // pong received
2024-02-18 10:42:21 +00:00
// default:
2024-02-18 10:42:21 +00:00
// // handle error
2024-02-18 10:42:21 +00:00
// }
2024-02-18 10:42:21 +00:00
//
2024-02-18 10:42:21 +00:00
// ch := sub.Channel()
2024-02-18 10:42:21 +00:00
func (c *Client) Subscribe(ctx context.Context, channels ...string) *PubSub {
2024-02-18 10:42:21 +00:00
pubsub := c.pubSub()
2024-02-18 10:42:21 +00:00
if len(channels) > 0 {
2024-02-18 10:42:21 +00:00
_ = pubsub.Subscribe(ctx, channels...)
2024-02-18 10:42:21 +00:00
}
2024-02-18 10:42:21 +00:00
return pubsub
2024-02-18 10:42:21 +00:00
}
// PSubscribe subscribes the client to the given patterns.
2024-02-18 10:42:21 +00:00
// Patterns can be omitted to create empty subscription.
2024-02-18 10:42:21 +00:00
func (c *Client) PSubscribe(ctx context.Context, channels ...string) *PubSub {
2024-02-18 10:42:21 +00:00
pubsub := c.pubSub()
2024-02-18 10:42:21 +00:00
if len(channels) > 0 {
2024-02-18 10:42:21 +00:00
_ = pubsub.PSubscribe(ctx, channels...)
2024-02-18 10:42:21 +00:00
}
2024-02-18 10:42:21 +00:00
return pubsub
2024-02-18 10:42:21 +00:00
}
// SSubscribe Subscribes the client to the specified shard channels.
2024-02-18 10:42:21 +00:00
// Channels can be omitted to create empty subscription.
2024-02-18 10:42:21 +00:00
func (c *Client) SSubscribe(ctx context.Context, channels ...string) *PubSub {
2024-02-18 10:42:21 +00:00
pubsub := c.pubSub()
2024-02-18 10:42:21 +00:00
if len(channels) > 0 {
2024-02-18 10:42:21 +00:00
_ = pubsub.SSubscribe(ctx, channels...)
2024-02-18 10:42:21 +00:00
}
2024-02-18 10:42:21 +00:00
return pubsub
2024-02-18 10:42:21 +00:00
}
//------------------------------------------------------------------------------
// Conn represents a single Redis connection rather than a pool of connections.
2024-02-18 10:42:21 +00:00
// Prefer running commands from Client unless there is a specific need
2024-02-18 10:42:21 +00:00
// for a continuous single Redis connection.
2024-02-18 10:42:21 +00:00
type Conn struct {
baseClient
2024-02-18 10:42:21 +00:00
cmdable
2024-02-18 10:42:21 +00:00
statefulCmdable
2024-02-18 10:42:21 +00:00
hooksMixin
}
func newConn(opt *Options, connPool pool.Pooler) *Conn {
2024-02-18 10:42:21 +00:00
c := Conn{
2024-02-18 10:42:21 +00:00
baseClient: baseClient{
opt: opt,
2024-02-18 10:42:21 +00:00
connPool: connPool,
},
}
c.cmdable = c.Process
2024-02-18 10:42:21 +00:00
c.statefulCmdable = c.Process
2024-02-18 10:42:21 +00:00
c.initHooks(hooks{
dial: c.baseClient.dial,
process: c.baseClient.process,
pipeline: c.baseClient.processPipeline,
2024-02-18 10:42:21 +00:00
txPipeline: c.baseClient.processTxPipeline,
})
return &c
2024-02-18 10:42:21 +00:00
}
func (c *Conn) Process(ctx context.Context, cmd Cmder) error {
2024-02-18 10:42:21 +00:00
err := c.processHook(ctx, cmd)
2024-02-18 10:42:21 +00:00
cmd.SetErr(err)
2024-02-18 10:42:21 +00:00
return err
2024-02-18 10:42:21 +00:00
}
func (c *Conn) Pipelined(ctx context.Context, fn func(Pipeliner) error) ([]Cmder, error) {
2024-02-18 10:42:21 +00:00
return c.Pipeline().Pipelined(ctx, fn)
2024-02-18 10:42:21 +00:00
}
func (c *Conn) Pipeline() Pipeliner {
2024-02-18 10:42:21 +00:00
pipe := Pipeline{
2024-02-18 10:42:21 +00:00
exec: c.processPipelineHook,
}
2024-02-18 10:42:21 +00:00
pipe.init()
2024-02-18 10:42:21 +00:00
return &pipe
2024-02-18 10:42:21 +00:00
}
func (c *Conn) TxPipelined(ctx context.Context, fn func(Pipeliner) error) ([]Cmder, error) {
2024-02-18 10:42:21 +00:00
return c.TxPipeline().Pipelined(ctx, fn)
2024-02-18 10:42:21 +00:00
}
// TxPipeline acts like Pipeline, but wraps queued commands with MULTI/EXEC.
2024-02-18 10:42:21 +00:00
func (c *Conn) TxPipeline() Pipeliner {
2024-02-18 10:42:21 +00:00
pipe := Pipeline{
2024-02-18 10:42:21 +00:00
exec: func(ctx context.Context, cmds []Cmder) error {
2024-02-18 10:42:21 +00:00
cmds = wrapMultiExec(ctx, cmds)
2024-02-18 10:42:21 +00:00
return c.processTxPipelineHook(ctx, cmds)
2024-02-18 10:42:21 +00:00
},
}
2024-02-18 10:42:21 +00:00
pipe.init()
2024-02-18 10:42:21 +00:00
return &pipe
2024-02-18 10:42:21 +00:00
}