kafka消息未传递给消费者

mi7gmzs6  于 2021-06-06  发布在  Kafka
关注(0)|答案(0)|浏览(249)

我在这里面临一个非常奇怪的问题。我在go中使用sarama集群库启动了一个kafka消费者,以消费来自kafka主题的一些消息。但是正在启动的使用者没有收到消息。
然而,一件非常奇怪的事情正在发生。如果我启动另一个与之并行的消费者,消息会突然传递给这两个消费者。
我想不出一个合乎逻辑的解释。任何提示都将不胜感激。
注意:这个问题是在kafka和zookeeper服务器不正常启动之后开始的。
以下是消费者消费不起作用的消息的密码:

if err := consumer.Start(); err != nil {
    return err
}
updChan, err := consumer.Consume()
if err != nil {
    return err
}
go func() {
    for {
        select {
        case msg, ok := <-updChan:
            if !ok {
                return
            }
            var message liveupdater.KafkaMessage
            err := json.Unmarshal(msg.Msg, &message)
            if err != nil {
                fmt.Println(err)
            }
            err = handleMessaege(message)
            if err != nil {
                logrus.Println("encountered error:" + err.Error())
            }

            consumer.MarkProcessed(msg, string(message.Type))
        }
    }
}()

下面是消费者接收消息的go代码(此代码和以前的代码之间的唯一区别是另一个消费者为同一主题并行运行)。

consumeMessages(config)

if err := consumer.Start(); err != nil {
    return err
}
updChan, err := consumer.Consume()
if err != nil {
    return err
}
go func() {
    for {
        select {
        case msg, ok := <-updChan:
            if !ok {
                return
            }
            var message liveupdater.KafkaMessage
            err := json.Unmarshal(msg.Msg, &message)
            if err != nil {
                fmt.Println(err)
            }
            err = handleMessaege(message)
            if err != nil {
                logrus.Println("encountered error:" + err.Error())
            }

            consumer.MarkProcessed(msg, string(message.Type))
        }
    }
}()

func consumeMessages(config *rakshak_config.Config) {
    kafkaConfig := kafka.Config{Brokers: strings.Split(config.Kafka.Brokers, ",")}
    logrus.Println("brokers %s", config.Kafka.Brokers)
    hermesConsumer, err := hermes.NewConsumer(hermes.Kafka, []string{config.Kafka.Topic}, kafkaConfig)
    if err != nil {
        logrus.Println("could not get consumer through hermes %s", err)
    }
    err = hermesConsumer.Start()
    if err != nil {
        logrus.Println("could not start consumer through hermes %s", err)
    }
    conChan, err := hermesConsumer.Consume()
    if err != nil {
        logrus.Println("not able to start consumer channel %s", err)
    }
    go func() {
        for {
            select {
            case msg, ok := <-conChan:
                if !ok {
                    logrus.Println("could not consume message")
                }

                logrus.Println("kafka msg string: %s", string(msg.Msg[:]))

                hermesConsumer.MarkProcessed(msg, "")
            }
        }
    }()

提前谢谢。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题