我用jmx来监控Kafka的主题。
val url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://broker1:9393/jmxrmi");
val jmxc = JMXConnectorFactory.connect(url, null);
val mbsc = jmxc.getMBeanServerConnection();
val messageCountObj = new ObjectName("kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec,topic=mytopic");
val messagesInPerSec = mbsc.getAttribute(messageCountObj,"MeanRate")
使用这段代码,我可以得到broker1上“mytopic”的平均速率。但是我有10个经纪人,我怎么能从我所有的经纪人那里得到“mytopic”的平均价格呢?
我已经试过了”service:jmx:rmi:///jndi/rmi://broker1:9393,经纪人2:9393,经纪人3:9393/jmxrmi“
出现错误:(
1条答案
按热度按时间9vw9lbht1#
如果能这么简单就好了;)
没有办法像你说的那样做。您需要与每个代理建立单独的连接。
一个可能的解决方案是使用mbeanserver联合,它将为一个mbeanserver中的每个代理注册代理,因此如果您在broker1上这样做,您可以连接到
service:jmx:rmi:///jndi/rmi://broker1:9393/jmxrmi
一次查询所有代理的统计信息,但是需要查询10个不同的objectname,查询每个objectname的值,然后自己计算平均值[java]伪代码:注意:没有异常处理,我假设rate类型是double。
您还可以创建并注册一个自定义mbean,用于计算联邦代理上的聚合平均值。
如果您是面向maven的,那么可以从这里构建opendmk。