使用java查找Kafka主题中的消息数

umuewwlo  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(489)

我正在使用kafka utils测试一个基于kafka的消息传递系统。我想在不使用kafka-console-consumer.sh脚本的情况下找出特定主题中的消息数。我似乎找不到一个基于kafkatestutils的方法或者java中的任何方法来帮助我实现这一点。其他类似问题的答案对我都没有帮助。

t40tm48m

t40tm48m1#

以下应起作用:

Properties properties = ...
// omitted for the sake of brevity
KafkaConsumer<String,String> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(topic);
consumer.poll(Duration.ofSeconds(10L)); // or some time

AtomicLong count = new AtomicLong();
consumer.endOffsets(consumer.assignment()).forEach( (topicPartition, endOffsetOfPartition) -> {
    count.addAndGet(endOffsetOfPartition);
});

// decrement in case of retention as pointed out by Mickael
consumer.beginningOffsets(consumer.assignment()).forEach( (topicPartition, startOffsetOfPartition) -> {
    count.set(count.get() - startOffsetOfPartition);
}));

System.out.println(count.get());

获取每个分区的结束偏移量,并将每个分区的结束偏移量添加到计数中,因为主题中的消息数等于该主题的所有分区中的消息数。

相关问题