用@kafkalistener注解的java方法没有传播遇到的异常由于此原因,无法调用我的重试配置

oymdgrw7  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(489)

我有一个用@kafkalistener注解的kafka侦听器方法。它接受message类型的参数和确认。我处理收到的消息并使用acknowledgement.acknowledge()手动提交。我已经在容器上设置了重试模板。重试策略定义为特定于异常的。为此,我创建了自己的retryploicy类,并使用exceptionclassifierretrypolicy进行了扩展。在那个类中,根据接收到的异常,我返回alwaysretrypolicy、neverretrypolicy和simpleretrypolicy。我遇到的问题是,当侦听器方法中的消息在处理过程中发生dataaccessexception时,我希望永远重试,并且我已经相应地配置了重试策略,但是listener方法总是抛出listenerexecutionfailedexception,而不是在堆栈下面抛出遇到的异常,直到listener方法从上面的消息处理方法返回为止。由于此异常是由侦听器引发的,因此我的重试配置无法按预期工作。
示例代码如下:

@KafkaListener(topics = "topicName", containerFactory = "kafkaListenerContainerFactory")
    public void listenToKafkaTopic(@Payload Message<SomeAvroType> message, Acknowledgement ack){
      SomeAvroType type = message.getPayLoad();
      type.processIncomingMessage();
      ack.acknowledge();
   }

重试策略配置

@component 
    public class MyRetryPolicy extends ExceptionClassifierRetryPolicy
     {
      @PostConstruct
      public void init(){
        final SimpleRetryPolicy simpleRetryPolicy = new SimpleRetryPolicy();
        simpleRetryPolicy.setMaxAttempts(3);

        this.setExceptionClassifier( new Classifier<Throwable, RetryPolicy>()
          {
            @Override
            public RetryPolicy classify( Throwable classifiable ){

            // Always Retry when instanceOf TransientDataAccessException
            if ( classifiable instanceof TransientDataAccessException)
            {
                return new AlwaysRetryPolicy;
            }
            else if(classifiable instanceOf SomeOtherException){

              return simpleRetryPolicy; 
           }

            // Do not retry for other exceptions
            return new NeverRetryPolicy();
          }
       } );
     }
   }

我使用了容器上提供的大部分自动配置,因此我在retry config类中自动连接concurrentKafkalListenerContainerFactory。

@configuration
    public class RetryConfig{

     @Bean
     public RetryTemplate retryTemplate(@Autowired @Qualifier("kafkaListenerContainerFactory")ConcurrentKafkaListenerContainerFactory factory;){

       RetryTemplate retryTemplate = new RetryTemplate();
       retryTemplate.setRetryPolicy(new MyRetryPolicy());
       FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy()
       fixedBackOffPolicy.setBackOffPeriod(1000l);
       retryTemplate.setBackOffPolicy(fixedBackOffPolicy);

       factory.setRetryTemplate(retryTemplate);
       factory.setAckOnError(false);
       factory.setRecoveryCallback(//configure recovery after retries are exhausted and commit offset)

      }
    }

当我在调试模式下运行它,并在processincomingmessage()中抛出transientdataaccessexception时,我希望总是重试,但侦听器方法不会抛出传播的异常,而是抛出listenerexecutionfailedexception,其原因(e.getcause())是transientdataaccessexception。因此,重试策略的计算结果总是NeverRetryPolicy。在侦听器中是否有方法抛出传播的异常,以便我的重试配置正确执行?

8e2ybdfx

8e2ybdfx1#

请参见binaryexceptionclassifier及其 traverseCauses 财产。

/**
 * Create a binary exception classifier.
 * @param defaultValue the default value to use
 * @param typeMap the map of types to classify
 * @param traverseCauses if true, throwable's causes will be inspected to find
 * non-default class
 */
public BinaryExceptionClassifier(Map<Class<? extends Throwable>, Boolean> typeMap, boolean defaultValue,
        boolean traverseCauses) {
    super(typeMap, defaultValue);
    this.traverseCauses = traverseCauses;
}

相关问题