spring kafka SeektocurInterrorHandler找出失败的记录

vxbzzdmp  于 2021-05-27  发布在  Kafka
关注(0)|答案(1)|浏览(904)

我已经用 KafkaHandler . 我的消费者应该消费事件,然后为每个事件向其他服务发送rest请求。我只想在rest服务关闭时重试。否则,我可以忽略失败的事件。
我的集装箱工厂配置如下:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, MyCustomEvent>
  kafkaListenerContainerFactory() {

  ConcurrentKafkaListenerContainerFactory<String, MyCustomEvent> factory =
    new ConcurrentKafkaListenerContainerFactory<>();

  factory.setConsumerFactory(consumerFactory());
  factory.setStatefulRetry(true);
  factory.setRetryTemplate(retryTemplate());
  factory.setConcurrency(3);

  ContainerProperties containerProperties = factory.getContainerProperties();
  containerProperties.setAckOnError(false);
  containerProperties.setAckMode(AckMode.RECORD);
  containerProperties.setErrorHandler(new SeekToCurrentErrorHandler());

  return factory;
}

我正在使用 ExceptionClassifierRetryPolicy 用于设置异常和相应的重试策略。
重试后一切都很好。当我得到一个 ConnectException 当我得到一个 IllegalArgumentException .
然而,在 IllegalArgumentException 脚本, SeekToCurrentErrorHandler 查找回未处理的偏移量(因为它查找回未处理的消息,包括失败的消息),最后立即重试失败的消息。消费者不断地来回和重试百万次。
如果我有机会知道哪张唱片失败了 SeekToCurrentErrorHandler ,然后我将创建 SeekToCurrentErrorHandler 检查失败的消息是否可重试(通过使用 thrownException 字段)。如果它是不可重试的,那么我会把它从列表中删除 records 寻找回来。
关于如何实现这个功能有什么想法吗?
注: enable.auto.commit 设置为 false , auto.offset.reset 设置为 earliest .
谢谢您!

ubof19bj

ubof19bj1#

有一个 FailedRecordTracker 从Apache·Kafka的Spring开始 2.2 (尚未发布):
https://docs.spring.io/spring-kafka/docs/2.2.0.m2/reference/html/whats-new-part.html#_listener_container_changes
从版本2.2开始 SeekToCurrentErrorHandler 现在可以恢复(跳过)不断失败的记录。默认情况下,10次失败后,将记录失败的记录(错误)。可以使用自定义恢复程序配置处理程序( BiConsumer )和/或最大故障数。

SeekToCurrentErrorHandler errorHandler =
    new SeekToCurrentErrorHandler((record, exception) -> {
          // recover after 3 failures - e.g. send to a dead-letter topic
          }, 3);

所以,你需要的只是复制/粘贴一个 FailedRecordTracker 以及 SeekToCurrentErrorHandler 源代码来自 master 在您的项目中,您将拥有您想要的功能:
https://github.com/spring-projects/spring-kafka/blob/master/spring-kafka/src/main/java/org/springframework/kafka/listener/failedrecordtracker.java
https://github.com/spring-projects/spring-kafka/blob/master/spring-kafka/src/main/java/org/springframework/kafka/listener/seektocurrenterrorhandler.java

相关问题