我做了什么:
我正在使用github.com/go-redis/redis
的golang
Redis库。我的客户端监听一个名为“control”的PubSub通道。每当有消息到达时,我处理它并继续接收下一条消息。我无休止地监听,消息可能经常到来,有时几天都不会到来。
我的期望:
我希望redis通道无休止地保持开放,并在发送消息时接收消息。
我的体验:
通常它可以正常运行几天,但是client.Receive()
偶尔会返回EOF
错误,在这个错误之后,客户端就不再接收该通道上的消息了。
2019年8月29日14:18:57发布时间:2019年8月29日redis:放弃错误的PubSub连接:文件结束
***免责声明:***我不确定此错误是否导致我停止接收消息,它只是看起来相关。
其他问题:
我想了解为什么会发生这种情况,如果这是正常的,如果重新连接到通道通过client.Subscribe()
每当我遇到的行为是一个很好的补救措施,或者我应该解决根本问题,无论它可能是。
代码:
下面是处理我的客户端的完整代码(连接到redis,订阅频道,无休止地接收消息):
func InitAndListenAsync(log *log.Logger, sseHandler func(string, string) error) error {
rootLogger = log.With(zap.String("component", "redis-client"))
host := env.RedisHost
port := env.RedisPort
pass := env.RedisPass
addr := fmt.Sprintf("%s:%s", host, port)
tlsCfg := &tls.Config{}
client = redis.NewClient(&redis.Options{
Addr: addr,
Password: pass,
TLSConfig: tlsCfg,
})
if _, err := client.Ping().Result(); err != nil {
return err
}
go func() {
controlSub := client.Subscribe("control")
defer controlSub.Close()
for {
in, err := controlSub.Receive() //***SOMETIMES RETURNS EOF ERROR***
if err != nil {
rootLogger.Error("failed to get feedback", zap.Error(err))
break
}
switch in.(type) {
case *redis.Message:
cm := comm.ControlMessageEvent{}
payload := []byte(in.(*redis.Message).Payload)
if err := json.Unmarshal(payload, &cm); err != nil {
rootLogger.Error("failed to parse control message", zap.Error(err))
} else if err := handleIncomingEvent(&cm); err != nil {
rootLogger.Error("failed to handle control message", zap.Error(err))
}
default:
rootLogger.Warn("Received unknown input over REDIS PubSub control channel", zap.Any("received", in))
}
}
}()
return nil
}
3条答案
按热度按时间niwlg2el1#
我通过在从
pubsub.Channel()
(而不是Receive()
)返回的通道上进行测距来解决断开问题。下面是新代码:
hfyxw5xn2#
我不知道如果这是正确的方法,但当创建新的Redis客户端时,将ReadTimeout属性设置为**-1**为我修复了这个问题。
注意:我使用的是go-redis/v9
ibrsph3r3#
我的看法是,如果Redis认为客户端处于空闲状态,它可能会断开您的客户端。
科普这种情况的办法似乎是这样的:
1.请使用
ReceiveTimeout
而不是Receive
。1.如果操作超时,则发出
Ping
并等待答复。1.冲洗,重复。
这样,您就可以确保连接上存在一些流量-无论是否实际发布了任何数据。
我会从这里开始。