我知道要使一个方法成为kafka消息侦听器的目标,我必须用@kafkalistener注解来标记这个方法。此注解允许通过containerfactory元素指定kafkalistenercontainerfactory。
下面是baeldungspringkafka教程的一些片段。
kafkaconsumerconfig.java文件
private ConsumerFactory<String, String> consumerFactory(String groupId) {
Map<String, Object> props = new HashMap<>();
...
return new DefaultKafkaConsumerFactory<>(props);
}
private ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(String groupId) {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory(groupId));
return factory;
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> fooKafkaListenerContainerFactory() {
return kafkaListenerContainerFactory("foo");
}
messagelistener.java文件
@KafkaListener(
topics = "${message.topic.name}",
groupId = "foo",
containerFactory = "fooKafkaListenerContainerFactory")
public void listenGroupFoo(String message) {
System.out.println("Received Message in group 'foo': " + message);
...
}
我不明白的是为什么我们需要一个监听器容器工厂。什么是侦听器容器?当一个方法被这样注解时会发生什么?
1条答案
按热度按时间dy1byipe1#
侦听器“容器”是跨越多种技术(jms、rabbitmq、kafka、aws等)的spring概念。
监听器被定义为pojobean方法,是容器的属性(容器“包含”监听器)。
容器负责与代理进行交互以接收消息,并根据侦听器类型使用每条消息或一批消息调用侦听器方法。
这意味着您的应用程序代码不必处理与代理交互的机制,您可以只关注业务逻辑。
框架发现任何
@KafkaListener
方法并使用工厂为每个方法创建一个容器。