我将flink配置为向jmxreporter公开度量,正如这里所说的那样。我还配置了一个worker节点,以将其度量也公开给master节点,正如这里所说的那样。下面是系统的配置 conf/flink-conf.yaml
主节点和工作节点上的文件。flink日志文件表明它已连接到jmx服务器。然后打开visualvm并连接到jmx服务器,然后进入mbeans选项卡。我看到了jobmanager包。但是,我没有找到我创建的计数器和 Jmeter 。我还安装了jconsole插件。但是当我转到jconsole选项卡时,我看到一条消息: No JConsole plugin installed. To install a JConsole plugin click the Configure Plugins button and provide full path to the plugin file/directory
. 根据这个链接,我需要 WtJmxPlugin.jar
我不知道在哪里能找到的文件。
# Metrics Reporter on the MASTER node
metrics.reporter.jmx.class: org.apache.flink.metrics.jmx.JMXReporter
metrics.reporter.jmx.port: 8789-8790
metrics.reporter.jmx.interval: 30 SECONDS
# Metrics Reporter on the WORKER node
env.java.opts: -Djava.net.preferIPv4Stack=true -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=9999 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=192.168.56.1
我有一个 RichMapFunction
这就是暴露指标。
public static class SensorTypeMapper
extends RichMapFunction<MqttSensor, Tuple2<CompositeKeySensorType, MqttSensor>> {
private static final long serialVersionUID = -4080196110995184486L;
private transient Counter counter;
private transient Meter meter;
@Override
public void open(Configuration config) {
this.counter = getRuntimeContext().getMetricGroup().counter("counterSensorTypeMapper");
com.codahale.metrics.Meter dropwizardMeter = new com.codahale.metrics.Meter();
this.meter = getRuntimeContext().getMetricGroup().meter("meterSensorTypeMapper",
new DropwizardMeterWrapper(dropwizardMeter));
}
@Override
public Tuple2<CompositeKeySensorType, MqttSensor> map(MqttSensor value) throws Exception {
this.meter.markEvent();
this.counter.inc();
// every sensor key: sensorId, sensorType, platformId, platformType, stationId
// Integer sensorId = value.getKey().f0;
String sensorType = value.getKey().f1;
Integer platformId = value.getKey().f2;
// String platformType = value.getKey().f3;
Integer stationId = value.getKey().f4;
CompositeKeySensorType compositeKey = new CompositeKeySensorType(stationId, platformId, sensorType);
return Tuple2.of(compositeKey, value);
}
}
暂无答案!
目前还没有任何答案,快来回答吧!