我开始与汇合平台,需要运行zookeeper( zookeeper-server-start /etc/kafka/zookeeper.properties
)然后是Kafka( kafka-server-start /etc/kafka/server.properties
). 我正在写一个新贵脚本,应该运行Kafka和zookeeper。问题是Kafka应该阻止直到zookeeper准备好了(因为这取决于它),但我找不到一个可靠的方法来知道zookeeper什么时候准备好了。以下是在运行zookeeper服务器启动后在伪代码中的一些尝试:
使用硬编码块
sleep 5
在速度较慢的计算机上不能可靠地工作和/或等待的时间比需要的时间长。
检查端口2181上何时有东西(希望是zookeeper)正在运行
wait until $(echo stat | nc localhost ${port}) is not none
这似乎不起作用,因为它没有等待足够长的时间,Zookeeper接受Kafka连接。
检查日志
wait until specific string in zookeeper log is found
这是粗略的,甚至没有一个字符串也无法在错误中找到(例如“binding to port[…]”)。
有没有可靠的方法知道Zookeeper什么时候准备好接受Kafka的连接?否则,我将不得不使用1和2的组合。
3条答案
按热度按时间mm5n2pyu1#
你评论中的Kafka错误信息绝对是相关的:
致命[kafka服务器0],kafkaserver启动期间发生致命错误。准备关闭(kafka.server.kafkaserver)java.lang.runtimeexception:代理已在路径/brokers/ids/0上注册。这可能表示您已经配置了一个已在使用的代理,或者您关闭了此代理并以比zookeeper超时更快的速度重新启动它,因此它似乎正在重新注册。
这表明zookeeper已启动并运行,Kafka能够连接到它。如我所料,技术2足以验证zookeeper是否准备好接受连接。
相反,问题似乎在Kafka方面。它注册了一个zookeeper临时节点来表示开始的kafka代理。当客户端的zookeeper会话过期时,临时节点会自动删除(例如,进程终止,从而停止对zookeeper的心跳)。但是,这是基于超时。如果kafka代理快速重新启动,那么在重新启动之后,它会看到代表该代理的znode已经存在。对于新的进程启动,这看起来已经有一个代理在该路径上启动并注册。由于代理应该具有唯一的ID,因此它将中止。
等待超过zookeeper会话到期时间的一段时间是对此问题的适当响应。如有必要,您可以根据zookeeper管理员指南中的讨论,调整会话过期时间,使其更快地发生(参见对
tickTime
,minSessionTimeout
以及maxSessionTimeout
)但是,将会话过期时间调整得过快可能会导致客户端在正常操作期间遇到虚假的会话过期时间。我对Kafka知之甚少,但也许Kafka方面也可以做点什么。我知道有些管理工具(如apacheambari)会采取步骤来保证在资源调配时为每个代理分配一个唯一的id。
kgsdhlau2#
版本3.3.0中引入的confluent cli使使用单个命令启动所有服务变得非常容易:
有关更多详细信息,请参阅confluent platform quickstart文档。
laximzn53#
我发现使用计时器是不可靠的。第二种选择(等待端口)对我有效: