我有一个kafka应用程序,我使用kafka-console-consumer.sh从中消费消息,如下所示:
$./kafka-console-consumer.sh --zookeeper zookeeperhost:2181 --topic myTopic
它给出了我通过Kafka消费者写给Kafka经纪人的所有信息,没有任何遗漏。
最近,我在另一个环境中部署了应用程序,因为zookeeperhost不可访问(由于某些原因)。因此,我使用kafka-simple-consumer-shell.sh,如下所示:
$./kafka-simple-consumer-shell.sh --broker-list brokerhost:9092 --topic myTopic --partition 0 --max-messages 1
但我看到很少有信息(大约5000分之2-4)被遗漏。有人能解释一下kafka-simple-consumer-shell.sh是如何阅读信息的吗。
我怀疑可能有一些消息是要到某个不同的分区,因为我只是从分区0读取,所以我不是每次都得到所有的消息。但我不知道如何检查有多少分区?其他分区的ID是什么?我试过用1,但没用。
有人能帮忙吗。
1条答案
按热度按时间ig9co6j11#
kafka-simple-consumer.sh
只需创建一个从一个分区读取消息的使用者。因此,您的命令只需在partition 0 of myTopic
从brokerhost:9092
. 如果分区1不在同一个代理中,它将不会像您所做的那样工作(有关更多信息,请查看github的代码)如果您可以访问zookeeper主机,那么只需使用
但是如果你不能访问zookeeper主机,我可以想到两种方法。
提供一个包含所有代理的列表作为参数,并尝试使用0到n之间的分区号。您可以提供多个代理来
--broker-list
格式为broker1:port2,broker2:port2,broker3:port3
. 然后您可以计算出整个集群中存在多少个分区,但仍然不知道哪个代理拥有哪些分区。手动检查每个代理的日志目录。检查
/tmp/kafka-logs
(如果您使用的是默认日志目录)。你会发现像这样的目录myTopic-0
,myTopic-1
, ... 格式为topic-partition#
. 您可以使用此命令手动检查哪个代理具有哪些分区。