java Spring的Kafka:3个应用程序setConcurrency(1)与1个应用程序setConcurrency(3)之间的差异

d8tt03nd  于 2023-02-02  发布在  Java
关注(0)|答案(1)|浏览(140)

关于Kafka和SpringKafka的并发性的小问题。
我有一个Kafka主题theimportanttopic,许多信息都通过它发送。事实上,这个Kafka主题有三个部分。(称它们为重要主题-0重要主题-1重要主题-2)
众所周知,Kafka不允许同一组中的多个消费者消费同一分区中的消息,即同一组中的两个消费者不能消费重要主题0中的消息。
我的Spring Kafka应用程序代码如下:

@Configuration
class KafkaConsumerConfig {

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "mykafka.com:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(1); //HERE
        return factory;
    }
}
@Component
class KafkaListenersExample {

    Logger LOG = LoggerFactory.getLogger(KafkaListenersExample.class);

    @KafkaListener(topics = "theimportanttopic", groupId = "uniquegroup")
    void listener(String data) {
        LOG.info(data);
        doSomethingImportantWithTheData(data);
    }
}

因此,我很难理解这两个概念之间的区别:
假设这个应用程序已经被停靠,并且云环境已经可以使用。
对于结构1,我可以给出1CPU +1G内存 * 3,对于结构2,我可以给出3CPU +3G内存。
设计编号1:由于此应用程序位于部署在云上的容器(如Kubernetes)中,因此会启动它的三个示例。根据定义,我将拥有三个这样的"应用程序",每个应用程序将使用三个分区中的一个分区。

kubectl get pods
my-app-AaAaAaAaAa-AaAaA
my-app-BbBbBbBbBb-BbBbB
my-app-CcCcCcCcCc-CcCcC

(and假设,my-app-AaAaAaAaAa-AaAaA消耗重要主题-0,my-app-BbBbBbBb-BbBbB消耗重要主题-1,my-app-CcCcCcCc-CcCcC消耗重要主题-2)
设计编号2:另一方面,我可以有一个,并且只有一个这个应用程序my-app in container,将并发设置为3。(与上面的代码相同,只有一行更改)

@Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3);
        return factory;
    }

请问这两种款式有什么不同?
哪一个更好,为什么?
这不是一个基于意见的问题。我可以知道性能是什么,成本是什么,设计1号和设计2号之间的利弊吗?
谢谢

hfyxw5xn

hfyxw5xn1#

不同之处在于高可用性。
如果您有任何一个pod,消耗所有三个分区,并且它停止,那么您需要在k8中进行额外的配置以获得RestartPolicy。
或者,让ReplicaSet的maxContainers为3,然后Kafka Consumer API可以在其中任何一个容器启动/停止时重新平衡。
您还可以考虑使用KEDA来根据消费者延迟进行自动缩放。

相关问题