我试图用java实现一个简单的生产者-->Kafka-->消费者应用程序。我能够成功地生成和使用消息,但问题发生在我重新启动消费者时,其中一些已经使用的消息再次被消费者从kafka获取(不是所有消息,而是最后使用的一些消息)。
我已经准备好了 autooffset.reset=largest
在我的消费者和我的 autocommit.interval.ms
属性设置为1000毫秒。
“重新传递一些已使用的邮件”是已知的问题,还是我在这里缺少其他设置?
基本上,有没有一种方法可以确保消费者不会接收/使用以前使用过的消息?
1条答案
按热度按时间clj7thdc1#
Kafka使用zookeeper来存储消费者补偿。由于zookeeper操作非常慢,所以不建议在每次使用消息之后提交offset。
可以向使用者添加shutdownhook,该使用者将在退出前手动提交主题偏移量。但是,在某些情况下(比如jvm崩溃或
kill -9
). 为了防止出现这种情况,我建议实现自定义提交逻辑,在处理每条消息(文件或本地数据库)后在本地提交offset,并且每1000ms将offset提交给zookeeper。在consumer启动时,应该查询这两个位置,并且最多使用两个值作为消耗offset。