kafka偏移量即使没有到达ack也会增加

oalqel3c  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(416)

我有一个消费者,消费一条信息,做一些繁重的工作,然后确认。

@KafkaListener(topics = "${kafka.topic}", groupId = "group", containerFactory ="ContainerFactory")
  public void consumeMessage(@Payload Payload message, @Headers MessageHeaders headers, Acknowledgment ack) {

try {
  //Heavy Job
  ack.acknowledge();
} catch (Exception e) {
  log("Error in Kafka Consumer);
    }
  }

现在,如果有异常,它应该转到catch块,并且不应该发生确认,如果没有发生确认,它应该返回到队列并再次进行处理。但这并没有发生。偏移量将更新,并拾取下一条消息。我知道消费者有一个投票大小,它可以一次选择多条消息。但是,即使一条消息没有得到确认,它也应该重新处理它,而不是忽略它并更新偏移量。
这是Kafka消费者配置

`Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 5);
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000);
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000);
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 20000);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
5kgi1eie

5kgi1eie1#

这是潜在客户的预期行为 KafkaConsumer .
在封面下,Kafka消费者使用 poll api,在javadocs中描述为:
每次轮询时,使用者都会尝试使用上次使用的偏移量作为起始偏移量,并按顺序获取。最后消耗的偏移量可以通过seek(topicpartition,long)手动设置,也可以自动设置为订阅的分区列表的最后提交偏移量
这意味着,它不检查最后提交的偏移量,而是检查最后消耗的偏移量,然后按顺序获取数据。只有在重新开始作业时,它才会继续从该消费群体的上次提交的偏移量中读取,或者如果您使用基于 auto_offset_reset 配置。
为了解决您的问题,我可以在catch块中应用以下解决方案:
而不是仅仅记录“Kafka消费错误”让你的工作关闭。修复代码并重新启动应用程序
使用偏移量编号(导致异常的编号)再次使用 seek 应用程序编程接口。seek方法的详细信息可以在这里找到

相关问题