如何在java程序中获得kafka消费滞后

jfgube3f  于 2021-06-07  发布在  Kafka
关注(0)|答案(6)|浏览(460)

我编写了一个java程序来使用Kafka的信息。我想监控消费滞后,如何通过java获得它?
顺便说一句,我使用:

<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>0.10.1.1</version>

提前谢谢。

rryofs0p

rryofs0p1#

如果您不想在项目中包含kafka(和scala)依赖项,可以使用下面的类。它只使用kafka客户机依赖项。

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BinaryOperator;
import java.util.stream.Collectors;

public class KafkaConsumerMonitor {

    public static class PartionOffsets {
        private long endOffset;
        private long currentOffset;
        private int partion;
        private String topic;

        public PartionOffsets(long endOffset, long currentOffset, int partion, String topic) {
            this.endOffset = endOffset;
            this.currentOffset = currentOffset;
            this.partion = partion;
            this.topic = topic;
        }

        public long getEndOffset() {
            return endOffset;
        }

        public long getCurrentOffset() {
            return currentOffset;
        }

        public int getPartion() {
            return partion;
        }

        public String getTopic() {
            return topic;
        }
    }

    private final String monitoringConsumerGroupID = "monitoring_consumer_" + UUID.randomUUID().toString();

    public Map<TopicPartition, PartionOffsets> getConsumerGroupOffsets(String host, String topic, String groupId) {
        Map<TopicPartition, Long> logEndOffset = getLogEndOffset(topic, host);

        KafkaConsumer consumer = createNewConsumer(groupId, host);

        BinaryOperator<PartionOffsets> mergeFunction = (a, b) -> {
            throw new IllegalStateException();
        };

        Map<TopicPartition, PartionOffsets> result = logEndOffset.entrySet()
                .stream()
                .collect(Collectors.toMap(
                        entry -> (entry.getKey()),
                        entry -> {
                            OffsetAndMetadata committed = consumer.committed(entry.getKey());
                            return new PartionOffsets(entry.getValue(), committed.offset(), entry.getKey().partition(), topic);
                        }, mergeFunction));

        return result;
    }

    public Map<TopicPartition, Long> getLogEndOffset(String topic, String host) {
        Map<TopicPartition, Long> endOffsets = new ConcurrentHashMap<>();
        KafkaConsumer<?, ?> consumer = createNewConsumer(monitoringConsumerGroupID, host);
        List<PartitionInfo> partitionInfoList = consumer.partitionsFor(topic);
        List<TopicPartition> topicPartitions = partitionInfoList.stream().map(pi -> new TopicPartition(topic, pi.partition())).collect(Collectors.toList());
        consumer.assign(topicPartitions);
        consumer.seekToEnd(topicPartitions);
        topicPartitions.forEach(topicPartition -> endOffsets.put(topicPartition, consumer.position(topicPartition)));
        consumer.close();
        return endOffsets;
    }

    private static KafkaConsumer<?, ?> createNewConsumer(String groupId, String host) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, host);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new KafkaConsumer<>(properties);
    }
}
ibps3vxo

ibps3vxo2#

我使用spring作为我的api。使用下面的代码,您可以通过java获得度量。

@Component
public class Receiver {

private static final Logger LOGGER =
      LoggerFactory.getLogger(Receiver.class);

@Autowired
private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

  public void testlag() {
      for (MessageListenerContainer messageListenerContainer : kafkaListenerEndpointRegistry
                .getListenerContainers()) {
          Map<String, Map<MetricName, ? extends Metric>> metrics = messageListenerContainer.metrics();
          metrics.forEach( (clientid, metricMap) ->{
              System.out.println("------------------------For client id : "+clientid);
              metricMap.forEach((metricName,metricValue)->{
                  //if(metricName.name().contains("lag"))
                  System.out.println("------------Metric name: "+metricName.name()+"-----------Metric value: "+metricValue.metricValue());
              });
          });
            }
  }
w8rqjzmb

w8rqjzmb3#

尝试使用adminclient#ListGroupOffset(groupid)检索与使用者组关联的所有主题分区的偏移量。例如:

AdminClient client = AdminClient.createSimplePlaintext("localhost:9092");
Map<TopicPartition, Object> offsets = JavaConversions.asJavaMap(
    client.listGroupOffsets("groupID"));
Long offset = (Long) offsets.get(new TopicPartition("topic", 0));
...

编辑:
上面的代码片段展示了如何获取给定分区的提交偏移量。下面的代码显示了如何检索分区的leo。

public long getLogEndOffset(TopicPartition tp) {
    KafkaConsumer consumer = createNewConsumer();
    Collections.singletonList(tp);
    consumer.assign(Collections.singletonList(tp));
    consumer.seekToEnd(Collections.singletonList(tp));
    return consumer.position(tp);
}

private KafkaConsumer<String, String> createNewConsumer() {
    Properties properties = new Properties();
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    properties.put(ConsumerConfig.GROUP_ID_CONFIG, "g1");
    properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
    return new KafkaConsumer(properties);
}

调用 getLogEndOffset 返回给定分区的leo,然后从中减去提交的偏移量,结果就是延迟。

zdwk9cvp

zdwk9cvp4#

供你参考,我用下面的代码完成了。基本上,您必须通过计算当前提交偏移量和结束偏移量之间的增量来手动计算每个主题分区的延迟。

private static Map<TopicPartition, Long> lagOf(String brokers, String groupId) {
    Properties props = new Properties();
    props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, brokers);
    try (AdminClient client = AdminClient.create(props)) {
        ListConsumerGroupOffsetsResult currentOffsets = client.listConsumerGroupOffsets(groupId);

        try {
            // get current offsets of consuming topic-partitions
            Map<TopicPartition, OffsetAndMetadata> consumedOffsets = currentOffsets.partitionsToOffsetAndMetadata()
                    .get(3, TimeUnit.SECONDS);
            final Map<TopicPartition, Long> result = new HashMap<>();
            doWithKafkaConsumer(groupId, brokers, (c) -> {
                // get latest offsets of consuming topic-partitions
                // lag = latest_offset - current_offset
                Map<TopicPartition, Long> endOffsets = c.endOffsets(consumedOffsets.keySet());
                result.putAll(endOffsets.entrySet().stream().collect(Collectors.toMap(entry -> entry.getKey(),
                        entry -> entry.getValue() - consumedOffsets.get(entry.getKey()).offset())));
            });
            return result;
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            log.error("", e);
            return Collections.emptyMap();
        }
    }
}

