何时使用多个ConcurrentKafkaListenerContainerFactory与Spring Kafka?

nuypyhwy  于 2023-03-01  发布在  Apache
关注(0)|答案(1)|浏览(158)

在我们的一个微服务中,我们有三个@KafkaListener方法,监听三个不同的主题,它们都在同一个消费者组中,如下所示:

@KafkaListener(clientIdPrefix = "MicroServiceNameFromWhichItIsConsuming1",
                   topics = "${path1.to.topic.in.yaml}",
                   autoStartup = "${spring.kafka.consumer.auto-startup}",
                   groupId = "${spring.kafka.consumer.group-id}")
            public void onMessage(ConsumerRecord<Integer, String> record) throws Exception {
            log.info("Record Received MicroServiceNameFromWhichItIsConsuming1 " + "Key: " + record.key() + "Offset " + record.offset());
}

    @KafkaListener(clientIdPrefix = "MicroServiceNameFromWhichItIsConsuming2",
                   topics = "${path2.to.topic.in.yaml}",
                   autoStartup = "${spring.kafka.consumer.auto-startup}",
                   groupId = "${spring.kafka.consumer.group-id}")
            public void onMessage(ConsumerRecord<Integer, String> record) throws Exception {
            log.info("Record Received MicroServiceNameFromWhichItIsConsuming2 " + "Key: " + record.key() + "Offset " + record.offset());
}

    @KafkaListener(clientIdPrefix = "MicroServiceNameFromWhichItIsConsuming3",
                   topics = "${path3.to.topic.in.yaml}",
                   autoStartup = "${spring.kafka.consumer.auto-startup}",
                   groupId = "${spring.kafka.consumer.group-id}")
            public void onMessage(ConsumerRecord<Integer, String> record) throws Exception {
            log.info("Record Received MicroServiceNameFromWhichItIsConsuming3 " + "Key: " + record.key() + "Offset " + record.offset());
}

在消费者配置类中,我们以如下方式定义了ConcurrentKafkaListenerContainerFactory

@Bean
public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
            ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
            ConsumerFactory<Object, Object> kafkaConsumerFactory) {

        ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();

        configurer.configure(factory, kafkaConsumerFactory);
        factory.setConsumerFactory(kafkaConsumerFactory);
    
        return factory;
}

根据此article,具有三个引用相同使用者组的@KafkaListner方法可能是一个潜在问题,因为:
如果一个服务有多个订阅互斥主题但共享同一个www.example.com的使用者,则任何一个使用者触发的重新平衡仍将影响组中的其他使用者。group.id then any rebalance triggered by any one consumer would still affect the other consumers in the group.
当使用者A最终完成其轮询并重新加入使用者组时,将触发进一步的重新平衡,并且所有处理将再次停止,因为分区将被撤销和重新分配。
Q0:
重新平衡时,使用org.apache.kafka.clients.consumer.CooperativeStickyAssignor是否会缓解问题?
那么,如果我在@KafkaListener中的groupId中放置不同的消费者群体,这会解决问题吗?
问题一:
为什么我应该或不应该在@KafkaListener中的containerFactory中引用三个不同的ConcurrentKafkaListenerContainerFactory(每个注解都有自己的containerFactory)?ConcurrentKafkaListenerContainerFactoryConfigurer的角色是什么?containerGroup@KafkaListener中的角色是什么?
问题二:
我有95%的把握,这些主题中的每一个都将由三个分区组成。
假设我们有该微服务的一个示例,并且我将@KafkaListener方法分为三个不同的使用者组。在这种情况下,我是否应该为每个containerFactory设置concurrency=3?然后,KafkaMessageListenerContainer的数量等于concurrency?拥有一个KafkaMessageListenerContainer是否是最佳选择(由ConcurrentKafkaListenerContainerFactory创建)?另一方面,如果Kubernetes/OpenShift/whatever决定扩展并再启动一个微服务示例,我将有空闲的KafkaMessageListenerContainer
注:.yaml中消费者部分的当前配置如下所示:

spring:
 kafka:
  admin:
    fail-fast: true
  consumer:
    key-deserializer: org.apache.kafka.common.serialization.IntegerDeserializer
    value-deserializer: org.apache.kafka.common.serialization.StringDeserializer      
    auto-startup: true
    max-poll-records: 2500
    group-id: some-group-name
    properties:
      allow.auto.create.topics: false
kkih6yb8

kkih6yb81#

将不同主题的使用者放在同一组中并不是一种好的做法,因为对一个主题进行重新平衡会导致对其他使用者进行不必要的重新平衡。
一个合作的分配者可能会改进事情,但是把他们放在不同的组中仍然是更好的。
使用敕勒版本的框架,您很少需要多个容器工厂,因为许多公共属性(group.id、client.id等)可以在注解中覆盖。
请参阅containerGroup的javadocs;它与group.id无关。
这只是一种定义一组容器(ContainerGroup bean)的方法,您可以使用该组容器作为一个组(所有容器的子集)停止/启动。
concurrency * maxInstances必须小于或等于主题中的分区数,以避免没有分配分区的空闲容器。

相关问题