spring集成kafka与偏移管理

jmp7cifd  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(272)

我使用spring集成kafka扩展来读取和处理java应用程序中kafka的消息。据我所知,它使用了高级使用者api,不允许在zookeeper中完全管理偏移量。
在我的例子中,我们有auto.commit.enable=false,以便在处理消息后将偏移提交给zookeeper。如果处理失败,那么将不会提交offset,我们应该尝试在配置的时间内从zookeeper的offset开始再次处理相同的消息。但它不起作用,因为我假设apachekafka客户机在内存中保留偏移量。
我发现kafka.consumer.consumeriterator处理偏移量,如果其中的consumerdoffset大于zookeeper中的,那么它将使用consumered one读取消息。
所以,我想知道有没有办法重置Kafka客户端中的偏移量,从zookeeper中的偏移量开始读取?

x9ybnkn6

x9ybnkn61#

你不介意和我分享一下你的想法吗 <int-kafka> 配置并指出哪里有问题?
也许这样就足够了 MessageLeftOverTracker.clearMessagesLeftOver() ?
我对Kafka不太了解,但我知道Spring的融合是怎么做的。

相关问题