关于Kafka和SpringKafka的并发性的小问题。
我有一个Kafka主题theimportanttopic
,许多信息都通过它发送。事实上,这个Kafka主题有三个部分。(称它们为重要主题-0重要主题-1重要主题-2)
众所周知,Kafka不允许同一组中的多个消费者消费同一分区中的消息,即同一组中的两个消费者不能消费重要主题0中的消息。
我的Spring Kafka应用程序代码如下:
@Configuration
class KafkaConsumerConfig {
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "mykafka.com:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return props;
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(1); //HERE
return factory;
}
}
@Component
class KafkaListenersExample {
Logger LOG = LoggerFactory.getLogger(KafkaListenersExample.class);
@KafkaListener(topics = "theimportanttopic", groupId = "uniquegroup")
void listener(String data) {
LOG.info(data);
doSomethingImportantWithTheData(data);
}
}
因此,我很难理解这两个概念之间的区别:
假设这个应用程序已经被停靠,并且云环境已经可以使用。
对于结构1,我可以给出1CPU +1G内存 * 3,对于结构2,我可以给出3CPU +3G内存。
设计编号1:由于此应用程序位于部署在云上的容器(如Kubernetes)中,因此会启动它的三个示例。根据定义,我将拥有三个这样的"应用程序",每个应用程序将使用三个分区中的一个分区。
kubectl get pods
my-app-AaAaAaAaAa-AaAaA
my-app-BbBbBbBbBb-BbBbB
my-app-CcCcCcCcCc-CcCcC
(and假设,my-app-AaAaAaAaAa-AaAaA消耗重要主题-0,my-app-BbBbBbBb-BbBbB消耗重要主题-1,my-app-CcCcCcCc-CcCcC消耗重要主题-2)
设计编号2:另一方面,我可以有一个,并且只有一个这个应用程序my-app in container,将并发设置为3。(与上面的代码相同,只有一行更改)
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(3);
return factory;
}
请问这两种款式有什么不同?
哪一个更好,为什么?
这不是一个基于意见的问题。我可以知道性能是什么,成本是什么,设计1号和设计2号之间的利弊吗?
谢谢
1条答案
按热度按时间hfyxw5xn1#
不同之处在于高可用性。
如果您有任何一个pod,消耗所有三个分区,并且它停止,那么您需要在k8中进行额外的配置以获得RestartPolicy。
或者,让ReplicaSet的maxContainers为3,然后Kafka Consumer API可以在其中任何一个容器启动/停止时重新平衡。
您还可以考虑使用KEDA来根据消费者延迟进行自动缩放。