springkafka:将kafkalistenererrorhandler应用于所有kafkalistener

7y4bm7vi  于 2021-07-24  发布在  Java
关注(0)|答案(1)|浏览(877)

我想拦截@kafkalistener的抛出的验证错误(methodargumentnotvalidexception)。为了实现这一点,我创建了defaultkafkavalidationerrorhandler,它实现了kafkalistenererrorhandler:

@Component
public class DefaultKafkaValidationErrorHandler implements KafkaListenerErrorHandler {

    @Override
    public Object handleError(Message<?> message, ListenerExecutionFailedException exception, Consumer<?, ?> consumer) {
       //here is some logic
    }

    @Override
    public final Object handleError(Message<?> message, ListenerExecutionFailedException exception) {
        //such as here
    }
}

并给注册官设置一个验证器:

@Component
public class KafkaListenerConfig implements KafkaListenerConfigurer {
    @Autowired
    private LocalValidatorFactoryBean validator;

    @Override
    public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
        registrar.setValidator(this.validator);
    }

}

当我将这个处理程序直接放到我的侦听器时,它工作得很好

@KafkaListener(
            containerFactory = "factory",
            topics = "topic",
            groupId = "group.id",
            errorHandler = "defaultKafkaValidationErrorHandler")

我的问题是,是否有某种方法可以将我的defaultkafkavalidationerrorhandler应用于我项目中的所有@kafkalistener,而不是像前面提到的那样直接应用。可能通过concurrentkafkalistenercontainerfactory、KafkalistenerEndpointRegistrator或其他方式

zlhcx6iw

zlhcx6iw1#

您可以覆盖 C createListenerContainer(KafkaListenerEndpoint endpoint); 方法 ConcurrentKafkaListenerContainerFactory 设置你的全球 KafkaListenerErrorHandler 一切都是单身 endpoint . 你们的工厂必须按法律注册 KafkaListenerAnnotationBeanPostProcessor.DEFAULT_KAFKA_LISTENER_CONTAINER_FACTORY_BEAN_NAME bean名称,以避免在每个 @KafkaListener .

相关问题