我有一个消费者,消费一条信息,做一些繁重的工作,然后确认。
@KafkaListener(topics = "${kafka.topic}", groupId = "group", containerFactory ="ContainerFactory")
public void consumeMessage(@Payload Payload message, @Headers MessageHeaders headers, Acknowledgment ack) {
try {
//Heavy Job
ack.acknowledge();
} catch (Exception e) {
log("Error in Kafka Consumer);
}
}
现在,如果有异常,它应该转到catch块,并且不应该发生确认,如果没有发生确认,它应该返回到队列并再次进行处理。但这并没有发生。偏移量将更新,并拾取下一条消息。我知道消费者有一个投票大小,它可以一次选择多条消息。但是,即使一条消息没有得到确认,它也应该重新处理它,而不是忽略它并更新偏移量。
这是Kafka消费者配置
`Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 5);
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000);
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000);
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 20000);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
1条答案
按热度按时间5kgi1eie1#
这是潜在客户的预期行为
KafkaConsumer
.在封面下,Kafka消费者使用
poll
api,在javadocs中描述为:每次轮询时,使用者都会尝试使用上次使用的偏移量作为起始偏移量,并按顺序获取。最后消耗的偏移量可以通过seek(topicpartition,long)手动设置,也可以自动设置为订阅的分区列表的最后提交偏移量
这意味着,它不检查最后提交的偏移量,而是检查最后消耗的偏移量,然后按顺序获取数据。只有在重新开始作业时,它才会继续从该消费群体的上次提交的偏移量中读取,或者如果您使用基于
auto_offset_reset
配置。为了解决您的问题,我可以在catch块中应用以下解决方案:
而不是仅仅记录“Kafka消费错误”让你的工作关闭。修复代码并重新启动应用程序
使用偏移量编号(导致异常的编号)再次使用
seek
应用程序编程接口。seek方法的详细信息可以在这里找到