如何将flink用户自定义度量导出到prometheus&grafana

kuhbmx9i  于 2021-06-24  发布在  Flink
关注(0)|答案(1)|浏览(766)

我正在尝试根据本教程创建自定义度量变量
通过提供的示例代码,我可以得到事件和直方图。
我不明白普罗米修斯和格拉法纳是怎么使用这个标识符的。我还试图修改一点示例代码,但是度量不再起作用。
另外,我只能访问系统度量,但不能访问我自己的度量。
我的问题是:
如何访问我创建的计数器?例如counter1
metricgroup到底是什么?
例如,我想从一个输入流中检测一个模式,在度量中这样做更合理,或者只将结果输出到一个timeseries数据库,比如influxdb?
提前谢谢。
这是map函数

  1. class FlinkMetricsExposingMapFunction extends RichMapFunction<SensorReading, SensorReading> {
  2. private static final long serialVersionUID = 1L;
  3. private transient Counter eventCounter;
  4. private transient Counter customCounter1;
  5. private transient Counter customCounter2;
  6. @Override
  7. public void open(Configuration parameters) {
  8. eventCounter = getRuntimeContext()
  9. .getMetricGroup().counter("events");
  10. customCounter1 = getRuntimeContext()
  11. .getMetricGroup()
  12. .addGroup("customCounterKey", "mod2")
  13. .counter("counter1");
  14. customCounter2 = getRuntimeContext()
  15. .getMetricGroup()
  16. .addGroup("customCounterKey", "mod5")
  17. .counter("counter2");
  18. // meter = getRuntimeContext().getMetricGroup().meter("eventMeter", new DropwizardMeterWrapper(dropwizardMeter));
  19. }
  20. @Override
  21. public SensorReading map(SensorReading value) {
  22. eventCounter.inc();
  23. if (value.getCurrTimestamp() % 2 == 0)
  24. customCounter1.inc();
  25. if (value.getCurrTimestamp() % 5 == 0)
  26. customCounter2.inc();
  27. if (value.getCurrTimestamp() % 2 == 0 && value.getCurrTimestamp() % 5 == 0)
  28. customCounter1.dec();
  29. return value;
  30. }
  31. }

作业示例:

  1. env
  2. .addSource(new SimpleSensorReadingGenerator())
  3. .name(SimpleSensorReadingGenerator.class.getSimpleName())
  4. .map(new FlinkMetricsExposingMapFunction())
  5. .name(FlinkMetricsExposingMapFunction.class.getSimpleName())
  6. .print()
  7. .name(DataStreamSink.class.getSimpleName());

更新

grafana access flink metrics的屏幕截图:

flink-config.yaml文件

  1. FROM flink:1.9.0
  2. RUN echo "metrics.reporters: prom" >> "$FLINK_HOME/conf/flink-conf.yaml"; \
  3. echo "metrics.latency.interval: 1000" >> "$FLINK_HOME/conf/flink-conf.yaml"; \
  4. echo "metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter" >> "$FLINK_HOME/conf/flink-conf.yaml"; \
  5. mv $FLINK_HOME/opt/flink-metrics-prometheus-*.jar $FLINK_HOME/lib
  6. COPY --from=builder /home/gradle/build/libs/*.jar $FLINK_HOME/lib/

教程中的默认Map函数:

  1. @Override
  2. public void open(Configuration parameters) {
  3. eventCounter = getRuntimeContext().getMetricGroup().counter("events");
  4. valueHistogram = getRuntimeContext()
  5. .getMetricGroup()
  6. .histogram("value_histogram", new DescriptiveStatisticsHistogram(10_000_000));
  7. }
x7yiwoj4

x7yiwoj41#

可以通过访问您创建的计数器 <system-scope>. customCounterKey.mod2.counter1 . <system-scope> 在flink-conf.yaml中定义。如果没有定义,则默认值为 <host>.taskmanager.<tm_id>.<job_name>.<operator_name>.<subtask_index> .
度量组主要定义度量名称的层次结构。根据文档,度量组是度量的命名容器。它由3部分(作用域)组成:系统作用域(在flink-conf.yaml中定义)、用户作用域(无论您在中定义了什么) addGroup() )以及度量名称。
这取决于你想测量什么。对于所有你能检测到的计数器, Jmeter 或 Jmeter ,我会去的指标。如果涉及到直方图,你应该有一个仔细看看你从Flink得到什么,如果你使用普罗米修斯记者。flink概括了所有不同的度量框架——在prometheus中实现Historogram的方式与在graphite中实现Historogram的方式不同。桶的定义是由flink给出的,据我所知是不能更改的(尽管有一些关联魔法)。
所有这些在这里都有更详细的描述:https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html#registering-指标
希望有帮助。

相关问题