How to monitor Kafka consumer lag for transactional consumers

mkshixfv posted on 2021-06-06 in Kafka

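spring-kafka has a useful metric for monitoring Kafka consumer lag called kafka_consumer_records_lag_max_records. However, this metric does not work for transactional consumers. Is there a specific configuration that enables the lag metric for transactional consumers?
I have configured my consumer group to use the isolation level read_committed, and the metric output contains kafka_consumer_records_lag_max_records{client_id="listener-1",} -Inf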
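For context: as far as the Java client goes, a read_committed consumer cannot read past the partition's last stable offset (LSO), so its per-partition lag is measured against the LSO rather than against the high watermark. The following is a simplified stand-alone model of that calculation, not the Kafka client's code; the class LagModel and the method partitionLag are hypothetical names used only for illustration.

```java
// Simplified, hypothetical model of per-partition consumer lag (not the Kafka client's code).
class LagModel {

    enum Isolation { READ_UNCOMMITTED, READ_COMMITTED }

    // Lag is the distance from the consumer's position to the furthest offset it may read:
    // the high watermark for read_uncommitted, the last stable offset (LSO) for read_committed.
    static long partitionLag(Isolation isolation, long highWatermark, long lastStableOffset, long position) {
        long end = (isolation == Isolation.READ_COMMITTED) ? lastStableOffset : highWatermark;
        return end - position;
    }

    public static void main(String[] args) {
        // An open transaction holds the LSO at 7 while the high watermark has already reached 10.
        System.out.println(partitionLag(Isolation.READ_UNCOMMITTED, 10, 7, 2)); // 8
        System.out.println(partitionLag(Isolation.READ_COMMITTED, 10, 7, 2));   // 5
    }
}
```

In both cases the value stays finite as long as the consumer keeps fetching, which is consistent with the read_committed test in the answer reporting normal lag values.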

Answer #1 by 3j86kqsm

What do you mean by "doesn't work"? I just tested it and it works fine...

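import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.stream.IntStream;

import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.MeterRegistry;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.internals.FetcherMetricsRegistry;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.MetricNameTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;

@SpringBootApplication
public class So56540759Application {

    public static void main(String[] args) throws IOException {
        ConfigurableApplicationContext context = SpringApplication.run(So56540759Application.class, args);
        System.in.read(); // keep the application running until Enter is pressed
        context.close();
    }

    // Kafka metric names for the current per-partition lag and the max lag; built in runner()
    private MetricName lagNow;

    private MetricName lagMax;

    @Autowired
    private MeterRegistry meters;

    // max.poll.records=1: each poll returns one record, so the lag can be watched step by step
    @KafkaListener(id = "so56540759", topics = "so56540759", clientIdPrefix = "so56540759",
            properties = "max.poll.records=1")
    public void listen(String in, Consumer<?, ?> consumer) {
        // Read the raw Kafka client metrics directly from the consumer
        Map<MetricName, ? extends Metric> metrics = consumer.metrics();
        Metric currentLag = metrics.get(this.lagNow);
        Metric maxLag = metrics.get(this.lagMax);
        System.out.println(in
                + " lag " + currentLag.metricName().name() + ":" + currentLag.metricValue()
                + " max " + maxLag.metricName().name() + ":" + maxLag.metricValue());
        // Compare with the value Micrometer exposes for the same metric
        Gauge gauge = meters.get("kafka.consumer.records.lag.max").gauge();
        System.out.println("lag-max in Micrometer: " + gauge.value());
    }

    @Bean
    public NewTopic topic() {
        return new NewTopic("so56540759", 1, (short) 1);
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        // FetcherMetricsRegistry is an internal Kafka class, used here only to obtain the exact
        // metric names and groups. The client-id tag must match the consumer's client.id,
        // i.e. the clientIdPrefix plus the container suffix "-0".
        Set<String> tags = new HashSet<>();
        FetcherMetricsRegistry registry = new FetcherMetricsRegistry(tags, "consumer");
        MetricNameTemplate temp = registry.recordsLagMax;
        this.lagMax = new MetricName(temp.name(), temp.group(), temp.description(),
                Collections.singletonMap("client-id", "so56540759-0"));
        // The per-partition lag metric is additionally tagged with topic and partition
        temp = registry.partitionRecordsLag;
        Map<String, String> tagsMap = new LinkedHashMap<>();
        tagsMap.put("client-id", "so56540759-0");
        tagsMap.put("topic", "so56540759");
        tagsMap.put("partition", "0");
        this.lagNow = new MetricName(temp.name(), temp.group(), temp.description(), tagsMap);

        // Send 10 records once the application has started
        return args -> IntStream.range(0, 10).forEach(i -> template.send("so56540759", "foo" + i));
    }

}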
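The listener looks up the Micrometer gauge kafka.consumer.records.lag.max, while the question quotes the Prometheus series kafka_consumer_records_lag_max_records. The sketch below is a simplified, hypothetical model of how the two names relate under Prometheus naming (dots become underscores, the base unit is appended as a suffix); it assumes the gauge carries a base unit of records, as the _records suffix in the question suggests. Micrometer's real Prometheus naming convention also handles counters, timers and invalid characters, which this sketch ignores.

```java
// Simplified, hypothetical model of turning a dotted Micrometer meter name into a
// Prometheus series name. Not Micrometer's actual implementation.
class PrometheusNameSketch {

    static String toPrometheusName(String micrometerName, String baseUnit) {
        String name = micrometerName.replace('.', '_');
        // Append the base unit as a suffix unless the name already ends with it
        if (baseUnit != null && !name.endsWith("_" + baseUnit)) {
            name = name + "_" + baseUnit;
        }
        return name;
    }

    public static void main(String[] args) {
        // The gauge looked up in the answer's listener; prints kafka_consumer_records_lag_max_records
        System.out.println(toPrometheusName("kafka.consumer.records.lag.max", "records"));
    }
}
```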
Startup log (abridged):
2019-06-11 12:13:45.803  INFO 32187 --- [           main] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
    auto.commit.interval.ms = 5000
    auto.offset.reset = earliest
    bootstrap.servers = [localhost:9092]
    check.crcs = true
    client.id = so56540759-0
    connections.max.idle.ms = 540000
    default.api.timeout.ms = 60000
    enable.auto.commit = false
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = so56540759
    heartbeat.interval.ms = 3000
    interceptor.classes = []
    internal.leave.group.on.close = true
    isolation.level = read_committed
    ...
    transaction.timeout.ms = 60000
    ...

2019-06-11 12:13:45.840  INFO 32187 --- [o56540759-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned: [so56540759-0]
foo0 lag records-lag:9.0 max records-lag-max:9.0
lag-max in Micrometer: 9.0
foo1 lag records-lag:8.0 max records-lag-max:9.0
lag-max in Micrometer: 9.0
foo2 lag records-lag:7.0 max records-lag-max:9.0
lag-max in Micrometer: 9.0
foo3 lag records-lag:6.0 max records-lag-max:9.0
lag-max in Micrometer: 9.0
foo4 lag records-lag:5.0 max records-lag-max:9.0
lag-max in Micrometer: 9.0
foo5 lag records-lag:4.0 max records-lag-max:9.0
lag-max in Micrometer: 9.0
foo6 lag records-lag:3.0 max records-lag-max:9.0
lag-max in Micrometer: 9.0
foo7 lag records-lag:2.0 max records-lag-max:9.0
lag-max in Micrometer: 9.0
foo8 lag records-lag:1.0 max records-lag-max:9.0
lag-max in Micrometer: 9.0
foo9 lag records-lag:0.0 max records-lag-max:9.0
lag-max in Micrometer: 9.0
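The numbers follow directly from the setup: 10 records on a single partition, max.poll.records=1, and lag computed after the position advances past each returned record, so after record i the lag is 10 - (i + 1) = 9 - i, while the maximum stays at the first value, 9. The hypothetical replay below reproduces the listener's first output line for each record, assuming all ten records are committed (so the last stable offset equals the high watermark) and that the metrics sample window does not expire during the run.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical replay of the answer's test (not Kafka code): one partition, 10 records,
// one record per poll, lag recomputed after each record and a running maximum kept.
class LagReplay {

    static List<String> replay(int recordCount) {
        List<String> lines = new ArrayList<>();
        long endOffset = recordCount;            // high watermark == LSO (all records committed)
        double lagMax = Double.NEGATIVE_INFINITY; // no lag recorded yet
        for (int offset = 0; offset < recordCount; offset++) {
            long position = offset + 1;          // next offset to fetch after this record
            double lag = endOffset - position;
            lagMax = Math.max(lagMax, lag);
            lines.add("foo" + offset + " lag records-lag:" + lag + " max records-lag-max:" + lagMax);
        }
        return lines;
    }

    public static void main(String[] args) {
        replay(10).forEach(System.out::println);
    }
}
```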

EDIT 2
I do actually see -Infinity in the MBean if the transaction times out, that is, if the listener does not exit within 60 seconds in my test.
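One plausible reading of this: records-lag-max is a maximum over a time window, and lag values are only recorded when the consumer fetches. If the listener thread blocks for a long time, nothing new is recorded, the old values age out, and the maximum falls back to -Infinity, which the Prometheus text format renders as -Inf (the value in the question). The sketch below is a simplified model of that behavior, assuming the Kafka client defaults metrics.sample.window.ms=30000 and metrics.num.samples=2 (roughly 60 seconds of retention); the class WindowedMax is hypothetical and expires individual values instead of using the bucketed samples of the real client.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Simplified, hypothetical model of a windowed "max" metric such as records-lag-max.
// Values are retained for windowMs * numSamples; once all have expired, the metric
// reports its initial value, Double.NEGATIVE_INFINITY.
class WindowedMax {

    private final long retentionMs;
    private final Deque<long[]> events = new ArrayDeque<>(); // {timestampMs, value}

    WindowedMax(long windowMs, int numSamples) {
        this.retentionMs = windowMs * numSamples;
    }

    // Assumes record() is called with non-decreasing timestamps
    void record(long value, long nowMs) {
        events.addLast(new long[] {nowMs, value});
    }

    double measure(long nowMs) {
        // Drop values older than the retention period
        while (!events.isEmpty() && nowMs - events.peekFirst()[0] >= retentionMs) {
            events.removeFirst();
        }
        double max = Double.NEGATIVE_INFINITY;
        for (long[] e : events) {
            max = Math.max(max, e[1]);
        }
        return max;
    }

    public static void main(String[] args) {
        WindowedMax lagMax = new WindowedMax(30_000, 2);
        lagMax.record(9, 0);                        // a fetch records a lag of 9 at t = 0 s
        System.out.println(lagMax.measure(10_000)); // 9.0: still within the window
        System.out.println(lagMax.measure(61_000)); // -Infinity: no fetch for over 60 s
    }
}
```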
