我创建了一个实现 org.apache.kafka.common.metrics.KafkaMetric
像这样:
public class DatadogMetricTracker implements MetricsReporter {
@Override
public void configure(Map<String, ?> configs) {
System.out.println(configs);
}
@Override
public void init(List<KafkaMetric> metrics) {
System.out.println(metrics);
}
@Override
public void metricChange(KafkaMetric metric) {
System.out.println(metric.metricName().name() + ": " + metric.value() + " tags: " + metric.metricName().tags());
}
@Override
public void metricRemoval(KafkaMetric metric) {
}
@Override
public void close() {
}
}
然后,当我设置Kafka道具时,我将该类注册为metric reporter: properties.put(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG, "com.myco.utils.DatadogMetricTracker");
当我开始消费时, configure
接到电话 init
,那么 metricChange
使用一批值都为0或-无穷大的度量调用一次,然后再也不会调用它。如何让我的公制记录器再次启动?
谢谢!
2条答案
按热度按时间dauxcl2d1#
我有几乎相似的代码。只是将值转储到日志文件中。与kafkaio有相同的问题,从beam 2.6读取kafka clients 1.0版本。我甚至尝试过设置“metrics.sample.window.ms”和“metrics.num.samples”这样的属性,但没有成功。我只看到0.0或-infinity/infinity的值。
bq3bfh9z2#
检查了代码,metricchange方法除了最初注册度量之外不会被调用。所以最好用别的方法。到目前为止,我将实现度量的定期读取和发布,因为在我们的例子中,prometheus将从我们的应用程序中获取度量。更重要的一点是,由于值已经在内部计算,所以将只使用gauge来填充普罗米修斯的值
它在jmx reporter的情况下工作,因为它们创建动态bean,当检查mbean的属性值时(通过jconsole或其他方法),然后调用获取度量值的实际方法,因此它在那里工作,下面是代码: