何时使用concurrentkafkalistenercontainerfactory?

zbdgwd5y  于 2021-06-04  发布在  Kafka
关注(0)|答案(2)|浏览(1896)

我是Kafka的新手,我查阅了文件,但什么都不懂。有人能解释一下什么时候用这个吗 ConcurrentKafkaListenerContainerFactory 上课?我用过 Kafkaconsumer 但是我明白了 ConcurrentKafkaListenerContainerFactory 正在我当前的项目中使用。请解释它的用途。

kuarbcqp

kuarbcqp1#

Kafka消费者不是线程安全的。所有网络i/o都发生在发出调用的应用程序的线程中。用户有责任确保多线程访问正确同步。不同步的访问将导致 ConcurrentModificationException. 如果一个使用者被分配了多个分区从中获取数据,它将尝试同时从所有分区中获取数据,从而有效地赋予这些分区相同的优先级。但是,在某些情况下,使用者可能希望首先集中精力全速从指定分区的某个子集获取数据,并且仅在这些分区几乎没有或没有数据可供使用时才开始获取其他分区。
Spring Kafka ConcurrentKafkaListenerContainerFactory 用于为带有注解的方法创建容器 @KafkaListener 有两个 MessageListenerContainer Spring的Kafka

KafkaMessageListenerContainer
ConcurrentMessageListenerContainer

这个 KafkaMessageListenerContainer 从单个线程上的所有主题或分区接收所有消息。这个 ConcurrentMessageListenerContainer 一个或多个代理 KafkaMessageListenerContainer 示例提供多线程消费。
使用concurrentmessagelistenercontainer

@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
                    kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setConcurrency(3);
    factory.getContainerProperties().setPollTimeout(3000);
    return factory;
  }

它具有并发属性。例如,container.setconcurrency(3)创建了三个 KafkaMessageListenerContainer 示例。
如果你有六个 TopicPartition 提供示例,并发度为3;每个容器有两个分区。对于五个topicpartition示例,两个容器得到两个分区,第三个容器得到一个分区。如果并发性大于topicpartitions的数量,则会向下调整并发性,以便每个容器都获得一个分区。
这是一个清晰的例子,这里有文档

y0u0uwnf

y0u0uwnf2#

kafka consumer api不是线程安全的。ConcurrentKafkaListenerContainerFactoryAPI提供了使用kafka使用者api以及设置其他kafka使用者属性的并发方式。

相关问题