在开始生产和消费作业之前,我要确保kafka服务器是否正在运行。它在windows环境下,下面是我的kafka服务器在eclipse中的代码。。。
Properties kafka = new Properties();
kafka.setProperty("broker.id", "1");
kafka.setProperty("port", "9092");
kafka.setProperty("log.dirs", "D://workspace//");
kafka.setProperty("zookeeper.connect", "localhost:2181");
Option<String> option = Option.empty();
KafkaConfig config = new KafkaConfig(kafka);
KafkaServer server = new KafkaServer(config, new CurrentTime(), option);
server.startup();
在这种情况下 if (server != null)
这是不够的,因为它总是正确的。所以有没有办法知道,我的Kafka服务器正在运行,并准备生产。我有必要检查这个,因为它会导致一些起始数据包丢失。
9条答案
按热度按时间fcipmucu1#
好的选择是在开始生成或使用消息之前使用adminclient,如下所示
8yparm6h2#
如果服务器正在运行,您可以使用下面的代码检查可用的代理。
cygmwpex3#
保罗的回答很好,从经纪人的Angular 来看,Kafka和zk实际上是如何合作的。
我想说,检查kafka服务器是否正在运行的另一个简单选项是创建一个指向cluste的简单kafkaconsumer并尝试一些操作,例如listtopics()。如果kafka服务器没有运行,您将得到timeoutexception,然后可以使用
try-catch
判决。oxcyiej74#
首先,您需要创建adminclient bean:
然后,您可以使用以下脚本:
此脚本将每秒检查使用者组的状态,直到状态稳定为止。因为所有使用者都分配到主题分区,所以您可以断定服务器正在运行并准备就绪。
ej83mcc05#
我使用了adminclient api。
jq6vz3qz6#
我发现了一个事件
OnError
在合流Kafka中:及其代码文档:
yr9zkbsy7#
所有Kafka经纪人都必须被指派
broker.id
. 启动时,代理将在zookeeper中创建一个临时节点,路径为/broker/ids/$id
. 由于节点是短暂的,一旦代理断开连接(例如关闭),它就会被删除。您可以查看临时代理节点的列表,如下所示:
echo dump | nc localhost 2181 | grep brokers
zookeeper客户端界面公开了许多命令;dump
列出集群的所有会话和临时节点。注意,以上假设:
您正在默认端口上运行zookeeper(
2181
)在localhost
,然后呢localhost
是集群的领导者你的
zookeeper.connect
kafka config没有为kafka集群指定chroot env,即它只是host:port
而不是host:port/path
bmp9r5qi8#
您可以在机器上安装kafkacat工具
例如,在ubuntu上,您可以使用
一旦安装了kafkacat,您就可以使用下面的命令来连接它
用机器ip替换
可以替换为kafka正在运行的端口。通常是9092
运行上述命令后,如果kafkacat能够建立连接,则表示kafka已启动并正在运行
ars1skjm9#
对于linux,“ps aux | grep kafka”查看结果中是否显示kafka属性。e、 g./path/to/kafka/server.properties