java—尝试通过jmx编程访问kafka度量时出错

thtygnil  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(450)

我只是从一个简单的例子开始阅读jmx度量,并使用heapmemoryusage、cputime等简单值来理解它。我需要尝试访问kafka服务器/消费者指标,我可以看到它在fetcherlagmetrics consumerlag下的jconsole应用程序中是一个键。但是程序错误如下:

javax.management.InstanceNotFoundException: kafka.consumer:type=consumer- 
  fetch-manager-metrics

这告诉我,使用者fetch管理器度量是一个问题,因为它甚至在jconsole中也不存在。我把它改成了下面的问题&仍然是同一个问题:

consumerBean = jmxCon.getMBeanServerConnection().getAttribute(new 
   ObjectName("kafka.server:type=FetcherLagMetrics"),"ConsumerLag");
            cd = (CompositeData) consumerBean;

尝试访问这些值的代码如下所示:

jmxCon.getMBeanServerConnection().invoke(new 
         ObjectName("java.lang:type=Memory"), "gc", null, null);

        for (int i = 0; i < 100; i++) {

            //get an instance of the HeapMemoryUsage Mbean
            memoryMbean = jmxCon.getMBeanServerConnection().getAttribute(new ObjectName("java.lang:type=Memory"), "HeapMemoryUsage");
            cd = (CompositeData) memoryMbean;

            //get an instance of the OperatingSystem Mbean
            osMbean = jmxCon.getMBeanServerConnection().getAttribute(new ObjectName("java.lang:type=OperatingSystem"),"ProcessCpuTime");

            //get an instance of the kafka metrics Mbean
            consumerBean = jmxCon.getMBeanServerConnection().getAttribute(new ObjectName("kafka.consumer:type=consumer-fetch-manager-metrics"),"MaxLag");
            cd = (CompositeData) consumerBean;

            consumerBeanII = jmxCon.getMBeanServerConnection().getAttribute(new ObjectName("kafka.server:type=FetcherLagMetrics,name=ConsumerLag"),"Lag");

            System.out.println("Used memory: " + " " + cd.get("MaxLag") + " Used cpu: " + consumerBean); //print memory usage

            tempMemory = tempMemory + Long.parseLong(cd.get("used").toString());
            Thread.sleep(1000); //delay for one second

        }

它在consumerbean=……行失败。有人能解释或提供使用jmx/jmi实现访问kafka度量的正确方法吗?

oxalkeyp

oxalkeyp1#

我不确定您的kafka版本,但是当我使用jconsole查看我的版本(v1.1.0)时,没有您描述的用于消费滞后的bean。因此,我认为您的jmx查询可能会失败。
相反,我相信这类信息被移动到kafka管理api接口。所以如果你不想用,你现在必须用这个来获得延迟设置 kafka-consumer-groups.sh 命令行实用程序。
对于我们来说,我们只需要这些数据来监控,所以我们使用https://github.com/danielqsj/kafka_exporter 为普罗米修斯获取信息。

相关问题