我正在尝试使用kafka中的消费批处理,我发现文档中说retry不受支持,如下所示。
使用批处理模式时不支持在绑定器中重试,因此maxtempts将被重写为1。您可以配置seektocurrentbatcherrorhandler(使用listenercontainercustomizer)来实现类似的功能,以便在活页夹中重试。您还可以使用手动ackmode并调用ackowledgement.nack(index,sleep)来提交部分批处理的偏移量,并重新传递剩余的记录。有关这些技术的更多信息,请参阅springforapachekafka文档
如果我使用acknowledge.nack(index,sleep),当错误发生时它将无限重试。是否有任何方法可以在maxattempts时间内使用acknowledge.nack重试?
代码就像
@StreamListener(Sink.INPUT)
public void consume(@Payload List<PayLoad> payloads, @Header(KafkaHeaders.ACKNOWLEDGMENT) Acknowledgment acknowledgment) {
try {
consume(payloads);
acknowledgment.acknowledge();
} catch (Exception e) {
acknowledgment.nack(0, 50);
}
}
1条答案
按热度按时间j2datikz1#
没有;你必须自己记录重试次数。
但是,从版本2.5开始,您现在可以使用
RecoveringBatchErrorHandler
在这里,抛出一个特定的异常来告诉处理程序哪个记录失败了,处理程序在该异常之前提交记录的偏移量,并对失败的记录应用重试逻辑。看到了吗https://docs.spring.io/spring-kafka/reference/html/#recovering-批次eh