为什么kafka消费者会被阻止直到超时,即使有消息?

cngwdvgl  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(407)

我在kafka中使用高级客户机(遗留代码)。
但是,我观察到,即使主题上有许多可用的消息,消费者仍然被阻塞,直到zk连接超时时间过去。
为什么会这样?我如何解决这个问题?我很感兴趣-有消息吗?再往前走。

<int-kafka:zookeeper-connect id="zookeeperConnect"
    zk-connect="#{kafkaConfig['zooKeeperUrl']}" zk-connection-timeout="5000"
    zk-session-timeout="5000" zk-sync-time="2000" />

<int-kafka:consumer-context id="consumerContext" consumer-timeout="5000" zookeeper-connect="zookeeperConnect">
q1qsirdb

q1qsirdb1#

好吧,当您使用多个主题、配置、以及每个主题时,这是高级使用者实现的一个已知问题 KafkaStream 被阻塞并参与相同的顺序迭代过程。
有关更多信息,请参阅使用2个使用者配置时的慢速使用者ThroupOut。
更新
这个 consumer-timeout 就是这个意思 ConsumerIteratorKafkaStream :

currentDataChunk = (FetchedDataChunk)this.channel().poll((long)this.consumerTimeoutMs, TimeUnit.MILLISECONDS);

在哪里 channel 只是 BlockingQueue . 我想这并不奇怪,为什么我们在检索邮件的过程中被阻止了 consumer-timeout .
从另一个,请,找到 max-messages 上的选项 <int-kafka:consumer-configuration> . 它是 1 默认情况下。

相关问题