kafka消费者自定义metricreporter未接收度量

k7fdbhmy  于 2021-06-06  发布在  Kafka
关注(0)|答案(2)|浏览(535)

我创建了一个实现 org.apache.kafka.common.metrics.KafkaMetric 像这样:

public class DatadogMetricTracker implements MetricsReporter {

    @Override
    public void configure(Map<String, ?> configs) {
        System.out.println(configs);
    }

    @Override
    public void init(List<KafkaMetric> metrics) {
        System.out.println(metrics);
    }

    @Override
    public void metricChange(KafkaMetric metric) {
        System.out.println(metric.metricName().name() + ": " + metric.value() + " tags: " + metric.metricName().tags());
    }

    @Override
    public void metricRemoval(KafkaMetric metric) {

    }

    @Override
    public void close() {

    }

}

然后,当我设置Kafka道具时,我将该类注册为metric reporter: properties.put(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG, "com.myco.utils.DatadogMetricTracker"); 当我开始消费时, configure 接到电话 init ,那么 metricChange 使用一批值都为0或-无穷大的度量调用一次,然后再也不会调用它。如何让我的公制记录器再次启动?
谢谢!

dauxcl2d

dauxcl2d1#

我有几乎相似的代码。只是将值转储到日志文件中。与kafkaio有相同的问题,从beam 2.6读取kafka clients 1.0版本。我甚至尝试过设置“metrics.sample.window.ms”和“metrics.num.samples”这样的属性,但没有成功。我只看到0.0或-infinity/infinity的值。

bq3bfh9z

bq3bfh9z2#

检查了代码,metricchange方法除了最初注册度量之外不会被调用。所以最好用别的方法。到目前为止,我将实现度量的定期读取和发布,因为在我们的例子中,prometheus将从我们的应用程序中获取度量。更重要的一点是,由于值已经在内部计算,所以将只使用gauge来填充普罗米修斯的值
它在jmx reporter的情况下工作,因为它们创建动态bean,当检查mbean的属性值时(通过jconsole或其他方法),然后调用获取度量值的实际方法,因此它在那里工作,下面是代码:

public Object getAttribute(String name) throws AttributeNotFoundException, MBeanException, ReflectionException {
            if (this.metrics.containsKey(name))
                return this.metrics.get(name).metricValue();
            else
                throw new AttributeNotFoundException("Could not find attribute " + name);
        }

相关问题