我们正在使用Kafka偏移量的自定义存储。分区是自动分配的,我们使用ConsumerBalanceListener来保存偏移量、清除线程等等。
在“onpartitionsassigned”方法中,我们从自定义存储中查找已分配分区的提交偏移量。如果没有找到提交的偏移量,我们可以从kafka那里得到适当的偏移量(最早/最晚)。
根据用于ConsumerBalanceListener的javadocs:“可以保证,在任何进程调用'onpartitionsassigned'之前,所有的使用者进程都将调用'onpartitionsrevoked'
然而,我们现在已经看到过好几次了,在重新平衡时,“onpartitionsrevoked”并没有被称为“onpartitionsrevoked”onpartitionsassigned'被直接调用,这会使我们的应用程序处于错误状态。
我们简单地看了一下代码,似乎代码是按照javadocs所承诺的那样做的。所以,问题是,有没有边缘情况,有没有可能触发这种情况的流动?
在这两种情况下,我们观察到的一点是,在尝试从kafka获取偏移量时(因为偏移量不在自定义存储中),由于过时的领导层信息,fetcher至少重试了一次:
日志:“由于过时的领导信息,尝试获取分区{}的偏移量失败,正在重试。”
请告诉我们。
谢谢!
暂无答案!
目前还没有任何答案,快来回答吧!