@kafkalistener vs consumerfactory groupid

nnt7mjpx  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(462)

我遵循baeldung.com的“ApacheKafka与spring简介”教程。我建立了一个 KafkaConsumerConfigkafkaConsumerFactory 方法:

private ConsumerFactory<String, String> kafkaConsumerFactory(String groupId) {
    Map<String, Object> props = new HashMap<>();
    ...
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    ...
    return new DefaultKafkaConsumerFactory<>(props);
}

和两个“定制”工厂:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> fooKafkaListenerContainerFactory() {
    return kafkaListenerContainerFactory("foo");
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> barKafkaListenerContainerFactory() {
    return kafkaListenerContainerFactory("bar");
}

MessageListener 我用 @KafkaListener 使用给定的 groupId 听一个主题:

@KafkaListener(topics = "${message.topic.name}", groupId = "foo", containerFactory = "fooKafkaListenerContainerFactory")
public void listenGroupFoo(String message) {
    System.out.println("Received Message in group 'foo': " + message);
    ...
}

@KafkaListener(topics = "${message.topic.name}", groupId = "bar", containerFactory = "barKafkaListenerContainerFactory")
public void listenGroupBar(String message) {
    System.out.println("Received Message in group 'bar': " + message);
    ...
}

这样就有两组消费者,一组具有groupid“foo”,另一组具有groupid“bar”。
现在,如果我改变容器工厂为“foo”消费者从 fooKafkaListenerContainerFactorybarKafkaListenerContainerFactory 以这种方式

@KafkaListener(topics = "${message.topic.name}", groupId = "foo", containerFactory = "barKafkaListenerContainerFactory")
public void listenGroupFoo(String message) {
    ...
}

这似乎是不相容的 groupIdKafkaListener 以及 groupId 集装箱工厂,但没有任何变化。所以,我想了解的是 props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); 房地产确实如此,为什么它看起来不被考虑。

rhfm7lfc

rhfm7lfc1#

工厂 groupId 是默认值,仅在没有 groupId (或 id )在 @KafkaListener .
在早期版本中,只能在工厂上设置groupid,这意味着如果需要不同的组,则每个侦听器都需要一个单独的工厂,这打破了工厂可以用于多个侦听器的想法。
查看javadocs。。。

/**
 * Override the {@code group.id} property for the consumer factory with this value
 * for this listener only.
 * <p>SpEL {@code #{...}} and property place holders {@code ${...}} are supported.
 * @return the group id.
 * @since 1.3
 */
String groupId() default "";

/**
 * When {@link #groupId() groupId} is not provided, use the {@link #id() id} (if
 * provided) as the {@code group.id} property for the consumer. Set to false, to use
 * the {@code group.id} from the consumer factory.
 * @return false to disable.
 * @since 1.3
 */
boolean idIsGroup() default true;

相关问题