如何在不同的线程中处理@kafkalistener方法?

wi3ka0sx  于 2021-06-06  发布在  Kafka
关注(0)|答案(2)|浏览(525)

我有Kafka·汉德勒的春靴:

@KafkaListener(topics = "topic-one", groupId = "response")
    public void listen(String response) {
        myService.processResponse(response);
    }

例如,生产者每秒发送一条消息。但是 myService.processResponse 工作10秒。我需要处理好每一条信息然后开始 myService.processResponse 在新线程中。我可以创建我的执行者并将每个响应委托给它。但我认为Kafka还有另一种配置。我找到2个:
1) 添加 concurrency = "5"@KafkaListener 注解-它似乎在工作。但我不确定有多正确,因为我有第二种方法:
2) 我可以创造 ConcurrentKafkaListenerContainerFactory 开始吧 ConsumerFactory 以及 concurrency 我不明白这些方法之间的区别?仅仅补充一下就够了吗 concurrency = "5"@KafkaListener 否则我需要创建 ConcurrentKafkaListenerContainerFactory ?
或者我什么都不懂,还有别的办法吗?

b09cbbtk

b09cbbtk1#

使用执行器使事情变得复杂,涉及到管理承诺的补偿;不建议这样做。
@KafkaListener ,框架创建 ConcurrentKafkaListenerContainerFactory 为你。 concurrency 在注解上只是一种方便;它会覆盖出厂设置。
这允许您对多个侦听器使用同一工厂,每个侦听器具有不同的并发性。
您可以使用boot属性设置容器并发性(默认值);该值被注解值覆盖;查看javadocs。。。

/**
 * Override the container factory's {@code concurrency} setting for this listener. May
 * be a property placeholder or SpEL expression that evaluates to a {@link Number}, in
 * which case {@link Number#intValue()} is used to obtain the value.
 * <p>SpEL {@code #{...}} and property place holders {@code ${...}} are supported.
 * @return the concurrency.
 * @since 2.2
 */
String concurrency() default "";
2vuwiymt

2vuwiymt2#

concurrency 选项与并发处理同一使用者接收的消息无关。当您有多个消费者时,每个消费者都处理自己的分区,这是针对消费者组的。
将处理传递到一个单独的线程是非常复杂的,我相信springkafka团队决定不“按设计”这样做。你甚至不需要钻研《Spring的Kafka》来理解原因。查看kafkaconsumer的Detecting consumer failures文档:
必须注意确保承诺的抵消不会超过实际头寸。通常,只有在线程处理完记录之后,才必须禁用记录的自动提交和手动提交已处理的偏移量(取决于所需的传递语义)。还要注意,您需要暂停分区,以便在线程处理完之前返回的记录之前,不会从poll接收到新记录。

相关问题