使用jmx监视kafka主题

i34xakig  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(452)

我用jmx来监控Kafka的主题。

val url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://broker1:9393/jmxrmi");

 val jmxc = JMXConnectorFactory.connect(url, null);
 val mbsc = jmxc.getMBeanServerConnection();
 val messageCountObj = new ObjectName("kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec,topic=mytopic");
 val messagesInPerSec = mbsc.getAttribute(messageCountObj,"MeanRate")

使用这段代码,我可以得到broker1上“mytopic”的平均速率。但是我有10个经纪人,我怎么能从我所有的经纪人那里得到“mytopic”的平均价格呢?
我已经试过了”service:jmx:rmi:///jndi/rmi://broker1:9393,经纪人2:9393,经纪人3:9393/jmxrmi“
出现错误:(

9vw9lbht

9vw9lbht1#

如果能这么简单就好了;)
没有办法像你说的那样做。您需要与每个代理建立单独的连接。
一个可能的解决方案是使用mbeanserver联合,它将为一个mbeanserver中的每个代理注册代理,因此如果您在broker1上这样做,您可以连接到 service:jmx:rmi:///jndi/rmi://broker1:9393/jmxrmi 一次查询所有代理的统计信息,但是需要查询10个不同的objectname,查询每个objectname的值,然后自己计算平均值[java]伪代码:

ObjectName wildcard = new ObjectName("*:type=BrokerTopicMetrics,name=MessagesInPerSec,topic=mytopic");
double totalRate = 0d;
int respondingBrokers = 0;
for(ObjectName on : mbsc.queryNames(wildcard, null)) {
   totalRate += (Double)mbsc.getAttribute(messageCountObj,"MeanRate");
   respondingBrokers++;
}
// Average rate of mean rates: totalRate/respondingBrokers

注意:没有异常处理,我假设rate类型是double。
您还可以创建并注册一个自定义mbean,用于计算联邦代理上的聚合平均值。
如果您是面向maven的,那么可以从这里构建opendmk。

相关问题