我在java中有一个流构建为(对一些变量和类进行了动画化):
Map<String, Object> props = new HashMap<>();
Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-stream-processing-application");
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "my-kafka-broker:6667");
config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> events = builder.stream("my-topic");
events.foreach((key, value) -> {
CustomClass instance = new CustomClass(value);
for (AnotherCustomClass anotherInstance: someIterator) {
anotherInstance(instance);
}
});
KafkaStreams streams = new KafkaStreams(builder, config);
streams.start();
为了Kafka 0.10.0.0
:
compile group: 'org.apache.kafka', name: 'kafka-streams', version: '0.10.0.0'
compile group: 'org.apache.kafka', name: 'kafka-clients', version: '0.10.0.0'
我的问题如下:
我怎样才能进入 KafkaStreams streams.metrics
内部 foreach
循环?为了读取和/或打印处理过的消息
更一般地说:我如何测量处理的消息吞吐量?“已处理”是指这样的消息: anotherInstance(instance)
已评估
1条答案
按热度按时间lx0bsm1f1#
kafka streams通过jmx(java管理扩展)公开所有度量。您可以使用jconsole或visualvm检查这些度量。使用这些工具,您可以浏览所有度量并将其绘制成图表。
为了检查应用程序正在处理的消息数量,请查看该度量:
它告诉您所有任务中每秒处理的平均消息数。
Kafka流指标的完整列表可以在官方文档中找到。