我已经用 KafkaHandler
. 我的消费者应该消费事件,然后为每个事件向其他服务发送rest请求。我只想在rest服务关闭时重试。否则,我可以忽略失败的事件。
我的集装箱工厂配置如下:
@Bean
public ConcurrentKafkaListenerContainerFactory<String, MyCustomEvent>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, MyCustomEvent> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setStatefulRetry(true);
factory.setRetryTemplate(retryTemplate());
factory.setConcurrency(3);
ContainerProperties containerProperties = factory.getContainerProperties();
containerProperties.setAckOnError(false);
containerProperties.setAckMode(AckMode.RECORD);
containerProperties.setErrorHandler(new SeekToCurrentErrorHandler());
return factory;
}
我正在使用 ExceptionClassifierRetryPolicy
用于设置异常和相应的重试策略。
重试后一切都很好。当我得到一个 ConnectException
当我得到一个 IllegalArgumentException
.
然而,在 IllegalArgumentException
脚本, SeekToCurrentErrorHandler
查找回未处理的偏移量(因为它查找回未处理的消息,包括失败的消息),最后立即重试失败的消息。消费者不断地来回和重试百万次。
如果我有机会知道哪张唱片失败了 SeekToCurrentErrorHandler
,然后我将创建 SeekToCurrentErrorHandler
检查失败的消息是否可重试(通过使用 thrownException
字段)。如果它是不可重试的,那么我会把它从列表中删除 records
寻找回来。
关于如何实现这个功能有什么想法吗?
注: enable.auto.commit
设置为 false
, auto.offset.reset
设置为 earliest
.
谢谢您!
1条答案
按热度按时间ubof19bj1#
有一个
FailedRecordTracker
从Apache·Kafka的Spring开始2.2
(尚未发布):https://docs.spring.io/spring-kafka/docs/2.2.0.m2/reference/html/whats-new-part.html#_listener_container_changes
从版本2.2开始
SeekToCurrentErrorHandler
现在可以恢复(跳过)不断失败的记录。默认情况下,10次失败后,将记录失败的记录(错误)。可以使用自定义恢复程序配置处理程序(BiConsumer
)和/或最大故障数。所以,你需要的只是复制/粘贴一个
FailedRecordTracker
以及SeekToCurrentErrorHandler
源代码来自master
在您的项目中,您将拥有您想要的功能:https://github.com/spring-projects/spring-kafka/blob/master/spring-kafka/src/main/java/org/springframework/kafka/listener/failedrecordtracker.java
https://github.com/spring-projects/spring-kafka/blob/master/spring-kafka/src/main/java/org/springframework/kafka/listener/seektocurrenterrorhandler.java