简单的Kafka消费者信息传递复制

afdcj2ne  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(247)

我试图用java实现一个简单的生产者-->Kafka-->消费者应用程序。我能够成功地生成和使用消息,但问题发生在我重新启动消费者时,其中一些已经使用的消息再次被消费者从kafka获取(不是所有消息,而是最后使用的一些消息)。
我已经准备好了 autooffset.reset=largest 在我的消费者和我的 autocommit.interval.ms 属性设置为1000毫秒。
“重新传递一些已使用的邮件”是已知的问题,还是我在这里缺少其他设置?
基本上,有没有一种方法可以确保消费者不会接收/使用以前使用过的消息?

clj7thdc

clj7thdc1#

Kafka使用zookeeper来存储消费者补偿。由于zookeeper操作非常慢,所以不建议在每次使用消息之后提交offset。
可以向使用者添加shutdownhook,该使用者将在退出前手动提交主题偏移量。但是,在某些情况下(比如jvm崩溃或 kill -9 ). 为了防止出现这种情况,我建议实现自定义提交逻辑,在处理每条消息(文件或本地数据库)后在本地提交offset,并且每1000ms将offset提交给zookeeper。在consumer启动时,应该查询这两个位置,并且最多使用两个值作为消耗offset。

相关问题