为什么kafkamessagelistenercontainer从不放弃executor的线程?

pxiryf3j  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(509)

我有一个程序,有多个访问共享资源的流。为了避免共享资源的竞争条件,我决定用一个共享的单线程执行器序列化流。我更喜欢用锁来困扰程序的这种方法——我的程序不会承受很重的负载,也不需要时间限制。
其中一个流从kafka读取消息,下面是该流的代码:

@Bean
public KafkaMessageListenerContainer kafkaFlow(DefaultConsumerFactory consumerFactory, ExecutorService myExecutor) {
    ContainerProperties containerProperties = new ContainerProperties("myTopic");
    ... //more properties
    containerProperties.setConsumerTaskExecutor(new ConcurrentTaskExecutor(myExecutor));
    return new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);
}

@Bean
public Executor myExecutor() {
    return Executors.newSingleThreadScheduledExecutor();
}

问题是,虽然Kafka流运行良好,但其他流似乎从未获得运行的机会。为什么?我在其他jms容器中使用过这种方法,它们工作得很好。。。
提前谢谢。

bzzcjhmw

bzzcjhmw1#

Kafka要求消费者定期接受民意调查。否则将撤销主题/分区分配;容器为每个使用者使用一个专用线程。
你需要 poll() 如果您希望为其他目的共享线程,那么使用者可以自己而不是使用侦听器容器,但是您仍然需要确保 poll()max.poll.interval.ms 为了避免Kafka认为你的消费者已经死了。

相关问题