Properties properties = ...
// omitted for the sake of brevity
KafkaConsumer<String,String> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(topic);
consumer.poll(Duration.ofSeconds(10L)); // or some time
AtomicLong count = new AtomicLong();
consumer.endOffsets(consumer.assignment()).forEach( (topicPartition, endOffsetOfPartition) -> {
count.addAndGet(endOffsetOfPartition);
});
// decrement in case of retention as pointed out by Mickael
consumer.beginningOffsets(consumer.assignment()).forEach( (topicPartition, startOffsetOfPartition) -> {
count.set(count.get() - startOffsetOfPartition);
}));
System.out.println(count.get());
1条答案
按热度按时间t40tm48m1#
以下应起作用:
获取每个分区的结束偏移量,并将每个分区的结束偏移量添加到计数中,因为主题中的消息数等于该主题的所有分区中的消息数。