我有一个用@kafkalistener注解的kafka侦听器方法。它接受message类型的参数和确认。我处理收到的消息并使用acknowledgement.acknowledge()手动提交。我已经在容器上设置了重试模板。重试策略定义为特定于异常的。为此,我创建了自己的retryploicy类,并使用exceptionclassifierretrypolicy进行了扩展。在那个类中,根据接收到的异常,我返回alwaysretrypolicy、neverretrypolicy和simpleretrypolicy。我遇到的问题是,当侦听器方法中的消息在处理过程中发生dataaccessexception时,我希望永远重试,并且我已经相应地配置了重试策略,但是listener方法总是抛出listenerexecutionfailedexception,而不是在堆栈下面抛出遇到的异常,直到listener方法从上面的消息处理方法返回为止。由于此异常是由侦听器引发的,因此我的重试配置无法按预期工作。
示例代码如下:
@KafkaListener(topics = "topicName", containerFactory = "kafkaListenerContainerFactory")
public void listenToKafkaTopic(@Payload Message<SomeAvroType> message, Acknowledgement ack){
SomeAvroType type = message.getPayLoad();
type.processIncomingMessage();
ack.acknowledge();
}
重试策略配置
@component
public class MyRetryPolicy extends ExceptionClassifierRetryPolicy
{
@PostConstruct
public void init(){
final SimpleRetryPolicy simpleRetryPolicy = new SimpleRetryPolicy();
simpleRetryPolicy.setMaxAttempts(3);
this.setExceptionClassifier( new Classifier<Throwable, RetryPolicy>()
{
@Override
public RetryPolicy classify( Throwable classifiable ){
// Always Retry when instanceOf TransientDataAccessException
if ( classifiable instanceof TransientDataAccessException)
{
return new AlwaysRetryPolicy;
}
else if(classifiable instanceOf SomeOtherException){
return simpleRetryPolicy;
}
// Do not retry for other exceptions
return new NeverRetryPolicy();
}
} );
}
}
我使用了容器上提供的大部分自动配置,因此我在retry config类中自动连接concurrentKafkalListenerContainerFactory。
@configuration
public class RetryConfig{
@Bean
public RetryTemplate retryTemplate(@Autowired @Qualifier("kafkaListenerContainerFactory")ConcurrentKafkaListenerContainerFactory factory;){
RetryTemplate retryTemplate = new RetryTemplate();
retryTemplate.setRetryPolicy(new MyRetryPolicy());
FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy()
fixedBackOffPolicy.setBackOffPeriod(1000l);
retryTemplate.setBackOffPolicy(fixedBackOffPolicy);
factory.setRetryTemplate(retryTemplate);
factory.setAckOnError(false);
factory.setRecoveryCallback(//configure recovery after retries are exhausted and commit offset)
}
}
当我在调试模式下运行它,并在processincomingmessage()中抛出transientdataaccessexception时,我希望总是重试,但侦听器方法不会抛出传播的异常,而是抛出listenerexecutionfailedexception,其原因(e.getcause())是transientdataaccessexception。因此,重试策略的计算结果总是NeverRetryPolicy。在侦听器中是否有方法抛出传播的异常,以便我的重试配置正确执行?
1条答案
按热度按时间8e2ybdfx1#
请参见binaryexceptionclassifier及其
traverseCauses
财产。