我已经实现了一个简单的kafka消费者,它实现了acknowledgingmessagelistener。存在确认。确认();在onmessage方法的and处,每当一切正常或只有可恢复的异常时都会调用它。在这种情况下一切正常。
但是事情可能会出错,可能会抛出一个错误(我没有发现),jvm可能会在acknowledgement.acknowledge()之前在onmessage方法中崩溃等;被叫来了。
因此,应用程序崩溃,但重新启动后,它没有收到来自受影响的kafka主题的任何消息。即使Kafka重启也无济于事,应用服务器重启也无济于事等等,但其他主题做得很好,Kafka也不倒。
什么是正确的Kafka消费者(或其他)配置使应用程序在onmessage崩溃后重新工作?我想再次接收未确认的消息,然后接收所有尚未阅读的其他消息。手动确认很重要,我不想使用自动确认模式。
1条答案
按热度按时间jk9hmnmh1#
这对我很有帮助:
服务器属性
最大轮询间隔ms=30000
session.timeout.ms=40000