我使用spring集成kafka扩展来读取和处理java应用程序中kafka的消息。据我所知,它使用了高级使用者api,不允许在zookeeper中完全管理偏移量。
在我的例子中,我们有auto.commit.enable=false,以便在处理消息后将偏移提交给zookeeper。如果处理失败,那么将不会提交offset,我们应该尝试在配置的时间内从zookeeper的offset开始再次处理相同的消息。但它不起作用,因为我假设apachekafka客户机在内存中保留偏移量。
我发现kafka.consumer.consumeriterator处理偏移量,如果其中的consumerdoffset大于zookeeper中的,那么它将使用consumered one读取消息。
所以,我想知道有没有办法重置Kafka客户端中的偏移量,从zookeeper中的偏移量开始读取?
1条答案
按热度按时间x9ybnkn61#
你不介意和我分享一下你的想法吗
<int-kafka>
配置并指出哪里有问题?也许这样就足够了
MessageLeftOverTracker.clearMessagesLeftOver()
?我对Kafka不太了解,但我知道Spring的融合是怎么做的。