forked from ebhomengo/niki
1
0
Fork 0
niki/vendor/github.com/redis/go-redis/v9/internal/pool/pool.go

813 lines
9.6 KiB
Go
Raw Normal View History

2024-02-18 10:42:21 +00:00
package pool
import (
"context"
"errors"
"net"
"sync"
"sync/atomic"
"time"
"github.com/redis/go-redis/v9/internal"
)
var (
	// ErrClosed is returned when an operation is performed on a closed client.
	ErrClosed = errors.New("redis: client is closed")

	// ErrPoolExhausted is returned from a pool connection method
	// when the maximum number of database connections in the pool has been reached.
	ErrPoolExhausted = errors.New("redis: connection pool exhausted")

	// ErrPoolTimeout is returned when waiting to get a connection from the
	// connection pool timed out.
	ErrPoolTimeout = errors.New("redis: connection pool timeout")
)
// timers recycles *time.Timer values so that waitTurn does not allocate a
// timer for every blocked Get. Timers produced by New are already stopped;
// a caller must Reset a timer before waiting on its channel.
var timers = sync.Pool{
	New: func() interface{} {
		// The one-hour duration is a placeholder: the timer is stopped
		// immediately and only ever runs after an explicit Reset.
		t := time.NewTimer(time.Hour)
		t.Stop()
		return t
	},
}
// Stats contains pool state information and accumulated stats.
type Stats struct {
	Hits     uint32 // number of times a free connection was found in the pool
	Misses   uint32 // number of times a free connection was NOT found in the pool
	Timeouts uint32 // number of times a wait timeout occurred

	TotalConns uint32 // number of total connections in the pool
	IdleConns  uint32 // number of idle connections in the pool
	// StaleConns counts connections removed from the pool. ConnPool
	// increments it on every removeConn call, not only for connections
	// that failed a health check.
	StaleConns uint32
}
type Pooler interface {
NewConn(context.Context) (*Conn, error)
2024-02-18 10:42:21 +00:00
CloseConn(*Conn) error
Get(context.Context) (*Conn, error)
2024-02-18 10:42:21 +00:00
Put(context.Context, *Conn)
2024-02-18 10:42:21 +00:00
Remove(context.Context, *Conn, error)
Len() int
2024-02-18 10:42:21 +00:00
IdleLen() int
2024-02-18 10:42:21 +00:00
Stats() *Stats
Close() error
}
// Options configures a ConnPool.
type Options struct {
	// Dialer opens a new network connection for the pool.
	Dialer func(context.Context) (net.Conn, error)

	// PoolFIFO makes the pool hand out the oldest idle connection first;
	// when false the most recently returned connection is used first.
	PoolFIFO bool
	// PoolSize is the capacity of the turn queue and the upper bound on
	// pooled connections. It is also the number of dial errors after
	// which dialing is short-circuited.
	PoolSize int
	// PoolTimeout is how long Get waits for a free turn before it fails
	// with ErrPoolTimeout.
	PoolTimeout time.Duration
	// MinIdleConns is the number of idle connections the pool tries to
	// keep available. Zero disables background dialing.
	MinIdleConns int
	// MaxIdleConns caps the number of idle connections kept by Put.
	// Zero means no limit.
	MaxIdleConns int
	// MaxActiveConns caps the number of pooled connections; once reached,
	// new connections fail with ErrPoolExhausted. Zero means no limit.
	MaxActiveConns int

	// ConnMaxIdleTime is the idle duration after which a connection is
	// considered unhealthy. Zero or negative disables the check.
	ConnMaxIdleTime time.Duration
	// ConnMaxLifetime is the age after which a connection is considered
	// unhealthy. Zero or negative disables the check.
	ConnMaxLifetime time.Duration
}
// lastDialErrorWrap boxes a dial error so that every value stored in
// ConnPool.lastDialError (an atomic.Value) has the same concrete type,
// *lastDialErrorWrap, regardless of the dynamic type of the error itself.
type lastDialErrorWrap struct {
	err error
}
type ConnPool struct {
cfg *Options
dialErrorsNum uint32 // atomic
2024-02-18 10:42:21 +00:00
lastDialError atomic.Value
queue chan struct{}
connsMu sync.Mutex
conns []*Conn
2024-02-18 10:42:21 +00:00
idleConns []*Conn
poolSize int
2024-02-18 10:42:21 +00:00
idleConnsLen int
stats Stats
_closed uint32 // atomic
2024-02-18 10:42:21 +00:00
}
// Compile-time check that *ConnPool satisfies the Pooler interface.
var _ Pooler = (*ConnPool)(nil)
func NewConnPool(opt *Options) *ConnPool {
2024-02-18 10:42:21 +00:00
p := &ConnPool{
2024-02-18 10:42:21 +00:00
cfg: opt,
queue: make(chan struct{}, opt.PoolSize),
conns: make([]*Conn, 0, opt.PoolSize),
2024-02-18 10:42:21 +00:00
idleConns: make([]*Conn, 0, opt.PoolSize),
}
p.connsMu.Lock()
2024-02-18 10:42:21 +00:00
p.checkMinIdleConns()
2024-02-18 10:42:21 +00:00
p.connsMu.Unlock()
return p
2024-02-18 10:42:21 +00:00
}
func (p *ConnPool) checkMinIdleConns() {
2024-02-18 10:42:21 +00:00
if p.cfg.MinIdleConns == 0 {
2024-02-18 10:42:21 +00:00
return
2024-02-18 10:42:21 +00:00
}
2024-02-18 10:42:21 +00:00
for p.poolSize < p.cfg.PoolSize && p.idleConnsLen < p.cfg.MinIdleConns {
2024-02-18 10:42:21 +00:00
select {
2024-02-18 10:42:21 +00:00
case p.queue <- struct{}{}:
2024-02-18 10:42:21 +00:00
p.poolSize++
2024-02-18 10:42:21 +00:00
p.idleConnsLen++
go func() {
2024-02-18 10:42:21 +00:00
err := p.addIdleConn()
2024-02-18 10:42:21 +00:00
if err != nil && err != ErrClosed {
2024-02-18 10:42:21 +00:00
p.connsMu.Lock()
2024-02-18 10:42:21 +00:00
p.poolSize--
2024-02-18 10:42:21 +00:00
p.idleConnsLen--
2024-02-18 10:42:21 +00:00
p.connsMu.Unlock()
2024-02-18 10:42:21 +00:00
}
p.freeTurn()
2024-02-18 10:42:21 +00:00
}()
2024-02-18 10:42:21 +00:00
default:
2024-02-18 10:42:21 +00:00
return
2024-02-18 10:42:21 +00:00
}
2024-02-18 10:42:21 +00:00
}
2024-02-18 10:42:21 +00:00
}
func (p *ConnPool) addIdleConn() error {
2024-02-18 10:42:21 +00:00
cn, err := p.dialConn(context.TODO(), true)
2024-02-18 10:42:21 +00:00
if err != nil {
2024-02-18 10:42:21 +00:00
return err
2024-02-18 10:42:21 +00:00
}
p.connsMu.Lock()
2024-02-18 10:42:21 +00:00
defer p.connsMu.Unlock()
// It is not allowed to add new connections to the closed connection pool.
2024-02-18 10:42:21 +00:00
if p.closed() {
2024-02-18 10:42:21 +00:00
_ = cn.Close()
2024-02-18 10:42:21 +00:00
return ErrClosed
2024-02-18 10:42:21 +00:00
}
p.conns = append(p.conns, cn)
2024-02-18 10:42:21 +00:00
p.idleConns = append(p.idleConns, cn)
2024-02-18 10:42:21 +00:00
return nil
2024-02-18 10:42:21 +00:00
}
func (p *ConnPool) NewConn(ctx context.Context) (*Conn, error) {
2024-02-18 10:42:21 +00:00
return p.newConn(ctx, false)
2024-02-18 10:42:21 +00:00
}
func (p *ConnPool) newConn(ctx context.Context, pooled bool) (*Conn, error) {
2024-02-18 10:42:21 +00:00
if p.closed() {
2024-02-18 10:42:21 +00:00
return nil, ErrClosed
2024-02-18 10:42:21 +00:00
}
if p.cfg.MaxActiveConns > 0 && p.poolSize >= p.cfg.MaxActiveConns {
2024-02-18 10:42:21 +00:00
return nil, ErrPoolExhausted
2024-02-18 10:42:21 +00:00
}
cn, err := p.dialConn(ctx, pooled)
2024-02-18 10:42:21 +00:00
if err != nil {
2024-02-18 10:42:21 +00:00
return nil, err
2024-02-18 10:42:21 +00:00
}
p.connsMu.Lock()
2024-02-18 10:42:21 +00:00
defer p.connsMu.Unlock()
p.conns = append(p.conns, cn)
2024-02-18 10:42:21 +00:00
if pooled {
2024-02-18 10:42:21 +00:00
// If pool is full remove the cn on next Put.
2024-02-18 10:42:21 +00:00
if p.poolSize >= p.cfg.PoolSize {
2024-02-18 10:42:21 +00:00
cn.pooled = false
2024-02-18 10:42:21 +00:00
} else {
2024-02-18 10:42:21 +00:00
p.poolSize++
2024-02-18 10:42:21 +00:00
}
2024-02-18 10:42:21 +00:00
}
return cn, nil
2024-02-18 10:42:21 +00:00
}
func (p *ConnPool) dialConn(ctx context.Context, pooled bool) (*Conn, error) {
2024-02-18 10:42:21 +00:00
if p.closed() {
2024-02-18 10:42:21 +00:00
return nil, ErrClosed
2024-02-18 10:42:21 +00:00
}
if atomic.LoadUint32(&p.dialErrorsNum) >= uint32(p.cfg.PoolSize) {
2024-02-18 10:42:21 +00:00
return nil, p.getLastDialError()
2024-02-18 10:42:21 +00:00
}
netConn, err := p.cfg.Dialer(ctx)
2024-02-18 10:42:21 +00:00
if err != nil {
2024-02-18 10:42:21 +00:00
p.setLastDialError(err)
2024-02-18 10:42:21 +00:00
if atomic.AddUint32(&p.dialErrorsNum, 1) == uint32(p.cfg.PoolSize) {
2024-02-18 10:42:21 +00:00
go p.tryDial()
2024-02-18 10:42:21 +00:00
}
2024-02-18 10:42:21 +00:00
return nil, err
2024-02-18 10:42:21 +00:00
}
cn := NewConn(netConn)
2024-02-18 10:42:21 +00:00
cn.pooled = pooled
2024-02-18 10:42:21 +00:00
return cn, nil
2024-02-18 10:42:21 +00:00
}
func (p *ConnPool) tryDial() {
2024-02-18 10:42:21 +00:00
for {
2024-02-18 10:42:21 +00:00
if p.closed() {
2024-02-18 10:42:21 +00:00
return
2024-02-18 10:42:21 +00:00
}
conn, err := p.cfg.Dialer(context.Background())
2024-02-18 10:42:21 +00:00
if err != nil {
2024-02-18 10:42:21 +00:00
p.setLastDialError(err)
2024-02-18 10:42:21 +00:00
time.Sleep(time.Second)
2024-02-18 10:42:21 +00:00
continue
2024-02-18 10:42:21 +00:00
}
atomic.StoreUint32(&p.dialErrorsNum, 0)
2024-02-18 10:42:21 +00:00
_ = conn.Close()
2024-02-18 10:42:21 +00:00
return
2024-02-18 10:42:21 +00:00
}
2024-02-18 10:42:21 +00:00
}
func (p *ConnPool) setLastDialError(err error) {
2024-02-18 10:42:21 +00:00
p.lastDialError.Store(&lastDialErrorWrap{err: err})
2024-02-18 10:42:21 +00:00
}
func (p *ConnPool) getLastDialError() error {
2024-02-18 10:42:21 +00:00
err, _ := p.lastDialError.Load().(*lastDialErrorWrap)
2024-02-18 10:42:21 +00:00
if err != nil {
2024-02-18 10:42:21 +00:00
return err.err
2024-02-18 10:42:21 +00:00
}
2024-02-18 10:42:21 +00:00
return nil
2024-02-18 10:42:21 +00:00
}
// Get returns existed connection from the pool or creates a new one.
2024-02-18 10:42:21 +00:00
func (p *ConnPool) Get(ctx context.Context) (*Conn, error) {
2024-02-18 10:42:21 +00:00
if p.closed() {
2024-02-18 10:42:21 +00:00
return nil, ErrClosed
2024-02-18 10:42:21 +00:00
}
if err := p.waitTurn(ctx); err != nil {
2024-02-18 10:42:21 +00:00
return nil, err
2024-02-18 10:42:21 +00:00
}
for {
2024-02-18 10:42:21 +00:00
p.connsMu.Lock()
2024-02-18 10:42:21 +00:00
cn, err := p.popIdle()
2024-02-18 10:42:21 +00:00
p.connsMu.Unlock()
if err != nil {
2024-02-18 10:42:21 +00:00
p.freeTurn()
2024-02-18 10:42:21 +00:00
return nil, err
2024-02-18 10:42:21 +00:00
}
if cn == nil {
2024-02-18 10:42:21 +00:00
break
2024-02-18 10:42:21 +00:00
}
if !p.isHealthyConn(cn) {
2024-02-18 10:42:21 +00:00
_ = p.CloseConn(cn)
2024-02-18 10:42:21 +00:00
continue
2024-02-18 10:42:21 +00:00
}
atomic.AddUint32(&p.stats.Hits, 1)
2024-02-18 10:42:21 +00:00
return cn, nil
2024-02-18 10:42:21 +00:00
}
atomic.AddUint32(&p.stats.Misses, 1)
newcn, err := p.newConn(ctx, true)
2024-02-18 10:42:21 +00:00
if err != nil {
2024-02-18 10:42:21 +00:00
p.freeTurn()
2024-02-18 10:42:21 +00:00
return nil, err
2024-02-18 10:42:21 +00:00
}
return newcn, nil
2024-02-18 10:42:21 +00:00
}
func (p *ConnPool) waitTurn(ctx context.Context) error {
2024-02-18 10:42:21 +00:00
select {
2024-02-18 10:42:21 +00:00
case <-ctx.Done():
2024-02-18 10:42:21 +00:00
return ctx.Err()
2024-02-18 10:42:21 +00:00
default:
2024-02-18 10:42:21 +00:00
}
select {
2024-02-18 10:42:21 +00:00
case p.queue <- struct{}{}:
2024-02-18 10:42:21 +00:00
return nil
2024-02-18 10:42:21 +00:00
default:
2024-02-18 10:42:21 +00:00
}
timer := timers.Get().(*time.Timer)
2024-02-18 10:42:21 +00:00
timer.Reset(p.cfg.PoolTimeout)
select {
2024-02-18 10:42:21 +00:00
case <-ctx.Done():
2024-02-18 10:42:21 +00:00
if !timer.Stop() {
2024-02-18 10:42:21 +00:00
<-timer.C
2024-02-18 10:42:21 +00:00
}
2024-02-18 10:42:21 +00:00
timers.Put(timer)
2024-02-18 10:42:21 +00:00
return ctx.Err()
2024-02-18 10:42:21 +00:00
case p.queue <- struct{}{}:
2024-02-18 10:42:21 +00:00
if !timer.Stop() {
2024-02-18 10:42:21 +00:00
<-timer.C
2024-02-18 10:42:21 +00:00
}
2024-02-18 10:42:21 +00:00
timers.Put(timer)
2024-02-18 10:42:21 +00:00
return nil
2024-02-18 10:42:21 +00:00
case <-timer.C:
2024-02-18 10:42:21 +00:00
timers.Put(timer)
2024-02-18 10:42:21 +00:00
atomic.AddUint32(&p.stats.Timeouts, 1)
2024-02-18 10:42:21 +00:00
return ErrPoolTimeout
2024-02-18 10:42:21 +00:00
}
2024-02-18 10:42:21 +00:00
}
func (p *ConnPool) freeTurn() {
2024-02-18 10:42:21 +00:00
<-p.queue
2024-02-18 10:42:21 +00:00
}
func (p *ConnPool) popIdle() (*Conn, error) {
2024-02-18 10:42:21 +00:00
if p.closed() {
2024-02-18 10:42:21 +00:00
return nil, ErrClosed
2024-02-18 10:42:21 +00:00
}
2024-02-18 10:42:21 +00:00
n := len(p.idleConns)
2024-02-18 10:42:21 +00:00
if n == 0 {
2024-02-18 10:42:21 +00:00
return nil, nil
2024-02-18 10:42:21 +00:00
}
var cn *Conn
2024-02-18 10:42:21 +00:00
if p.cfg.PoolFIFO {
2024-02-18 10:42:21 +00:00
cn = p.idleConns[0]
2024-02-18 10:42:21 +00:00
copy(p.idleConns, p.idleConns[1:])
2024-02-18 10:42:21 +00:00
p.idleConns = p.idleConns[:n-1]
2024-02-18 10:42:21 +00:00
} else {
2024-02-18 10:42:21 +00:00
idx := n - 1
2024-02-18 10:42:21 +00:00
cn = p.idleConns[idx]
2024-02-18 10:42:21 +00:00
p.idleConns = p.idleConns[:idx]
2024-02-18 10:42:21 +00:00
}
2024-02-18 10:42:21 +00:00
p.idleConnsLen--
2024-02-18 10:42:21 +00:00
p.checkMinIdleConns()
2024-02-18 10:42:21 +00:00
return cn, nil
2024-02-18 10:42:21 +00:00
}
func (p *ConnPool) Put(ctx context.Context, cn *Conn) {
2024-02-18 10:42:21 +00:00
if cn.rd.Buffered() > 0 {
2024-02-18 10:42:21 +00:00
internal.Logger.Printf(ctx, "Conn has unread data")
2024-02-18 10:42:21 +00:00
p.Remove(ctx, cn, BadConnError{})
2024-02-18 10:42:21 +00:00
return
2024-02-18 10:42:21 +00:00
}
if !cn.pooled {
2024-02-18 10:42:21 +00:00
p.Remove(ctx, cn, nil)
2024-02-18 10:42:21 +00:00
return
2024-02-18 10:42:21 +00:00
}
var shouldCloseConn bool
p.connsMu.Lock()
if p.cfg.MaxIdleConns == 0 || p.idleConnsLen < p.cfg.MaxIdleConns {
2024-02-18 10:42:21 +00:00
p.idleConns = append(p.idleConns, cn)
2024-02-18 10:42:21 +00:00
p.idleConnsLen++
2024-02-18 10:42:21 +00:00
} else {
2024-02-18 10:42:21 +00:00
p.removeConn(cn)
2024-02-18 10:42:21 +00:00
shouldCloseConn = true
2024-02-18 10:42:21 +00:00
}
p.connsMu.Unlock()
p.freeTurn()
if shouldCloseConn {
2024-02-18 10:42:21 +00:00
_ = p.closeConn(cn)
2024-02-18 10:42:21 +00:00
}
2024-02-18 10:42:21 +00:00
}
func (p *ConnPool) Remove(_ context.Context, cn *Conn, reason error) {
2024-02-18 10:42:21 +00:00
p.removeConnWithLock(cn)
2024-02-18 10:42:21 +00:00
p.freeTurn()
2024-02-18 10:42:21 +00:00
_ = p.closeConn(cn)
2024-02-18 10:42:21 +00:00
}
func (p *ConnPool) CloseConn(cn *Conn) error {
2024-02-18 10:42:21 +00:00
p.removeConnWithLock(cn)
2024-02-18 10:42:21 +00:00
return p.closeConn(cn)
2024-02-18 10:42:21 +00:00
}
func (p *ConnPool) removeConnWithLock(cn *Conn) {
2024-02-18 10:42:21 +00:00
p.connsMu.Lock()
2024-02-18 10:42:21 +00:00
defer p.connsMu.Unlock()
2024-02-18 10:42:21 +00:00
p.removeConn(cn)
2024-02-18 10:42:21 +00:00
}
func (p *ConnPool) removeConn(cn *Conn) {
2024-02-18 10:42:21 +00:00
for i, c := range p.conns {
2024-02-18 10:42:21 +00:00
if c == cn {
2024-02-18 10:42:21 +00:00
p.conns = append(p.conns[:i], p.conns[i+1:]...)
2024-02-18 10:42:21 +00:00
if cn.pooled {
2024-02-18 10:42:21 +00:00
p.poolSize--
2024-02-18 10:42:21 +00:00
p.checkMinIdleConns()
2024-02-18 10:42:21 +00:00
}
2024-02-18 10:42:21 +00:00
break
2024-02-18 10:42:21 +00:00
}
2024-02-18 10:42:21 +00:00
}
2024-02-18 10:42:21 +00:00
atomic.AddUint32(&p.stats.StaleConns, 1)
2024-02-18 10:42:21 +00:00
}
func (p *ConnPool) closeConn(cn *Conn) error {
2024-02-18 10:42:21 +00:00
return cn.Close()
2024-02-18 10:42:21 +00:00
}
// Len returns total number of connections.
2024-02-18 10:42:21 +00:00
func (p *ConnPool) Len() int {
2024-02-18 10:42:21 +00:00
p.connsMu.Lock()
2024-02-18 10:42:21 +00:00
n := len(p.conns)
2024-02-18 10:42:21 +00:00
p.connsMu.Unlock()
2024-02-18 10:42:21 +00:00
return n
2024-02-18 10:42:21 +00:00
}
// IdleLen returns number of idle connections.
2024-02-18 10:42:21 +00:00
func (p *ConnPool) IdleLen() int {
2024-02-18 10:42:21 +00:00
p.connsMu.Lock()
2024-02-18 10:42:21 +00:00
n := p.idleConnsLen
2024-02-18 10:42:21 +00:00
p.connsMu.Unlock()
2024-02-18 10:42:21 +00:00
return n
2024-02-18 10:42:21 +00:00
}
func (p *ConnPool) Stats() *Stats {
2024-02-18 10:42:21 +00:00
return &Stats{
Hits: atomic.LoadUint32(&p.stats.Hits),
Misses: atomic.LoadUint32(&p.stats.Misses),
2024-02-18 10:42:21 +00:00
Timeouts: atomic.LoadUint32(&p.stats.Timeouts),
TotalConns: uint32(p.Len()),
IdleConns: uint32(p.IdleLen()),
2024-02-18 10:42:21 +00:00
StaleConns: atomic.LoadUint32(&p.stats.StaleConns),
}
2024-02-18 10:42:21 +00:00
}
func (p *ConnPool) closed() bool {
2024-02-18 10:42:21 +00:00
return atomic.LoadUint32(&p._closed) == 1
2024-02-18 10:42:21 +00:00
}
func (p *ConnPool) Filter(fn func(*Conn) bool) error {
2024-02-18 10:42:21 +00:00
p.connsMu.Lock()
2024-02-18 10:42:21 +00:00
defer p.connsMu.Unlock()
var firstErr error
2024-02-18 10:42:21 +00:00
for _, cn := range p.conns {
2024-02-18 10:42:21 +00:00
if fn(cn) {
2024-02-18 10:42:21 +00:00
if err := p.closeConn(cn); err != nil && firstErr == nil {
2024-02-18 10:42:21 +00:00
firstErr = err
2024-02-18 10:42:21 +00:00
}
2024-02-18 10:42:21 +00:00
}
2024-02-18 10:42:21 +00:00
}
2024-02-18 10:42:21 +00:00
return firstErr
2024-02-18 10:42:21 +00:00
}
func (p *ConnPool) Close() error {
2024-02-18 10:42:21 +00:00
if !atomic.CompareAndSwapUint32(&p._closed, 0, 1) {
2024-02-18 10:42:21 +00:00
return ErrClosed
2024-02-18 10:42:21 +00:00
}
var firstErr error
2024-02-18 10:42:21 +00:00
p.connsMu.Lock()
2024-02-18 10:42:21 +00:00
for _, cn := range p.conns {
2024-02-18 10:42:21 +00:00
if err := p.closeConn(cn); err != nil && firstErr == nil {
2024-02-18 10:42:21 +00:00
firstErr = err
2024-02-18 10:42:21 +00:00
}
2024-02-18 10:42:21 +00:00
}
2024-02-18 10:42:21 +00:00
p.conns = nil
2024-02-18 10:42:21 +00:00
p.poolSize = 0
2024-02-18 10:42:21 +00:00
p.idleConns = nil
2024-02-18 10:42:21 +00:00
p.idleConnsLen = 0
2024-02-18 10:42:21 +00:00
p.connsMu.Unlock()
return firstErr
2024-02-18 10:42:21 +00:00
}
func (p *ConnPool) isHealthyConn(cn *Conn) bool {
2024-02-18 10:42:21 +00:00
now := time.Now()
if p.cfg.ConnMaxLifetime > 0 && now.Sub(cn.createdAt) >= p.cfg.ConnMaxLifetime {
2024-02-18 10:42:21 +00:00
return false
2024-02-18 10:42:21 +00:00
}
2024-02-18 10:42:21 +00:00
if p.cfg.ConnMaxIdleTime > 0 && now.Sub(cn.UsedAt()) >= p.cfg.ConnMaxIdleTime {
2024-02-18 10:42:21 +00:00
return false
2024-02-18 10:42:21 +00:00
}
if connCheck(cn.netConn) != nil {
2024-02-18 10:42:21 +00:00
return false
2024-02-18 10:42:21 +00:00
}
cn.SetUsedAt(now)
2024-02-18 10:42:21 +00:00
return true
2024-02-18 10:42:21 +00:00
}