我遵循baeldung.com的“ApacheKafka与spring简介”教程。我建立了一个 KafkaConsumerConfig
与 kafkaConsumerFactory
方法:
private ConsumerFactory<String, String> kafkaConsumerFactory(String groupId) {
Map<String, Object> props = new HashMap<>();
...
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
...
return new DefaultKafkaConsumerFactory<>(props);
}
和两个“定制”工厂:
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> fooKafkaListenerContainerFactory() {
return kafkaListenerContainerFactory("foo");
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> barKafkaListenerContainerFactory() {
return kafkaListenerContainerFactory("bar");
}
在 MessageListener
我用 @KafkaListener
使用给定的 groupId
听一个主题:
@KafkaListener(topics = "${message.topic.name}", groupId = "foo", containerFactory = "fooKafkaListenerContainerFactory")
public void listenGroupFoo(String message) {
System.out.println("Received Message in group 'foo': " + message);
...
}
@KafkaListener(topics = "${message.topic.name}", groupId = "bar", containerFactory = "barKafkaListenerContainerFactory")
public void listenGroupBar(String message) {
System.out.println("Received Message in group 'bar': " + message);
...
}
这样就有两组消费者,一组具有groupid“foo”,另一组具有groupid“bar”。
现在,如果我改变容器工厂为“foo”消费者从 fooKafkaListenerContainerFactory
至 barKafkaListenerContainerFactory
以这种方式
@KafkaListener(topics = "${message.topic.name}", groupId = "foo", containerFactory = "barKafkaListenerContainerFactory")
public void listenGroupFoo(String message) {
...
}
这似乎是不相容的 groupId
的 KafkaListener
以及 groupId
集装箱工厂,但没有任何变化。所以,我想了解的是 props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
房地产确实如此,为什么它看起来不被考虑。
1条答案
按热度按时间rhfm7lfc1#
工厂
groupId
是默认值,仅在没有groupId
(或id
)在@KafkaListener
.在早期版本中,只能在工厂上设置groupid,这意味着如果需要不同的组,则每个侦听器都需要一个单独的工厂,这打破了工厂可以用于多个侦听器的想法。
查看javadocs。。。