我在kafka中使用高级客户机(遗留代码)。
但是,我观察到,即使主题上有许多可用的消息,消费者仍然被阻塞,直到zk连接超时时间过去。
为什么会这样?我如何解决这个问题?我很感兴趣-有消息吗?再往前走。
<int-kafka:zookeeper-connect id="zookeeperConnect"
zk-connect="#{kafkaConfig['zooKeeperUrl']}" zk-connection-timeout="5000"
zk-session-timeout="5000" zk-sync-time="2000" />
<int-kafka:consumer-context id="consumerContext" consumer-timeout="5000" zookeeper-connect="zookeeperConnect">
1条答案
按热度按时间q1qsirdb1#
好吧,当您使用多个主题、配置、以及每个主题时,这是高级使用者实现的一个已知问题
KafkaStream
被阻塞并参与相同的顺序迭代过程。有关更多信息,请参阅使用2个使用者配置时的慢速使用者ThroupOut。
更新
这个
consumer-timeout
就是这个意思ConsumerIterator
从KafkaStream
:在哪里
channel
只是BlockingQueue
. 我想这并不奇怪,为什么我们在检索邮件的过程中被阻止了consumer-timeout
.从另一个,请,找到
max-messages
上的选项<int-kafka:consumer-configuration>
. 它是1
默认情况下。