public static void doWithKafkaConsumer(String groupId, String brokers,
        Consumer<KafkaConsumer<String, String>> consumerRunner) {
    Properties props = new Properties();
    props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, brokers);
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

    try (final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
        consumerRunner.accept(consumer);
    }
}

请注意,一个消费者组可能同时消费多个主题,因此如果需要获得每个主题的滞后时间,则必须按主题对结果进行分组和聚合。

Map<TopicPartition, Long> lags = lagOf(brokers, group);
    Map<String, Long> topicLag = new HashMap<>();
    lags.forEach((tp, lag) -> {
        topicLag.compute(tp.topic(), (k, v) -> v == null ? lag : v + lag);
    });
eagi6jfj

eagi6jfj5#

运行此独立代码(依赖kafka-clients-2.6.0.jar)

import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BinaryOperator;
import java.util.stream.Collectors;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class CosumerGroupLag {

static String host = "localhost:9092";
static String topic = "topic02";
static String groupId = "test-group";

public static void main(String... vj) {
    CosumerGroupLag cgl = new CosumerGroupLag();

    while (true) {
        Map<TopicPartition, PartionOffsets> lag = cgl.getConsumerGroupOffsets(host, topic, groupId);
        System.out.println("$$LAG = " + lag);
        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
}

private final String monitoringConsumerGroupID = "monitoring_consumer_" + UUID.randomUUID().toString();

public Map<TopicPartition, PartionOffsets> getConsumerGroupOffsets(String host, String topic, String groupId) {
    Map<TopicPartition, Long> logEndOffset = getLogEndOffset(topic, host);

    Set<TopicPartition> topicPartitions = new HashSet<>();
    for (Entry<TopicPartition, Long> s : logEndOffset.entrySet()) {
        topicPartitions.add(s.getKey());
    }

    KafkaConsumer<String, Object> consumer = createNewConsumer(groupId, host);
    Map<TopicPartition, OffsetAndMetadata> comittedOffsetMeta = consumer.committed(topicPartitions);

    BinaryOperator<PartionOffsets> mergeFunction = (a, b) -> {
        throw new IllegalStateException();
    };
    Map<TopicPartition, PartionOffsets> result = logEndOffset.entrySet().stream()
            .collect(Collectors.toMap(entry -> (entry.getKey()), entry -> {
                OffsetAndMetadata committed = comittedOffsetMeta.get(entry.getKey());
                long currentOffset = 0;
                if(committed != null) { //committed offset will be null for unknown consumer groups
                    currentOffset = committed.offset();
                }
                return new PartionOffsets(entry.getValue(), currentOffset, entry.getKey().partition(), topic);
            }, mergeFunction));

    return result;
}

public Map<TopicPartition, Long> getLogEndOffset(String topic, String host) {
    Map<TopicPartition, Long> endOffsets = new ConcurrentHashMap<>();
    KafkaConsumer<?, ?> consumer = createNewConsumer(monitoringConsumerGroupID, host);
    List<PartitionInfo> partitionInfoList = consumer.partitionsFor(topic);
    List<TopicPartition> topicPartitions = partitionInfoList.stream()
            .map(pi -> new TopicPartition(topic, pi.partition())).collect(Collectors.toList());
    consumer.assign(topicPartitions);
    consumer.seekToEnd(topicPartitions);
    topicPartitions.forEach(topicPartition -> endOffsets.put(topicPartition, consumer.position(topicPartition)));
    consumer.close();
    return endOffsets;
}

private static KafkaConsumer<String, Object> createNewConsumer(String groupId, String host) {
    Properties properties = new Properties();
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, host);
    properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    return new KafkaConsumer<>(properties);
}

private static class PartionOffsets {
    private long lag;
    private long timestamp = System.currentTimeMillis();
    private long endOffset;
    private long currentOffset;
    private int partion;
    private String topic;

    public PartionOffsets(long endOffset, long currentOffset, int partion, String topic) {
        this.endOffset = endOffset;
        this.currentOffset = currentOffset;
        this.partion = partion;
        this.topic = topic;
        this.lag = endOffset - currentOffset;
    }

    @Override
    public String toString() {
        return "PartionOffsets [lag=" + lag + ", timestamp=" + timestamp + ", endOffset=" + endOffset
                + ", currentOffset=" + currentOffset + ", partion=" + partion + ", topic=" + topic + "]";
    }

}
}
guykilcj

guykilcj6#

我个人直接从我的消费者那里查询jmx信息。我只使用java,所以jmx bean: kafka.consumer:type=consumer-fetch-manager-metrics,client-id=*/records-lag-max 都有。
如果jolokia在类路径中,您可以使用get-on检索值 /jolokia/read/kafka.consumer:type=consumer-fetch-manager-metrics,client-id=*/records-lag-max 把所有结果集中在一个地方。
还有一个很容易配置的burrow,但是它有点过时(如果我记得很清楚的话,它不适用于0.10)。

相关问题