forked from ebhomengo/niki
1
0
Fork 0
niki/vendor/github.com/redis/go-redis/v9/pubsub.go

1238 lines
17 KiB
Go
Raw Normal View History

2024-02-18 10:42:21 +00:00
package redis
import (
"context"
"fmt"
"strings"
"sync"
"time"
"github.com/redis/go-redis/v9/internal"
"github.com/redis/go-redis/v9/internal/pool"
"github.com/redis/go-redis/v9/internal/proto"
)
// PubSub implements Pub/Sub commands as described in
2024-02-18 10:42:21 +00:00
// http://redis.io/topics/pubsub. Message receiving is NOT safe
2024-02-18 10:42:21 +00:00
// for concurrent use by multiple goroutines.
2024-02-18 10:42:21 +00:00
//
2024-02-18 10:42:21 +00:00
// PubSub automatically reconnects to Redis Server and resubscribes
2024-02-18 10:42:21 +00:00
// to the channels in case of network errors.
2024-02-18 10:42:21 +00:00
type PubSub struct {
opt *Options
newConn func(ctx context.Context, channels []string) (*pool.Conn, error)
2024-02-18 10:42:21 +00:00
closeConn func(*pool.Conn) error
mu sync.Mutex
cn *pool.Conn
channels map[string]struct{}
patterns map[string]struct{}
2024-02-18 10:42:21 +00:00
schannels map[string]struct{}
closed bool
exit chan struct{}
2024-02-18 10:42:21 +00:00
cmd *Cmd
chOnce sync.Once
msgCh *channel
allCh *channel
2024-02-18 10:42:21 +00:00
}
func (c *PubSub) init() {
2024-02-18 10:42:21 +00:00
c.exit = make(chan struct{})
2024-02-18 10:42:21 +00:00
}
func (c *PubSub) String() string {
2024-02-18 10:42:21 +00:00
channels := mapKeys(c.channels)
2024-02-18 10:42:21 +00:00
channels = append(channels, mapKeys(c.patterns)...)
2024-02-18 10:42:21 +00:00
channels = append(channels, mapKeys(c.schannels)...)
2024-02-18 10:42:21 +00:00
return fmt.Sprintf("PubSub(%s)", strings.Join(channels, ", "))
2024-02-18 10:42:21 +00:00
}
func (c *PubSub) connWithLock(ctx context.Context) (*pool.Conn, error) {
2024-02-18 10:42:21 +00:00
c.mu.Lock()
2024-02-18 10:42:21 +00:00
cn, err := c.conn(ctx, nil)
2024-02-18 10:42:21 +00:00
c.mu.Unlock()
2024-02-18 10:42:21 +00:00
return cn, err
2024-02-18 10:42:21 +00:00
}
func (c *PubSub) conn(ctx context.Context, newChannels []string) (*pool.Conn, error) {
2024-02-18 10:42:21 +00:00
if c.closed {
2024-02-18 10:42:21 +00:00
return nil, pool.ErrClosed
2024-02-18 10:42:21 +00:00
}
2024-02-18 10:42:21 +00:00
if c.cn != nil {
2024-02-18 10:42:21 +00:00
return c.cn, nil
2024-02-18 10:42:21 +00:00
}
channels := mapKeys(c.channels)
2024-02-18 10:42:21 +00:00
channels = append(channels, newChannels...)
cn, err := c.newConn(ctx, channels)
2024-02-18 10:42:21 +00:00
if err != nil {
2024-02-18 10:42:21 +00:00
return nil, err
2024-02-18 10:42:21 +00:00
}
if err := c.resubscribe(ctx, cn); err != nil {
2024-02-18 10:42:21 +00:00
_ = c.closeConn(cn)
2024-02-18 10:42:21 +00:00
return nil, err
2024-02-18 10:42:21 +00:00
}
c.cn = cn
2024-02-18 10:42:21 +00:00
return cn, nil
2024-02-18 10:42:21 +00:00
}
func (c *PubSub) writeCmd(ctx context.Context, cn *pool.Conn, cmd Cmder) error {
2024-02-18 10:42:21 +00:00
return cn.WithWriter(context.Background(), c.opt.WriteTimeout, func(wr *proto.Writer) error {
2024-02-18 10:42:21 +00:00
return writeCmd(wr, cmd)
2024-02-18 10:42:21 +00:00
})
2024-02-18 10:42:21 +00:00
}
func (c *PubSub) resubscribe(ctx context.Context, cn *pool.Conn) error {
2024-02-18 10:42:21 +00:00
var firstErr error
if len(c.channels) > 0 {
2024-02-18 10:42:21 +00:00
firstErr = c._subscribe(ctx, cn, "subscribe", mapKeys(c.channels))
2024-02-18 10:42:21 +00:00
}
if len(c.patterns) > 0 {
2024-02-18 10:42:21 +00:00
err := c._subscribe(ctx, cn, "psubscribe", mapKeys(c.patterns))
2024-02-18 10:42:21 +00:00
if err != nil && firstErr == nil {
2024-02-18 10:42:21 +00:00
firstErr = err
2024-02-18 10:42:21 +00:00
}
2024-02-18 10:42:21 +00:00
}
if len(c.schannels) > 0 {
2024-02-18 10:42:21 +00:00
err := c._subscribe(ctx, cn, "ssubscribe", mapKeys(c.schannels))
2024-02-18 10:42:21 +00:00
if err != nil && firstErr == nil {
2024-02-18 10:42:21 +00:00
firstErr = err
2024-02-18 10:42:21 +00:00
}
2024-02-18 10:42:21 +00:00
}
return firstErr
2024-02-18 10:42:21 +00:00
}
func mapKeys(m map[string]struct{}) []string {
2024-02-18 10:42:21 +00:00
s := make([]string, len(m))
2024-02-18 10:42:21 +00:00
i := 0
2024-02-18 10:42:21 +00:00
for k := range m {
2024-02-18 10:42:21 +00:00
s[i] = k
2024-02-18 10:42:21 +00:00
i++
2024-02-18 10:42:21 +00:00
}
2024-02-18 10:42:21 +00:00
return s
2024-02-18 10:42:21 +00:00
}
func (c *PubSub) _subscribe(
2024-02-18 10:42:21 +00:00
ctx context.Context, cn *pool.Conn, redisCmd string, channels []string,
2024-02-18 10:42:21 +00:00
) error {
2024-02-18 10:42:21 +00:00
args := make([]interface{}, 0, 1+len(channels))
2024-02-18 10:42:21 +00:00
args = append(args, redisCmd)
2024-02-18 10:42:21 +00:00
for _, channel := range channels {
2024-02-18 10:42:21 +00:00
args = append(args, channel)
2024-02-18 10:42:21 +00:00
}
2024-02-18 10:42:21 +00:00
cmd := NewSliceCmd(ctx, args...)
2024-02-18 10:42:21 +00:00
return c.writeCmd(ctx, cn, cmd)
2024-02-18 10:42:21 +00:00
}
func (c *PubSub) releaseConnWithLock(
2024-02-18 10:42:21 +00:00
ctx context.Context,
2024-02-18 10:42:21 +00:00
cn *pool.Conn,
2024-02-18 10:42:21 +00:00
err error,
2024-02-18 10:42:21 +00:00
allowTimeout bool,
2024-02-18 10:42:21 +00:00
) {
2024-02-18 10:42:21 +00:00
c.mu.Lock()
2024-02-18 10:42:21 +00:00
c.releaseConn(ctx, cn, err, allowTimeout)
2024-02-18 10:42:21 +00:00
c.mu.Unlock()
2024-02-18 10:42:21 +00:00
}
func (c *PubSub) releaseConn(ctx context.Context, cn *pool.Conn, err error, allowTimeout bool) {
2024-02-18 10:42:21 +00:00
if c.cn != cn {
2024-02-18 10:42:21 +00:00
return
2024-02-18 10:42:21 +00:00
}
2024-02-18 10:42:21 +00:00
if isBadConn(err, allowTimeout, c.opt.Addr) {
2024-02-18 10:42:21 +00:00
c.reconnect(ctx, err)
2024-02-18 10:42:21 +00:00
}
2024-02-18 10:42:21 +00:00
}
func (c *PubSub) reconnect(ctx context.Context, reason error) {
2024-02-18 10:42:21 +00:00
_ = c.closeTheCn(reason)
2024-02-18 10:42:21 +00:00
_, _ = c.conn(ctx, nil)
2024-02-18 10:42:21 +00:00
}
func (c *PubSub) closeTheCn(reason error) error {
2024-02-18 10:42:21 +00:00
if c.cn == nil {
2024-02-18 10:42:21 +00:00
return nil
2024-02-18 10:42:21 +00:00
}
2024-02-18 10:42:21 +00:00
if !c.closed {
2024-02-18 10:42:21 +00:00
internal.Logger.Printf(c.getContext(), "redis: discarding bad PubSub connection: %s", reason)
2024-02-18 10:42:21 +00:00
}
2024-02-18 10:42:21 +00:00
err := c.closeConn(c.cn)
2024-02-18 10:42:21 +00:00
c.cn = nil
2024-02-18 10:42:21 +00:00
return err
2024-02-18 10:42:21 +00:00
}
func (c *PubSub) Close() error {
2024-02-18 10:42:21 +00:00
c.mu.Lock()
2024-02-18 10:42:21 +00:00
defer c.mu.Unlock()
if c.closed {
2024-02-18 10:42:21 +00:00
return pool.ErrClosed
2024-02-18 10:42:21 +00:00
}
2024-02-18 10:42:21 +00:00
c.closed = true
2024-02-18 10:42:21 +00:00
close(c.exit)
return c.closeTheCn(pool.ErrClosed)
2024-02-18 10:42:21 +00:00
}
// Subscribe the client to the specified channels. It returns
2024-02-18 10:42:21 +00:00
// empty subscription if there are no channels.
2024-02-18 10:42:21 +00:00
func (c *PubSub) Subscribe(ctx context.Context, channels ...string) error {
2024-02-18 10:42:21 +00:00
c.mu.Lock()
2024-02-18 10:42:21 +00:00
defer c.mu.Unlock()
err := c.subscribe(ctx, "subscribe", channels...)
2024-02-18 10:42:21 +00:00
if c.channels == nil {
2024-02-18 10:42:21 +00:00
c.channels = make(map[string]struct{})
2024-02-18 10:42:21 +00:00
}
2024-02-18 10:42:21 +00:00
for _, s := range channels {
2024-02-18 10:42:21 +00:00
c.channels[s] = struct{}{}
2024-02-18 10:42:21 +00:00
}
2024-02-18 10:42:21 +00:00
return err
2024-02-18 10:42:21 +00:00
}
// PSubscribe the client to the given patterns. It returns
2024-02-18 10:42:21 +00:00
// empty subscription if there are no patterns.
2024-02-18 10:42:21 +00:00
func (c *PubSub) PSubscribe(ctx context.Context, patterns ...string) error {
2024-02-18 10:42:21 +00:00
c.mu.Lock()
2024-02-18 10:42:21 +00:00
defer c.mu.Unlock()
err := c.subscribe(ctx, "psubscribe", patterns...)
2024-02-18 10:42:21 +00:00
if c.patterns == nil {
2024-02-18 10:42:21 +00:00
c.patterns = make(map[string]struct{})
2024-02-18 10:42:21 +00:00
}
2024-02-18 10:42:21 +00:00
for _, s := range patterns {
2024-02-18 10:42:21 +00:00
c.patterns[s] = struct{}{}
2024-02-18 10:42:21 +00:00
}
2024-02-18 10:42:21 +00:00
return err
2024-02-18 10:42:21 +00:00
}
// SSubscribe Subscribes the client to the specified shard channels.
2024-02-18 10:42:21 +00:00
func (c *PubSub) SSubscribe(ctx context.Context, channels ...string) error {
2024-02-18 10:42:21 +00:00
c.mu.Lock()
2024-02-18 10:42:21 +00:00
defer c.mu.Unlock()
err := c.subscribe(ctx, "ssubscribe", channels...)
2024-02-18 10:42:21 +00:00
if c.schannels == nil {
2024-02-18 10:42:21 +00:00
c.schannels = make(map[string]struct{})
2024-02-18 10:42:21 +00:00
}
2024-02-18 10:42:21 +00:00
for _, s := range channels {
2024-02-18 10:42:21 +00:00
c.schannels[s] = struct{}{}
2024-02-18 10:42:21 +00:00
}
2024-02-18 10:42:21 +00:00
return err
2024-02-18 10:42:21 +00:00
}
// Unsubscribe the client from the given channels, or from all of
2024-02-18 10:42:21 +00:00
// them if none is given.
2024-02-18 10:42:21 +00:00
func (c *PubSub) Unsubscribe(ctx context.Context, channels ...string) error {
2024-02-18 10:42:21 +00:00
c.mu.Lock()
2024-02-18 10:42:21 +00:00
defer c.mu.Unlock()
if len(channels) > 0 {
2024-02-18 10:42:21 +00:00
for _, channel := range channels {
2024-02-18 10:42:21 +00:00
delete(c.channels, channel)
2024-02-18 10:42:21 +00:00
}
2024-02-18 10:42:21 +00:00
} else {
2024-02-18 10:42:21 +00:00
// Unsubscribe from all channels.
2024-02-18 10:42:21 +00:00
for channel := range c.channels {
2024-02-18 10:42:21 +00:00
delete(c.channels, channel)
2024-02-18 10:42:21 +00:00
}
2024-02-18 10:42:21 +00:00
}
err := c.subscribe(ctx, "unsubscribe", channels...)
2024-02-18 10:42:21 +00:00
return err
2024-02-18 10:42:21 +00:00
}
// PUnsubscribe the client from the given patterns, or from all of
2024-02-18 10:42:21 +00:00
// them if none is given.
2024-02-18 10:42:21 +00:00
func (c *PubSub) PUnsubscribe(ctx context.Context, patterns ...string) error {
2024-02-18 10:42:21 +00:00
c.mu.Lock()
2024-02-18 10:42:21 +00:00
defer c.mu.Unlock()
if len(patterns) > 0 {
2024-02-18 10:42:21 +00:00
for _, pattern := range patterns {
2024-02-18 10:42:21 +00:00
delete(c.patterns, pattern)
2024-02-18 10:42:21 +00:00
}
2024-02-18 10:42:21 +00:00
} else {
2024-02-18 10:42:21 +00:00
// Unsubscribe from all patterns.
2024-02-18 10:42:21 +00:00
for pattern := range c.patterns {
2024-02-18 10:42:21 +00:00
delete(c.patterns, pattern)
2024-02-18 10:42:21 +00:00
}
2024-02-18 10:42:21 +00:00
}
err := c.subscribe(ctx, "punsubscribe", patterns...)
2024-02-18 10:42:21 +00:00
return err
2024-02-18 10:42:21 +00:00
}
// SUnsubscribe unsubscribes the client from the given shard channels,
2024-02-18 10:42:21 +00:00
// or from all of them if none is given.
2024-02-18 10:42:21 +00:00
func (c *PubSub) SUnsubscribe(ctx context.Context, channels ...string) error {
2024-02-18 10:42:21 +00:00
c.mu.Lock()
2024-02-18 10:42:21 +00:00
defer c.mu.Unlock()
if len(channels) > 0 {
2024-02-18 10:42:21 +00:00
for _, channel := range channels {
2024-02-18 10:42:21 +00:00
delete(c.schannels, channel)
2024-02-18 10:42:21 +00:00
}
2024-02-18 10:42:21 +00:00
} else {
2024-02-18 10:42:21 +00:00
// Unsubscribe from all channels.
2024-02-18 10:42:21 +00:00
for channel := range c.schannels {
2024-02-18 10:42:21 +00:00
delete(c.schannels, channel)
2024-02-18 10:42:21 +00:00
}
2024-02-18 10:42:21 +00:00
}
err := c.subscribe(ctx, "sunsubscribe", channels...)
2024-02-18 10:42:21 +00:00
return err
2024-02-18 10:42:21 +00:00
}
func (c *PubSub) subscribe(ctx context.Context, redisCmd string, channels ...string) error {
2024-02-18 10:42:21 +00:00
cn, err := c.conn(ctx, channels)
2024-02-18 10:42:21 +00:00
if err != nil {
2024-02-18 10:42:21 +00:00
return err
2024-02-18 10:42:21 +00:00
}
err = c._subscribe(ctx, cn, redisCmd, channels)
2024-02-18 10:42:21 +00:00
c.releaseConn(ctx, cn, err, false)
2024-02-18 10:42:21 +00:00
return err
2024-02-18 10:42:21 +00:00
}
func (c *PubSub) Ping(ctx context.Context, payload ...string) error {
2024-02-18 10:42:21 +00:00
args := []interface{}{"ping"}
2024-02-18 10:42:21 +00:00
if len(payload) == 1 {
2024-02-18 10:42:21 +00:00
args = append(args, payload[0])
2024-02-18 10:42:21 +00:00
}
2024-02-18 10:42:21 +00:00
cmd := NewCmd(ctx, args...)
c.mu.Lock()
2024-02-18 10:42:21 +00:00
defer c.mu.Unlock()
cn, err := c.conn(ctx, nil)
2024-02-18 10:42:21 +00:00
if err != nil {
2024-02-18 10:42:21 +00:00
return err
2024-02-18 10:42:21 +00:00
}
err = c.writeCmd(ctx, cn, cmd)
2024-02-18 10:42:21 +00:00
c.releaseConn(ctx, cn, err, false)
2024-02-18 10:42:21 +00:00
return err
2024-02-18 10:42:21 +00:00
}
// Subscription received after a successful subscription to channel.
2024-02-18 10:42:21 +00:00
type Subscription struct {
2024-02-18 10:42:21 +00:00
// Can be "subscribe", "unsubscribe", "psubscribe" or "punsubscribe".
2024-02-18 10:42:21 +00:00
Kind string
2024-02-18 10:42:21 +00:00
// Channel name we have subscribed to.
2024-02-18 10:42:21 +00:00
Channel string
2024-02-18 10:42:21 +00:00
// Number of channels we are currently subscribed to.
2024-02-18 10:42:21 +00:00
Count int
}
func (m *Subscription) String() string {
2024-02-18 10:42:21 +00:00
return fmt.Sprintf("%s: %s", m.Kind, m.Channel)
2024-02-18 10:42:21 +00:00
}
// Message received as result of a PUBLISH command issued by another client.
2024-02-18 10:42:21 +00:00
type Message struct {
Channel string
Pattern string
Payload string
2024-02-18 10:42:21 +00:00
PayloadSlice []string
}
func (m *Message) String() string {
2024-02-18 10:42:21 +00:00
return fmt.Sprintf("Message<%s: %s>", m.Channel, m.Payload)
2024-02-18 10:42:21 +00:00
}
// Pong received as result of a PING command issued by another client.
2024-02-18 10:42:21 +00:00
type Pong struct {
Payload string
}
func (p *Pong) String() string {
2024-02-18 10:42:21 +00:00
if p.Payload != "" {
2024-02-18 10:42:21 +00:00
return fmt.Sprintf("Pong<%s>", p.Payload)
2024-02-18 10:42:21 +00:00
}
2024-02-18 10:42:21 +00:00
return "Pong"
2024-02-18 10:42:21 +00:00
}
func (c *PubSub) newMessage(reply interface{}) (interface{}, error) {
2024-02-18 10:42:21 +00:00
switch reply := reply.(type) {
2024-02-18 10:42:21 +00:00
case string:
2024-02-18 10:42:21 +00:00
return &Pong{
2024-02-18 10:42:21 +00:00
Payload: reply,
}, nil
2024-02-18 10:42:21 +00:00
case []interface{}:
2024-02-18 10:42:21 +00:00
switch kind := reply[0].(string); kind {
2024-02-18 10:42:21 +00:00
case "subscribe", "unsubscribe", "psubscribe", "punsubscribe", "ssubscribe", "sunsubscribe":
2024-02-18 10:42:21 +00:00
// Can be nil in case of "unsubscribe".
2024-02-18 10:42:21 +00:00
channel, _ := reply[1].(string)
2024-02-18 10:42:21 +00:00
return &Subscription{
Kind: kind,
2024-02-18 10:42:21 +00:00
Channel: channel,
Count: int(reply[2].(int64)),
2024-02-18 10:42:21 +00:00
}, nil
2024-02-18 10:42:21 +00:00
case "message", "smessage":
2024-02-18 10:42:21 +00:00
switch payload := reply[2].(type) {
2024-02-18 10:42:21 +00:00
case string:
2024-02-18 10:42:21 +00:00
return &Message{
2024-02-18 10:42:21 +00:00
Channel: reply[1].(string),
2024-02-18 10:42:21 +00:00
Payload: payload,
}, nil
2024-02-18 10:42:21 +00:00
case []interface{}:
2024-02-18 10:42:21 +00:00
ss := make([]string, len(payload))
2024-02-18 10:42:21 +00:00
for i, s := range payload {
2024-02-18 10:42:21 +00:00
ss[i] = s.(string)
2024-02-18 10:42:21 +00:00
}
2024-02-18 10:42:21 +00:00
return &Message{
Channel: reply[1].(string),
2024-02-18 10:42:21 +00:00
PayloadSlice: ss,
}, nil
2024-02-18 10:42:21 +00:00
default:
2024-02-18 10:42:21 +00:00
return nil, fmt.Errorf("redis: unsupported pubsub message payload: %T", payload)
2024-02-18 10:42:21 +00:00
}
2024-02-18 10:42:21 +00:00
case "pmessage":
2024-02-18 10:42:21 +00:00
return &Message{
2024-02-18 10:42:21 +00:00
Pattern: reply[1].(string),
2024-02-18 10:42:21 +00:00
Channel: reply[2].(string),
2024-02-18 10:42:21 +00:00
Payload: reply[3].(string),
}, nil
2024-02-18 10:42:21 +00:00
case "pong":
2024-02-18 10:42:21 +00:00
return &Pong{
2024-02-18 10:42:21 +00:00
Payload: reply[1].(string),
}, nil
2024-02-18 10:42:21 +00:00
default:
2024-02-18 10:42:21 +00:00
return nil, fmt.Errorf("redis: unsupported pubsub message: %q", kind)
2024-02-18 10:42:21 +00:00
}
2024-02-18 10:42:21 +00:00
default:
2024-02-18 10:42:21 +00:00
return nil, fmt.Errorf("redis: unsupported pubsub message: %#v", reply)
2024-02-18 10:42:21 +00:00
}
2024-02-18 10:42:21 +00:00
}
// ReceiveTimeout acts like Receive but returns an error if message
2024-02-18 10:42:21 +00:00
// is not received in time. This is low-level API and in most cases
2024-02-18 10:42:21 +00:00
// Channel should be used instead.
2024-02-18 10:42:21 +00:00
func (c *PubSub) ReceiveTimeout(ctx context.Context, timeout time.Duration) (interface{}, error) {
2024-02-18 10:42:21 +00:00
if c.cmd == nil {
2024-02-18 10:42:21 +00:00
c.cmd = NewCmd(ctx)
2024-02-18 10:42:21 +00:00
}
// Don't hold the lock to allow subscriptions and pings.
cn, err := c.connWithLock(ctx)
2024-02-18 10:42:21 +00:00
if err != nil {
2024-02-18 10:42:21 +00:00
return nil, err
2024-02-18 10:42:21 +00:00
}
err = cn.WithReader(context.Background(), timeout, func(rd *proto.Reader) error {
2024-02-18 10:42:21 +00:00
return c.cmd.readReply(rd)
2024-02-18 10:42:21 +00:00
})
c.releaseConnWithLock(ctx, cn, err, timeout > 0)
if err != nil {
2024-02-18 10:42:21 +00:00
return nil, err
2024-02-18 10:42:21 +00:00
}
return c.newMessage(c.cmd.Val())
2024-02-18 10:42:21 +00:00
}
// Receive returns a message as a Subscription, Message, Pong or error.
2024-02-18 10:42:21 +00:00
// See PubSub example for details. This is low-level API and in most cases
2024-02-18 10:42:21 +00:00
// Channel should be used instead.
2024-02-18 10:42:21 +00:00
func (c *PubSub) Receive(ctx context.Context) (interface{}, error) {
2024-02-18 10:42:21 +00:00
return c.ReceiveTimeout(ctx, 0)
2024-02-18 10:42:21 +00:00
}
// ReceiveMessage returns a Message or error ignoring Subscription and Pong
2024-02-18 10:42:21 +00:00
// messages. This is low-level API and in most cases Channel should be used
2024-02-18 10:42:21 +00:00
// instead.
2024-02-18 10:42:21 +00:00
func (c *PubSub) ReceiveMessage(ctx context.Context) (*Message, error) {
2024-02-18 10:42:21 +00:00
for {
2024-02-18 10:42:21 +00:00
msg, err := c.Receive(ctx)
2024-02-18 10:42:21 +00:00
if err != nil {
2024-02-18 10:42:21 +00:00
return nil, err
2024-02-18 10:42:21 +00:00
}
switch msg := msg.(type) {
2024-02-18 10:42:21 +00:00
case *Subscription:
2024-02-18 10:42:21 +00:00
// Ignore.
2024-02-18 10:42:21 +00:00
case *Pong:
2024-02-18 10:42:21 +00:00
// Ignore.
2024-02-18 10:42:21 +00:00
case *Message:
2024-02-18 10:42:21 +00:00
return msg, nil
2024-02-18 10:42:21 +00:00
default:
2024-02-18 10:42:21 +00:00
err := fmt.Errorf("redis: unknown message: %T", msg)
2024-02-18 10:42:21 +00:00
return nil, err
2024-02-18 10:42:21 +00:00
}
2024-02-18 10:42:21 +00:00
}
2024-02-18 10:42:21 +00:00
}
func (c *PubSub) getContext() context.Context {
2024-02-18 10:42:21 +00:00
if c.cmd != nil {
2024-02-18 10:42:21 +00:00
return c.cmd.ctx
2024-02-18 10:42:21 +00:00
}
2024-02-18 10:42:21 +00:00
return context.Background()
2024-02-18 10:42:21 +00:00
}
//------------------------------------------------------------------------------
// Channel returns a Go channel for concurrently receiving messages.
2024-02-18 10:42:21 +00:00
// The channel is closed together with the PubSub. If the Go channel
2024-02-18 10:42:21 +00:00
// is blocked full for 1 minute the message is dropped.
2024-02-18 10:42:21 +00:00
// Receive* APIs can not be used after channel is created.
2024-02-18 10:42:21 +00:00
//
2024-02-18 10:42:21 +00:00
// go-redis periodically sends ping messages to test connection health
2024-02-18 10:42:21 +00:00
// and re-subscribes if ping can not not received for 1 minute.
2024-02-18 10:42:21 +00:00
func (c *PubSub) Channel(opts ...ChannelOption) <-chan *Message {
2024-02-18 10:42:21 +00:00
c.chOnce.Do(func() {
2024-02-18 10:42:21 +00:00
c.msgCh = newChannel(c, opts...)
2024-02-18 10:42:21 +00:00
c.msgCh.initMsgChan()
2024-02-18 10:42:21 +00:00
})
2024-02-18 10:42:21 +00:00
if c.msgCh == nil {
2024-02-18 10:42:21 +00:00
err := fmt.Errorf("redis: Channel can't be called after ChannelWithSubscriptions")
2024-02-18 10:42:21 +00:00
panic(err)
2024-02-18 10:42:21 +00:00
}
2024-02-18 10:42:21 +00:00
return c.msgCh.msgCh
2024-02-18 10:42:21 +00:00
}
// ChannelSize is like Channel, but creates a Go channel
2024-02-18 10:42:21 +00:00
// with specified buffer size.
2024-02-18 10:42:21 +00:00
//
2024-02-18 10:42:21 +00:00
// Deprecated: use Channel(WithChannelSize(size)), remove in v9.
2024-02-18 10:42:21 +00:00
func (c *PubSub) ChannelSize(size int) <-chan *Message {
2024-02-18 10:42:21 +00:00
return c.Channel(WithChannelSize(size))
2024-02-18 10:42:21 +00:00
}
// ChannelWithSubscriptions is like Channel, but message type can be either
2024-02-18 10:42:21 +00:00
// *Subscription or *Message. Subscription messages can be used to detect
2024-02-18 10:42:21 +00:00
// reconnections.
2024-02-18 10:42:21 +00:00
//
2024-02-18 10:42:21 +00:00
// ChannelWithSubscriptions can not be used together with Channel or ChannelSize.
2024-02-18 10:42:21 +00:00
func (c *PubSub) ChannelWithSubscriptions(opts ...ChannelOption) <-chan interface{} {
2024-02-18 10:42:21 +00:00
c.chOnce.Do(func() {
2024-02-18 10:42:21 +00:00
c.allCh = newChannel(c, opts...)
2024-02-18 10:42:21 +00:00
c.allCh.initAllChan()
2024-02-18 10:42:21 +00:00
})
2024-02-18 10:42:21 +00:00
if c.allCh == nil {
2024-02-18 10:42:21 +00:00
err := fmt.Errorf("redis: ChannelWithSubscriptions can't be called after Channel")
2024-02-18 10:42:21 +00:00
panic(err)
2024-02-18 10:42:21 +00:00
}
2024-02-18 10:42:21 +00:00
return c.allCh.allCh
2024-02-18 10:42:21 +00:00
}
type ChannelOption func(c *channel)
// WithChannelSize specifies the Go chan size that is used to buffer incoming messages.
2024-02-18 10:42:21 +00:00
//
2024-02-18 10:42:21 +00:00
// The default is 100 messages.
2024-02-18 10:42:21 +00:00
func WithChannelSize(size int) ChannelOption {
2024-02-18 10:42:21 +00:00
return func(c *channel) {
2024-02-18 10:42:21 +00:00
c.chanSize = size
2024-02-18 10:42:21 +00:00
}
2024-02-18 10:42:21 +00:00
}
// WithChannelHealthCheckInterval specifies the health check interval.
2024-02-18 10:42:21 +00:00
// PubSub will ping Redis Server if it does not receive any messages within the interval.
2024-02-18 10:42:21 +00:00
// To disable health check, use zero interval.
2024-02-18 10:42:21 +00:00
//
2024-02-18 10:42:21 +00:00
// The default is 3 seconds.
2024-02-18 10:42:21 +00:00
func WithChannelHealthCheckInterval(d time.Duration) ChannelOption {
2024-02-18 10:42:21 +00:00
return func(c *channel) {
2024-02-18 10:42:21 +00:00
c.checkInterval = d
2024-02-18 10:42:21 +00:00
}
2024-02-18 10:42:21 +00:00
}
// WithChannelSendTimeout specifies the channel send timeout after which
2024-02-18 10:42:21 +00:00
// the message is dropped.
2024-02-18 10:42:21 +00:00
//
2024-02-18 10:42:21 +00:00
// The default is 60 seconds.
2024-02-18 10:42:21 +00:00
func WithChannelSendTimeout(d time.Duration) ChannelOption {
2024-02-18 10:42:21 +00:00
return func(c *channel) {
2024-02-18 10:42:21 +00:00
c.chanSendTimeout = d
2024-02-18 10:42:21 +00:00
}
2024-02-18 10:42:21 +00:00
}
type channel struct {
pubSub *PubSub
msgCh chan *Message
2024-02-18 10:42:21 +00:00
allCh chan interface{}
ping chan struct{}
chanSize int
2024-02-18 10:42:21 +00:00
chanSendTimeout time.Duration
checkInterval time.Duration
2024-02-18 10:42:21 +00:00
}
func newChannel(pubSub *PubSub, opts ...ChannelOption) *channel {
2024-02-18 10:42:21 +00:00
c := &channel{
2024-02-18 10:42:21 +00:00
pubSub: pubSub,
chanSize: 100,
2024-02-18 10:42:21 +00:00
chanSendTimeout: time.Minute,
checkInterval: 3 * time.Second,
2024-02-18 10:42:21 +00:00
}
2024-02-18 10:42:21 +00:00
for _, opt := range opts {
2024-02-18 10:42:21 +00:00
opt(c)
2024-02-18 10:42:21 +00:00
}
2024-02-18 10:42:21 +00:00
if c.checkInterval > 0 {
2024-02-18 10:42:21 +00:00
c.initHealthCheck()
2024-02-18 10:42:21 +00:00
}
2024-02-18 10:42:21 +00:00
return c
2024-02-18 10:42:21 +00:00
}
func (c *channel) initHealthCheck() {
2024-02-18 10:42:21 +00:00
ctx := context.TODO()
2024-02-18 10:42:21 +00:00
c.ping = make(chan struct{}, 1)
go func() {
2024-02-18 10:42:21 +00:00
timer := time.NewTimer(time.Minute)
2024-02-18 10:42:21 +00:00
timer.Stop()
for {
2024-02-18 10:42:21 +00:00
timer.Reset(c.checkInterval)
2024-02-18 10:42:21 +00:00
select {
2024-02-18 10:42:21 +00:00
case <-c.ping:
2024-02-18 10:42:21 +00:00
if !timer.Stop() {
2024-02-18 10:42:21 +00:00
<-timer.C
2024-02-18 10:42:21 +00:00
}
2024-02-18 10:42:21 +00:00
case <-timer.C:
2024-02-18 10:42:21 +00:00
if pingErr := c.pubSub.Ping(ctx); pingErr != nil {
2024-02-18 10:42:21 +00:00
c.pubSub.mu.Lock()
2024-02-18 10:42:21 +00:00
c.pubSub.reconnect(ctx, pingErr)
2024-02-18 10:42:21 +00:00
c.pubSub.mu.Unlock()
2024-02-18 10:42:21 +00:00
}
2024-02-18 10:42:21 +00:00
case <-c.pubSub.exit:
2024-02-18 10:42:21 +00:00
return
2024-02-18 10:42:21 +00:00
}
2024-02-18 10:42:21 +00:00
}
2024-02-18 10:42:21 +00:00
}()
2024-02-18 10:42:21 +00:00
}
// initMsgChan must be in sync with initAllChan.
2024-02-18 10:42:21 +00:00
func (c *channel) initMsgChan() {
2024-02-18 10:42:21 +00:00
ctx := context.TODO()
2024-02-18 10:42:21 +00:00
c.msgCh = make(chan *Message, c.chanSize)
go func() {
2024-02-18 10:42:21 +00:00
timer := time.NewTimer(time.Minute)
2024-02-18 10:42:21 +00:00
timer.Stop()
var errCount int
2024-02-18 10:42:21 +00:00
for {
2024-02-18 10:42:21 +00:00
msg, err := c.pubSub.Receive(ctx)
2024-02-18 10:42:21 +00:00
if err != nil {
2024-02-18 10:42:21 +00:00
if err == pool.ErrClosed {
2024-02-18 10:42:21 +00:00
close(c.msgCh)
2024-02-18 10:42:21 +00:00
return
2024-02-18 10:42:21 +00:00
}
2024-02-18 10:42:21 +00:00
if errCount > 0 {
2024-02-18 10:42:21 +00:00
time.Sleep(100 * time.Millisecond)
2024-02-18 10:42:21 +00:00
}
2024-02-18 10:42:21 +00:00
errCount++
2024-02-18 10:42:21 +00:00
continue
2024-02-18 10:42:21 +00:00
}
errCount = 0
// Any message is as good as a ping.
2024-02-18 10:42:21 +00:00
select {
2024-02-18 10:42:21 +00:00
case c.ping <- struct{}{}:
2024-02-18 10:42:21 +00:00
default:
2024-02-18 10:42:21 +00:00
}
switch msg := msg.(type) {
2024-02-18 10:42:21 +00:00
case *Subscription:
2024-02-18 10:42:21 +00:00
// Ignore.
2024-02-18 10:42:21 +00:00
case *Pong:
2024-02-18 10:42:21 +00:00
// Ignore.
2024-02-18 10:42:21 +00:00
case *Message:
2024-02-18 10:42:21 +00:00
timer.Reset(c.chanSendTimeout)
2024-02-18 10:42:21 +00:00
select {
2024-02-18 10:42:21 +00:00
case c.msgCh <- msg:
2024-02-18 10:42:21 +00:00
if !timer.Stop() {
2024-02-18 10:42:21 +00:00
<-timer.C
2024-02-18 10:42:21 +00:00
}
2024-02-18 10:42:21 +00:00
case <-timer.C:
2024-02-18 10:42:21 +00:00
internal.Logger.Printf(
2024-02-18 10:42:21 +00:00
ctx, "redis: %s channel is full for %s (message is dropped)",
2024-02-18 10:42:21 +00:00
c, c.chanSendTimeout)
2024-02-18 10:42:21 +00:00
}
2024-02-18 10:42:21 +00:00
default:
2024-02-18 10:42:21 +00:00
internal.Logger.Printf(ctx, "redis: unknown message type: %T", msg)
2024-02-18 10:42:21 +00:00
}
2024-02-18 10:42:21 +00:00
}
2024-02-18 10:42:21 +00:00
}()
2024-02-18 10:42:21 +00:00
}
// initAllChan must be in sync with initMsgChan.
2024-02-18 10:42:21 +00:00
func (c *channel) initAllChan() {
2024-02-18 10:42:21 +00:00
ctx := context.TODO()
2024-02-18 10:42:21 +00:00
c.allCh = make(chan interface{}, c.chanSize)
go func() {
2024-02-18 10:42:21 +00:00
timer := time.NewTimer(time.Minute)
2024-02-18 10:42:21 +00:00
timer.Stop()
var errCount int
2024-02-18 10:42:21 +00:00
for {
2024-02-18 10:42:21 +00:00
msg, err := c.pubSub.Receive(ctx)
2024-02-18 10:42:21 +00:00
if err != nil {
2024-02-18 10:42:21 +00:00
if err == pool.ErrClosed {
2024-02-18 10:42:21 +00:00
close(c.allCh)
2024-02-18 10:42:21 +00:00
return
2024-02-18 10:42:21 +00:00
}
2024-02-18 10:42:21 +00:00
if errCount > 0 {
2024-02-18 10:42:21 +00:00
time.Sleep(100 * time.Millisecond)
2024-02-18 10:42:21 +00:00
}
2024-02-18 10:42:21 +00:00
errCount++
2024-02-18 10:42:21 +00:00
continue
2024-02-18 10:42:21 +00:00
}
errCount = 0
// Any message is as good as a ping.
2024-02-18 10:42:21 +00:00
select {
2024-02-18 10:42:21 +00:00
case c.ping <- struct{}{}:
2024-02-18 10:42:21 +00:00
default:
2024-02-18 10:42:21 +00:00
}
switch msg := msg.(type) {
2024-02-18 10:42:21 +00:00
case *Pong:
2024-02-18 10:42:21 +00:00
// Ignore.
2024-02-18 10:42:21 +00:00
case *Subscription, *Message:
2024-02-18 10:42:21 +00:00
timer.Reset(c.chanSendTimeout)
2024-02-18 10:42:21 +00:00
select {
2024-02-18 10:42:21 +00:00
case c.allCh <- msg:
2024-02-18 10:42:21 +00:00
if !timer.Stop() {
2024-02-18 10:42:21 +00:00
<-timer.C
2024-02-18 10:42:21 +00:00
}
2024-02-18 10:42:21 +00:00
case <-timer.C:
2024-02-18 10:42:21 +00:00
internal.Logger.Printf(
2024-02-18 10:42:21 +00:00
ctx, "redis: %s channel is full for %s (message is dropped)",
2024-02-18 10:42:21 +00:00
c, c.chanSendTimeout)
2024-02-18 10:42:21 +00:00
}
2024-02-18 10:42:21 +00:00
default:
2024-02-18 10:42:21 +00:00
internal.Logger.Printf(ctx, "redis: unknown message type: %T", msg)
2024-02-18 10:42:21 +00:00
}
2024-02-18 10:42:21 +00:00
}
2024-02-18 10:42:21 +00:00
}()
2024-02-18 10:42:21 +00:00
}