测量kafka流中的处理吞吐量

jhiyze9q  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(389)

我在java中有一个流构建为(对一些变量和类进行了动画化):

Map<String, Object> props = new HashMap<>();
    Properties config = new Properties();
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-stream-processing-application");
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "my-kafka-broker:6667");
    config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

    KStreamBuilder builder = new KStreamBuilder();
    KStream<String, String> events = builder.stream("my-topic");

    events.foreach((key, value) -> {
        CustomClass instance = new CustomClass(value);
        for (AnotherCustomClass anotherInstance: someIterator) {
            anotherInstance(instance);
        }
    });

    KafkaStreams streams = new KafkaStreams(builder, config);
    streams.start();

为了Kafka 0.10.0.0 :

compile group: 'org.apache.kafka', name: 'kafka-streams', version: '0.10.0.0'
compile group: 'org.apache.kafka', name: 'kafka-clients', version: '0.10.0.0'

我的问题如下:
我怎样才能进入 KafkaStreams streams.metrics 内部 foreach 循环?为了读取和/或打印处理过的消息
更一般地说:我如何测量处理的消息吞吐量?“已处理”是指这样的消息: anotherInstance(instance) 已评估

lx0bsm1f

lx0bsm1f1#

kafka streams通过jmx(java管理扩展)公开所有度量。您可以使用jconsole或visualvm检查这些度量。使用这些工具,您可以浏览所有度量并将其绘制成图表。
为了检查应用程序正在处理的消息数量,请查看该度量:

MBean: kafka.streams:type=stream-metrics,thread.client-id=[threadId]
Attribute: process-rate

它告诉您所有任务中每秒处理的平均消息数。
Kafka流指标的完整列表可以在官方文档中找到。

相关问题