我正在尝试从kafka 0.8.1群集检索数据。我带来了一个 ZookeeperConsumerConnector
然后试着打电话 createMessageStreams
在上面。然而,无论我做什么,似乎 createMessageStreams
即使这是我对Kafka所做的唯一一件事,也只是挂掉再也回不来了。
阅读邮件列表这似乎有时会发生一些原因,但据我所知,我没有做任何这些事情。
此外,我要指出的是,我实际上是在clojure中使用clj kafka执行此操作的,但我怀疑clj kafka不是问题所在,因为即使我运行以下代码,我也会遇到问题:
(.createMessageStreams
(clj-kafka.consumer.zk/consumer {"zookeeper.connect" "127.0.0.1:2181"
"group.id" "my.consumer"
"auto.offset.reset" "smallest"
"auto.commit.enable" "false"})
{"mytopic" (int 1)})
以及 clj-kafka.consumer.zk/consumer
只是使用 Consumer.createJavaConsumerConnector
创建 ZookeeperConsumerConnector
不做任何花哨的事。
另外,“mytopic”中肯定有消息,因为我可以从命令行运行以下命令并返回我已经发送到主题的所有内容:
% kafka-console-consumer.sh --zookeeper 127.0.0.1:2181 --topic mytopic --from-beginning
所以也不是说这个主题是空的。
在这一点上感到困惑。思想?
埃塔:我想我说的“挂”的真正意思是,它似乎在旋转一根线,然后停留在它从来没有做任何事情。如果我从repl运行这个代码,我可以通过点击control-c来摆脱它,然后我得到这个错误:
IllegalMonitorStateException java.util.concurrent.locks.ReentrantLock$Sync.tryRelease (ReentrantLock.java:155)
1条答案
按热度按时间iugsix8n1#
我在中断repl时遇到了同样的问题,但也有同样的例外。它挂起的原因是consumer.zk命名空间中的lazy iterate函数。从中读取消息的队列是linkedblockingqueue,并且在lazy iterate函数调用中调用.hasnext。这将在队列上创建一个读锁,并将阻塞和等待,直到有可用的东西从队列中取出。这意味着lazy iterate函数实际上永远不会返回。lazy iterate由“messages”函数调用,如果您不执行类似的操作
然后消息函数将永远不会返回并无限期挂起。我认为这是cljKafka的一个缺陷(或设计缺陷)。为了说明这确实是正在发生的事情,请尝试在使用者配置中设置“consumer.timeout.ms”“0”。它将抛出timeoutexpection并将控制返回给repl。
这进一步导致“with resource”宏出现问题。宏接受到使用者、关闭函数和主体的绑定;它调用身体然后关闭fn。如果在主体内部调用“messages”,主体将永远不会返回,因此将永远不会调用shutdown函数。如果调用了shutdown,messages函数将返回,因为shutdown会在队列上放置一条消息,通知使用者清理其资源和线程以准备gc。此宏将应用程序置于这样一种状态:退出主循环的唯一方法是终止应用程序(或调用它的线程)本身。在为生产环境做好准备之前,该库当然还有一段路要走。