如何在golang kafka 10中设置消费者从特定偏移开始

lymnna71  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(1159)

我的需要是让生产者从它崩溃前处理的最后一条消息开始。幸运的是,我只有一个主题,一个分区和一个使用者。
为了做到这一点,我尽力了https://github.com/shopify/sarama 但似乎还没有。我现在正在使用https://godoc.org/github.com/bsm/sarama-cluster,它允许我提交每个消息偏移量。
我无法检索最后一个提交的偏移量,我无法确定如何使sarama消费者从所述偏移量开始。到目前为止我找到的唯一参数是 Config.Producer.Offsets.Initial .
如何检索上次提交的偏移量?
如何使使用者从最后一条已提交偏移量的消息开始? OffsetNewest 将使其从生成的最后一条消息开始,而不是消费者最后处理的消息。
是否可以只使用shopify/sarama而不使用bsm/sarama集群?
提前谢谢
p、 我使用的是Kafka10.0,所以偏移量存储在Kafka中,而不是zookeeper中。
edit1:部分解决方案:获取sarama.offsetoldest之后的所有消息,并跳过所有消息,直到找到未处理的消息。

uyto3xhc

uyto3xhc1#

如果已经为分区保存了偏移量,sarama集群将从该偏移量恢复消耗。这个 Config.Producer.Offsets.Initial 选项仅在不存在保存的偏移量时使用(第一次为使用者组运行)。
您可以通过在文件开头添加以下行来验证这一点 main() 功能:

sarama.Logger = log.New(os.Stdout, "sarama: ", log.LstdFlags)

然后您将在输出中看到如下内容: cluster/consumer CID-17db1be4-a162-411c-a106-4d198191176a consume sample/0 from 12 其中的12是sarama从该分区开始的偏移量(sample/0)。

相关问题