在我们的一个微服务中,我们有三个@KafkaListener
方法,监听三个不同的主题,它们都在同一个消费者组中,如下所示:
@KafkaListener(clientIdPrefix = "MicroServiceNameFromWhichItIsConsuming1",
topics = "${path1.to.topic.in.yaml}",
autoStartup = "${spring.kafka.consumer.auto-startup}",
groupId = "${spring.kafka.consumer.group-id}")
public void onMessage(ConsumerRecord<Integer, String> record) throws Exception {
log.info("Record Received MicroServiceNameFromWhichItIsConsuming1 " + "Key: " + record.key() + "Offset " + record.offset());
}
@KafkaListener(clientIdPrefix = "MicroServiceNameFromWhichItIsConsuming2",
topics = "${path2.to.topic.in.yaml}",
autoStartup = "${spring.kafka.consumer.auto-startup}",
groupId = "${spring.kafka.consumer.group-id}")
public void onMessage(ConsumerRecord<Integer, String> record) throws Exception {
log.info("Record Received MicroServiceNameFromWhichItIsConsuming2 " + "Key: " + record.key() + "Offset " + record.offset());
}
@KafkaListener(clientIdPrefix = "MicroServiceNameFromWhichItIsConsuming3",
topics = "${path3.to.topic.in.yaml}",
autoStartup = "${spring.kafka.consumer.auto-startup}",
groupId = "${spring.kafka.consumer.group-id}")
public void onMessage(ConsumerRecord<Integer, String> record) throws Exception {
log.info("Record Received MicroServiceNameFromWhichItIsConsuming3 " + "Key: " + record.key() + "Offset " + record.offset());
}
在消费者配置类中,我们以如下方式定义了ConcurrentKafkaListenerContainerFactory
:
@Bean
public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
ConsumerFactory<Object, Object> kafkaConsumerFactory) {
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
configurer.configure(factory, kafkaConsumerFactory);
factory.setConsumerFactory(kafkaConsumerFactory);
return factory;
}
根据此article,具有三个引用相同使用者组的@KafkaListner
方法可能是一个潜在问题,因为:
如果一个服务有多个订阅互斥主题但共享同一个www.example.com的使用者,则任何一个使用者触发的重新平衡仍将影响组中的其他使用者。group.id then any rebalance triggered by any one consumer would still affect the other consumers in the group.
当使用者A最终完成其轮询并重新加入使用者组时,将触发进一步的重新平衡,并且所有处理将再次停止,因为分区将被撤销和重新分配。
Q0:
重新平衡时,使用org.apache.kafka.clients.consumer.CooperativeStickyAssignor
是否会缓解问题?
那么,如果我在@KafkaListener
中的groupId
中放置不同的消费者群体,这会解决问题吗?
问题一:
为什么我应该或不应该在@KafkaListener
中的containerFactory
中引用三个不同的ConcurrentKafkaListenerContainerFactory
(每个注解都有自己的containerFactory
)?ConcurrentKafkaListenerContainerFactoryConfigurer
的角色是什么?containerGroup
在@KafkaListener
中的角色是什么?
问题二:
我有95%的把握,这些主题中的每一个都将由三个分区组成。
假设我们有该微服务的一个示例,并且我将@KafkaListener
方法分为三个不同的使用者组。在这种情况下,我是否应该为每个containerFactory
设置concurrency=3
?然后,KafkaMessageListenerContainer
的数量等于concurrency
?拥有一个KafkaMessageListenerContainer
是否是最佳选择(由ConcurrentKafkaListenerContainerFactory
创建)?另一方面,如果Kubernetes/OpenShift/whatever决定扩展并再启动一个微服务示例,我将有空闲的KafkaMessageListenerContainer
?
注:.yaml
中消费者部分的当前配置如下所示:
spring:
kafka:
admin:
fail-fast: true
consumer:
key-deserializer: org.apache.kafka.common.serialization.IntegerDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
auto-startup: true
max-poll-records: 2500
group-id: some-group-name
properties:
allow.auto.create.topics: false
1条答案
按热度按时间kkih6yb81#
将不同主题的使用者放在同一组中并不是一种好的做法,因为对一个主题进行重新平衡会导致对其他使用者进行不必要的重新平衡。
一个合作的分配者可能会改进事情,但是把他们放在不同的组中仍然是更好的。
使用敕勒版本的框架,您很少需要多个容器工厂,因为许多公共属性(group.id、client.id等)可以在注解中覆盖。
请参阅
containerGroup
的javadocs;它与group.id
无关。这只是一种定义一组容器(
ContainerGroup
bean)的方法,您可以使用该组容器作为一个组(所有容器的子集)停止/启动。concurrency * maxInstances
必须小于或等于主题中的分区数,以避免没有分配分区的空闲容器。