我是Kafka的新人。尝试实现使用者和生产者类来发送和接收消息。需要配置 bootstrap.servers
对于这两个类,它是由 ,
. 例如,
producerConfig.put("bootstrap.servers",
"172.16.20.207:9946,172.16.20.234:9125,172.16.20.36:9636");
由于应用程序将在集群的主节点上运行,因此它应该能够从中检索代理信息 ZooKeeper
就像Kafka的答案:从zookeeper获取代理主机。
public static void main(String[] args) throws Exception {
ZooKeeper zk = new ZooKeeper("localhost:2181", 10000, null);
List<String> ids = zk.getChildren("/brokers/ids", false);
for (String id : ids) {
String brokerInfo = new String(zk.getData("/brokers/ids/" + id, false, null));
System.out.println(id + ": " + brokerInfo);
}
}
但是,这个brokerinfo是json格式的,如下所示:{“jmx\u port”:-1,“timestamp”:“1428512949385”,“host”:“192.168.0.11”,“version”:1,“port”:9093}
在同一篇文章中,另一篇文章建议使用以下方法获取每个代理的连接字符串,并用逗号将它们连接在一起。
for (String id : ids) {
String brokerInfoString = new String(zk.getData("/brokers/ids/" + id, false, null));
Broker broker = Broker.createBroker(Integer.valueOf(id), brokerInfoString);
if (broker != null) {
brokerList.add(broker.connectionString());
}
}
如果这个 Broker
班级来自 org.apache.kafka.common.requests.UpdateMetadataRequest.Broker
,它没有方法 createBroker
以及 connectionString
.
找到另一个类似的帖子,动态获取代理列表。但它没有说明如何从代理信息中获取属性,例如 host
以及 port
. 我可能可以为类似json的字符串编写一个解析器来提取它们,但我怀疑还有更多的kafka本地方法可以做到这一点。有什么建议吗?
编辑:我意识到 Broker
班级来自 kafka.cluster.Broker
. 还是没有办法 connectionstring()
.
1条答案
按热度按时间qeeaahzv1#
您可以使用zkutils检索集群中的所有代理信息,如下所示: