@KafkaListener(topics = "${topics.input}")
public void listener(JsonObj obj) {
//...
}
如果输入了错误的json,就会抛出异常。我需要处理这个异常。主要任务:要保存引发异常的消息的键,我想我需要创建一个自定义处理程序实现并替换它
ErrorHandlingDeserializer()
在我的配置中:
@Bean
public ConsumerFactory<String, JsonObj> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(
consumerConfigs(),
new StringDeserializer(),
new ErrorHandlingDeserializer(new JsonDeserializer<>(JsonObj.class)));
}
如果是这样,我不知道如何实现这个处理程序。它应该实现什么接口?或者有别的解决办法?问题:如何捕获和处理此异常?
暂无答案!
目前还没有任何答案,快来回答吧!