计算存储在Kafka主题中的消息数

ne5o7dgx  于 2021-06-07  发布在  Kafka
关注(0)|答案(6)|浏览(458)

我使用的是kafka的0.9.0.0版本,我想计算一个主题中的消息数,而不使用管理脚本kafka-console-consumer.sh。
我已经尝试了答案java中的所有命令,如何在apachekafka中获取一个主题中的消息数,但是没有一个得到结果。有人能帮帮我吗?

nszi6y05

nszi6y051#

从技术上讲,您可以简单地使用主题中的所有消息并对它们进行计数:
例子:

kafka-run-class.sh kafka.tools.SimpleConsumerShell --broker-list localhost:9092 --topic XYZ --partition 0*

然而 kafka.tools.GetOffsetShell 该方法将给出偏移量,而不是主题中消息的实际数量。这意味着,如果主题被压缩,那么如果您通过使用消息或读取偏移量来计算消息,您将得到两个不同的数字。
主题压缩:https://kafka.apache.org/documentation.html#design_compactionbasics

evrscar2

evrscar22#

您也可以使用awk和一个简单的循环来实现这一点

for i in `kafka-run-class kafka.tools.GetOffsetShell --broker-list broker:9092 --time -1 --topic topic_name| awk -F : '{print $3}'`; do sum=$(($sum+$i)); done
drkbr07n

drkbr07n3#

获取主题中的记录数

brokers="<broker1:port>"
topic=<topic-name>
sum_1=$(/usr/hdp/current/kafka-broker/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list $brokers --topic $topic --time -1 | grep -e ':[[:digit:]]*:' | awk -F  ":" '{sum += $3} END {print sum}')
sum_2=$(/usr/hdp/current/kafka-broker/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list $brokers --topic $topic --time -2 | grep -e ':[[:digit:]]*:' | awk -F  ":" '{sum += $3} END {print sum}')
echo "Number of records in topic ${topic}: "$((sum_1 - sum_2))
brjng4g3

brjng4g34#

您可以尝试执行以下命令:

bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092,localhost:9093,localhost:9094 --topic test-topic --time -1

然后,将每个分区的所有计数相加。
更新:java实现

Properties props = new Properties();
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
......
try (final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
    consumer.subscribe(Arrays.asList("your_topic"));
    Set<TopicPartition> assignment;
    while ((assignment = consumer.assignment()).isEmpty()) {
        consumer.poll(Duration.ofMillis(100));
    }
    final Map<TopicPartition, Long> endOffsets = consumer.endOffsets(assignment);
    final Map<TopicPartition, Long> beginningOffsets = consumer.beginningOffsets(assignment);
    assert (endOffsets.size() == beginningOffsets.size());
    assert (endOffsets.keySet().equals(beginningOffsets.keySet()));

    Long totalCount = beginningOffsets.entrySet().stream().mapToLong(entry -> {
            TopicPartition tp = entry.getKey();
            Long beginningOffset = entry.getValue();
            Long endOffset = endOffsets.get(tp);
            return endOffset - beginningOffset;
        }).sum();
    System.out.println(totalCount);
}
qlckcl4x

qlckcl4x5#

您可以使用以下公式来汇总所有计数:

.../bin/kafka-run-class kafka.tools.GetOffsetShell --broker-list <<broker_1>>:9092,<<broker_2:9092>>... --topic <<your_topic_name>> --time -1 | while IFS=: read topic_name partition_id number; do echo "$number"; done | paste -sd+ - | bc
zu0ti5jz

zu0ti5jz6#

如果你不想为“原创”Kafka剧本的麻烦买单,还有Kafka。
基本思想是
使用每个分区的最后一条消息
将偏移相加(校正基于零的偏移)。
让我们开发这个。

kafkacat -C -b <broker> -t <topic> -o -1 -f '%p\t%o\n'

这将输出如下内容(加上stderr上的“到达分区结尾”通知):

0    77
1    75
2    78

现在, kafkacat 不终止,但一直等待新消息。我们可以通过添加一个超时来避免这种情况(选择一个足够大的值,这样就可以获得给定环境中的所有分区):

timeout --preserve-status 1 kafkacat <snip>

现在我们可以继续计算第二列(每个列增加1个)--但是如果在该超时间隔期间有新消息,我们可能会得到如下结果:

0    77
1    75
2    78
1    76

所以我们必须考虑到这一点,这是很容易做到的一点 awk :

timeout --preserve-status 1 kafkacat <snip> 2> /dev/null \
| awk '{lastOffsets[$1] = $2} END {count = 0; for (i in lastOffsets) { count += lastOffsets[i] + 1 }; print count}'

请注意,我们如何使用(哈希)Map来记住每个分区最后一次看到的偏移量,直到触发超时,然后在数组上循环以计算总和。

相关问题