Redis golang客户端定期丢弃错误的PubSub连接(EOF)

e37o9pze  于 2022-10-31  发布在  Redis
关注(0)|答案(3)|浏览(547)

我做了什么:

我正在使用github.com/go-redis/redisgolang Redis库。我的客户端监听一个名为“control”的PubSub通道。每当有消息到达时,我处理它并继续接收下一条消息。我无休止地监听,消息可能经常到来,有时几天都不会到来。

我的期望:

我希望redis通道无休止地保持开放,并在发送消息时接收消息。

我的体验:

通常它可以正常运行几天,但是client.Receive()偶尔会返回EOF错误,在这个错误之后,客户端就不再接收该通道上的消息了。
2019年8月29日14:18:57发布时间:2019年8月29日redis:放弃错误的PubSub连接:文件结束

***免责声明:***我不确定此错误是否导致我停止接收消息,它只是看起来相关。

其他问题:

我想了解为什么会发生这种情况,如果这是正常的,如果重新连接到通道通过client.Subscribe()每当我遇到的行为是一个很好的补救措施,或者我应该解决根本问题,无论它可能是。

代码:

下面是处理我的客户端的完整代码(连接到redis,订阅频道,无休止地接收消息):

func InitAndListenAsync(log *log.Logger, sseHandler func(string, string) error) error {
    rootLogger = log.With(zap.String("component", "redis-client"))

    host := env.RedisHost
    port := env.RedisPort
    pass := env.RedisPass
    addr := fmt.Sprintf("%s:%s", host, port)
    tlsCfg := &tls.Config{}
    client = redis.NewClient(&redis.Options{
        Addr:      addr,
        Password:  pass,
        TLSConfig: tlsCfg,
    })

    if _, err := client.Ping().Result(); err != nil {
        return err
    }

    go func() {
        controlSub := client.Subscribe("control")
        defer controlSub.Close()
        for {
            in, err := controlSub.Receive()  //***SOMETIMES RETURNS EOF ERROR***
            if err != nil {
                rootLogger.Error("failed to get feedback", zap.Error(err))
                break
            }
            switch in.(type) {
            case *redis.Message:
                cm := comm.ControlMessageEvent{}
                payload := []byte(in.(*redis.Message).Payload)
                if err := json.Unmarshal(payload, &cm); err != nil {
                    rootLogger.Error("failed to parse control message", zap.Error(err))
                } else if err := handleIncomingEvent(&cm); err != nil {
                    rootLogger.Error("failed to handle control message", zap.Error(err))
                }

            default:
                rootLogger.Warn("Received unknown input over REDIS PubSub control channel", zap.Any("received", in))
            }
        }
    }()
    return nil
}
niwlg2el

niwlg2el1#

我通过在从pubsub.Channel()(而不是Receive())返回的通道上进行测距来解决断开问题。
下面是新代码:

func listenToControlChannel(client *redis.Client) {
    pubsub := client.Subscribe("control")
    defer pubsub.Close()

    if _, err := pubsub.Receive(); err != nil {
        rootLogger.Error("failed to receive from control PubSub", zap.Error(err))
        return
    }

    controlCh := pubsub.Channel()
    fmt.Println("start listening on control PubSub")

    // Endlessly listen to control channel,
    for msg := range controlCh {
        cm := ControlMessageEvent{}
        payload := []byte(msg.Payload)
        if err := json.Unmarshal(payload, &cm); err != nil {
            fmt.Printf("failed to parse control message: %s\n", err.Error())
        } else if err := handleIncomingEvent(&cm); err != nil {
            fmt.Printf("failed to handle control message: %s\n", err.Error())
        }
    }
}
hfyxw5xn

hfyxw5xn2#

我不知道如果这是正确的方法,但当创建新的Redis客户端时,将ReadTimeout属性设置为**-1**为我修复了这个问题。

redisClient := redis.NewClient(&redis.Options{
    Addr:        addr,
    Password:    redisConf.Password,
    DB:          0, // Default DB
    ReadTimeout: -1,
})

注意:我使用的是go-redis/v9

ibrsph3r

ibrsph3r3#

我的看法是,如果Redis认为客户端处于空闲状态,它可能会断开您的客户端。
科普这种情况的办法似乎是这样的:
1.请使用ReceiveTimeout而不是Receive
1.如果操作超时,则发出Ping并等待答复。
1.冲洗,重复。
这样,您就可以确保连接上存在一些流量-无论是否实际发布了任何数据。
我会从这里开始。

相关问题