如何将flink暴露在visualvm上的jmx服务器上的meter和counter度量可视化?

7fhtutme  于 2021-06-21  发布在  Flink
关注(0)|答案(0)|浏览(373)

我将flink配置为向jmxreporter公开度量,正如这里所说的那样。我还配置了一个worker节点,以将其度量也公开给master节点,正如这里所说的那样。下面是系统的配置 conf/flink-conf.yaml 主节点和工作节点上的文件。flink日志文件表明它已连接到jmx服务器。然后打开visualvm并连接到jmx服务器,然后进入mbeans选项卡。我看到了jobmanager包。但是,我没有找到我创建的计数器和 Jmeter 。我还安装了jconsole插件。但是当我转到jconsole选项卡时,我看到一条消息: No JConsole plugin installed. To install a JConsole plugin click the Configure Plugins button and provide full path to the plugin file/directory . 根据这个链接,我需要 WtJmxPlugin.jar 我不知道在哪里能找到的文件。


# Metrics Reporter on the MASTER node

metrics.reporter.jmx.class: org.apache.flink.metrics.jmx.JMXReporter
metrics.reporter.jmx.port: 8789-8790
metrics.reporter.jmx.interval: 30 SECONDS

# Metrics Reporter on the WORKER node

env.java.opts: -Djava.net.preferIPv4Stack=true -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=9999 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=192.168.56.1

我有一个 RichMapFunction 这就是暴露指标。

public static class SensorTypeMapper
        extends RichMapFunction<MqttSensor, Tuple2<CompositeKeySensorType, MqttSensor>> {
    private static final long serialVersionUID = -4080196110995184486L;

    private transient Counter counter;
    private transient Meter meter;

    @Override
    public void open(Configuration config) {
        this.counter = getRuntimeContext().getMetricGroup().counter("counterSensorTypeMapper");

        com.codahale.metrics.Meter dropwizardMeter = new com.codahale.metrics.Meter();
        this.meter = getRuntimeContext().getMetricGroup().meter("meterSensorTypeMapper",
                new DropwizardMeterWrapper(dropwizardMeter));
    }

    @Override
    public Tuple2<CompositeKeySensorType, MqttSensor> map(MqttSensor value) throws Exception {
        this.meter.markEvent();
        this.counter.inc();
        // every sensor key: sensorId, sensorType, platformId, platformType, stationId
        // Integer sensorId = value.getKey().f0;
        String sensorType = value.getKey().f1;
        Integer platformId = value.getKey().f2;
        // String platformType = value.getKey().f3;
        Integer stationId = value.getKey().f4;
        CompositeKeySensorType compositeKey = new CompositeKeySensorType(stationId, platformId, sensorType);
        return Tuple2.of(compositeKey, value);
    }
}

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题