我想拦截@kafkalistener的抛出的验证错误(methodargumentnotvalidexception)。为了实现这一点,我创建了defaultkafkavalidationerrorhandler,它实现了kafkalistenererrorhandler:
@Component
public class DefaultKafkaValidationErrorHandler implements KafkaListenerErrorHandler {
@Override
public Object handleError(Message<?> message, ListenerExecutionFailedException exception, Consumer<?, ?> consumer) {
//here is some logic
}
@Override
public final Object handleError(Message<?> message, ListenerExecutionFailedException exception) {
//such as here
}
}
并给注册官设置一个验证器:
@Component
public class KafkaListenerConfig implements KafkaListenerConfigurer {
@Autowired
private LocalValidatorFactoryBean validator;
@Override
public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
registrar.setValidator(this.validator);
}
}
当我将这个处理程序直接放到我的侦听器时,它工作得很好
@KafkaListener(
containerFactory = "factory",
topics = "topic",
groupId = "group.id",
errorHandler = "defaultKafkaValidationErrorHandler")
我的问题是,是否有某种方法可以将我的defaultkafkavalidationerrorhandler应用于我项目中的所有@kafkalistener,而不是像前面提到的那样直接应用。可能通过concurrentkafkalistenercontainerfactory、KafkalistenerEndpointRegistrator或其他方式
1条答案
按热度按时间zlhcx6iw1#
您可以覆盖
C createListenerContainer(KafkaListenerEndpoint endpoint);
方法ConcurrentKafkaListenerContainerFactory
设置你的全球KafkaListenerErrorHandler
一切都是单身endpoint
. 你们的工厂必须按法律注册KafkaListenerAnnotationBeanPostProcessor.DEFAULT_KAFKA_LISTENER_CONTAINER_FACTORY_BEAN_NAME
bean名称,以避免在每个@KafkaListener
.