我正试着读Kafka主题的信息,但我看不懂。进程在一段时间后被终止,没有读取任何消息。
下面是我得到的重新平衡错误:
[2014-03-21 10:10:53,215] ERROR Error processing message, stopping consumer: (kafka.consumer.ConsoleConsumer$)
kafka.common.ConsumerRebalanceFailedException: topic-1395414642817-47bb4df2 can't rebalance after 4 retries
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:428)
at kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer(ZookeeperConsumerConnector.scala:718)
at kafka.consumer.ZookeeperConsumerConnector$WildcardStreamsHandler.<init>(ZookeeperConsumerConnector.scala:752)
at kafka.consumer.ZookeeperConsumerConnector.createMessageStreamsByFilter(ZookeeperConsumerConnector.scala:142)
at kafka.consumer.ConsoleConsumer$.main(ConsoleConsumer.scala:196)
at kafka.consumer.ConsoleConsumer.main(ConsoleConsumer.scala)
Consumed 0 messages
我试着跑 ConsumerOffsetChecker
,这就是我得到的错误。我不知道怎么解决这个问题。有人知道吗?
./kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zkconnect localhost:9092 --topic mytopic --group topic_group
Group Topic Pid Offset logSize Lag Owner
Exception in thread "main" org.I0Itec.zkclient.exception.ZkNoNodeException: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /consumers/
at org.I0Itec.zkclient.exception.ZkException.create(ZkException.java:47)
at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:685)
at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:766)
at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:761)
at kafka.utils.ZkUtils$.readData(ZkUtils.scala:459)
at kafka.tools.ConsumerOffsetChecker$.kafka$tools$ConsumerOffsetChecker$$processPartition(ConsumerOffsetChecker.scala:59)
at kafka.tools.ConsumerOffsetChecker$$anonfun$kafka$tools$ConsumerOffsetChecker$$processTopic$1.apply$mcVI$sp(ConsumerOffsetChecker.scala:89)
at kafka.tools.ConsumerOffsetChecker$$anonfun$kafka$tools$ConsumerOffsetChecker$$processTopic$1.apply(ConsumerOffsetChecker.scala:89)
at kafka.tools.ConsumerOffsetChecker$$anonfun$kafka$tools$ConsumerOffsetChecker$$processTopic$1.apply(ConsumerOffsetChecker.scala:89)
at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
at scala.collection.immutable.List.foreach(List.scala:45)
at kafka.tools.ConsumerOffsetChecker$.kafka$tools$ConsumerOffsetChecker$$processTopic(ConsumerOffsetChecker.scala:88)
at kafka.tools.ConsumerOffsetChecker$$anonfun$main$3.apply(ConsumerOffsetChecker.scala:153)
at kafka.tools.ConsumerOffsetChecker$$anonfun$main$3.apply(ConsumerOffsetChecker.scala:153)
at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
at scala.collection.immutable.List.foreach(List.scala:45)
at kafka.tools.ConsumerOffsetChecker$.main(ConsumerOffsetChecker.scala:152)
at kafka.tools.ConsumerOffsetChecker.main(ConsumerOffsetChecker.scala)
Caused by: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /consumers/
at org.apache.zookeeper.KeeperException.create(KeeperException.java:102)
at org.apache.zookeeper.KeeperException.create(KeeperException.java:42)
at org.apache.zookeeper.ZooKeeper.getData(ZooKeeper.java:927)
at org.apache.zookeeper.ZooKeeper.getData(ZooKeeper.java:956)
at org.I0Itec.zkclient.ZkConnection.readData(ZkConnection.java:103)
at org.I0Itec.zkclient.ZkClient$9.call(ZkClient.java:770)
at org.I0Itec.zkclient.ZkClient$9.call(ZkClient.java:766)
at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:675)
... 16 more
5条答案
按热度按时间nszi6y051#
可能您的代理处于脱机状态,无法连接到zookeeper,您是否尝试运行中提供的控制台使用者脚本
$KAFKA_ROOT_DIR/bin
用于检查是否能够从特定主题消费的路径。kmb7vmvb2#
我最近也有类似的问题。您可以尝试将consumer configurations rebalance.backoff.ms和zookeeper.session.timeout.ms增加到大约5-10秒。
第一个参数告诉Kafka在重试重新平衡之前要等待更多时间。第二个告诉Kafka,要有耐心,同时试图连接到Zookeeper。
其他配置选项可以在官方文档中找到。
eoigrqb63#
这可能意味着代理在连接到zookeeper时没有正确创建这些节点。尝试消费时,应该存在/consumer路径。
以下是一些调试路径:
你创造了什么主题吗?
如果是:
主题中有多少个分区?
您是否检查了主题元数据是否正确地填充在zookeeper中?
我们能看看你的消费者配置吗?
如果没有:
然后需要使用脚本创建一个主题
$KAFKA_DIR/bin/kafka-create-topic.sh
. 查看脚本内部的用法详细信息。一旦你做了一个主题,你就需要用一个以前没有用过的组id来创建一个消费者,否则你就不能重新开始了。
sgtfey8w4#
kafka.tools.consumeroffetchecker中有一个bug。如果保存消耗的偏移量信息的特定zookeeper节点没有退出,则该工具将退出并抛出execption。
例如,假设您有一个消费者组“mygroup”和一个主题“topictest”。然后在znode:/consumers/mygroup/offsets/topictest/2中维护分区2的偏移量。
如果znode中没有topictest的分区2的条目,那么消费者offsetchecker工具将退出,同时检查分区2的偏移量。基本上,当检查zookeeper上缺少znode/consumers/mygroup/offsets/topictest/n的第一个分区“n”时,它将失败。
mf98qq945#
另一个问题可能是因为jar冲突。如果您在library文件夹中存储了相同的jar和不同的版本。可能会出现这个问题。scala库、zkclient、zookeeper、kafka客户端等jar不应使用不同的版本进行复制。