我有一个标准的SpringKafka设置如下,
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> feedbackStreamListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
@KafkaListener(topics = ("my-topic"),groupId = ("groupId"),containerFactory = "listenerContainerFactory")
public void myListener(@Payload String message) {
System.out.println("Received Message : " + message);
// do some heavy processing
}
现在我需要在3个场景上定制日志(从我的应用程序生成日志),
当我引导消费者时,比如说kafka集群关闭了或者我提供了错误的引导服务器,我需要自定义日志。
当使用者成功启动时,在轮询之前提供一个自定义日志。
当存在自定义消息转换器时,如果存在反序列化问题,则自定义日志。
1条答案
按热度按时间7cwmlq891#
看到了吗
KafkaEvent
实施:https://docs.spring.io/spring-kafka/docs/current/reference/html/#events.所以,当你抓住
ConsumerFailedToStartEvent
在@EventListener
,您可以使用ConsumerStartedEvent
.甚至对于反序列化问题也没有解决方法,但是您肯定可以使用
GenericErrorHandler
要在使用者发生错误时记录某些内容,请执行以下操作:https://docs.spring.io/spring-kafka/docs/current/reference/html/#annotation-错误处理