我是Kafka的新手,我查阅了文件,但什么都不懂。有人能解释一下什么时候用这个吗 ConcurrentKafkaListenerContainerFactory
上课?我用过 Kafkaconsumer
但是我明白了 ConcurrentKafkaListenerContainerFactory
正在我当前的项目中使用。请解释它的用途。
我是Kafka的新手,我查阅了文件,但什么都不懂。有人能解释一下什么时候用这个吗 ConcurrentKafkaListenerContainerFactory
上课?我用过 Kafkaconsumer
但是我明白了 ConcurrentKafkaListenerContainerFactory
正在我当前的项目中使用。请解释它的用途。
2条答案
按热度按时间kuarbcqp1#
Kafka消费者不是线程安全的。所有网络i/o都发生在发出调用的应用程序的线程中。用户有责任确保多线程访问正确同步。不同步的访问将导致
ConcurrentModificationException.
如果一个使用者被分配了多个分区从中获取数据,它将尝试同时从所有分区中获取数据,从而有效地赋予这些分区相同的优先级。但是,在某些情况下,使用者可能希望首先集中精力全速从指定分区的某个子集获取数据,并且仅在这些分区几乎没有或没有数据可供使用时才开始获取其他分区。Spring Kafka
ConcurrentKafkaListenerContainerFactory
用于为带有注解的方法创建容器@KafkaListener
有两个MessageListenerContainer
Spring的Kafka这个
KafkaMessageListenerContainer
从单个线程上的所有主题或分区接收所有消息。这个ConcurrentMessageListenerContainer
一个或多个代理KafkaMessageListenerContainer
示例提供多线程消费。使用concurrentmessagelistenercontainer
它具有并发属性。例如,container.setconcurrency(3)创建了三个
KafkaMessageListenerContainer
示例。如果你有六个
TopicPartition
提供示例,并发度为3;每个容器有两个分区。对于五个topicpartition示例,两个容器得到两个分区,第三个容器得到一个分区。如果并发性大于topicpartitions的数量,则会向下调整并发性,以便每个容器都获得一个分区。这是一个清晰的例子,这里有文档
y0u0uwnf2#
kafka consumer api不是线程安全的。ConcurrentKafkaListenerContainerFactoryAPI提供了使用kafka使用者api以及设置其他kafka使用者属性的并发方式。