Kafka消费者:如何在go sarama中从特定偏移量编程消费

jei2mxaa  于 2021-06-05  发布在  Kafka
关注(0)|答案(1)|浏览(863)

最近,我开始和Kafka一起学习工作。我正在做的项目使用萨拉玛。
阅读我使用的信息 ConsumerGroup .
我需要在一段时间后再读一遍这封信如果 foo 退货 false . 如何做到这一点?

func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {

    for message := range claim.Messages() {

            if ok := foo(message); ok {
                session.MarkMessage(message, "")
            } else {
                // ???
            }

    }

    return nil
}
xdnvmnnf

xdnvmnnf1#

您可以通过将以下内容包含在您的使用者组中,将使用者组的偏移重置为旧的偏移 Setup() 回拨:

func (e myConsumerGroup) Setup(sess sarama.ConsumerGroupSession) error {
    sess.ResetOffset(topic, partition, offset, "")

    return nil
}

您也可以通过控制台实现:

kafka-consumer-groups \
    --bootstrap-server localhost:9092 \
    --group my-consumer-group \
    --topic myTopicName \
    --reset-offsets \
    --to-offfset 100 \
    --execute

相关问题