我有Kafka·汉德勒的春靴:
@KafkaListener(topics = "topic-one", groupId = "response")
public void listen(String response) {
myService.processResponse(response);
}
例如,生产者每秒发送一条消息。但是 myService.processResponse
工作10秒。我需要处理好每一条信息然后开始 myService.processResponse
在新线程中。我可以创建我的执行者并将每个响应委托给它。但我认为Kafka还有另一种配置。我找到2个:
1) 添加 concurrency = "5"
至 @KafkaListener
注解-它似乎在工作。但我不确定有多正确,因为我有第二种方法:
2) 我可以创造 ConcurrentKafkaListenerContainerFactory
开始吧 ConsumerFactory
以及 concurrency
我不明白这些方法之间的区别?仅仅补充一下就够了吗 concurrency = "5"
至 @KafkaListener
否则我需要创建 ConcurrentKafkaListenerContainerFactory
?
或者我什么都不懂,还有别的办法吗?
2条答案
按热度按时间b09cbbtk1#
使用执行器使事情变得复杂,涉及到管理承诺的补偿;不建议这样做。
与
@KafkaListener
,框架创建ConcurrentKafkaListenerContainerFactory
为你。concurrency
在注解上只是一种方便;它会覆盖出厂设置。这允许您对多个侦听器使用同一工厂,每个侦听器具有不同的并发性。
您可以使用boot属性设置容器并发性(默认值);该值被注解值覆盖;查看javadocs。。。
2vuwiymt2#
concurrency
选项与并发处理同一使用者接收的消息无关。当您有多个消费者时,每个消费者都处理自己的分区,这是针对消费者组的。将处理传递到一个单独的线程是非常复杂的,我相信springkafka团队决定不“按设计”这样做。你甚至不需要钻研《Spring的Kafka》来理解原因。查看kafkaconsumer的Detecting consumer failures文档:
必须注意确保承诺的抵消不会超过实际头寸。通常,只有在线程处理完记录之后,才必须禁用记录的自动提交和手动提交已处理的偏移量(取决于所需的传递语义)。还要注意,您需要暂停分区,以便在线程处理完之前返回的记录之前,不会从poll接收到新记录。