我使用spark结构化流媒体读取Kafka主题的记录;我打算统计spark中每个“微批次”收到的记录数 readstream
这是一个片段:
val kafka_df = sparkSession
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host:port")
.option("subscribe", "test-count")
.load()
我从文件中了解到,Kafka将被懒惰地评估,当一个 streamingQuery
开始(下一步),当它被评估时,它持有一个微批。所以,我想 groupBy
主题后接 count
应该有用。
这样地:
val counter = kafka_df
.groupBy("topic")
.count()
现在要评估所有这些,我们需要一个streaminquery,比方说,一个console sink查询来在控制台上打印它。这就是问题所在。一个关于 aggregate
Dataframe,例如 kafka_df
仅适用于 outputMode
完成/更新而不是附加。
这实际上意味着streamingquery报告的计数是累积的。
这样地:
val counter_json = counter.toJSON //to jsonify
val count_query = counter_json
.writeStream.outputMode("update")
.format("console")
.start() // kicks of lazy evaluation
.awaitTermination()
在受控设置中,其中:
实际发表记录:1500
实际接收微批次:3
实际接收记录:1500
每个微批的计数应该是500,所以我希望(希望)查询打印到控制台:
主题:测试计数
计数:500
主题:测试计数
计数:500
主题:测试计数
计数:500
但事实并非如此。它实际上打印了:
主题:测试计数
计数:500
主题:测试计数
count:1000
主题:测试计数
计数:1500
我理解这是因为“outputmode”完成/更新(累计)
我的问题是:有没有可能准确地得到每一个微批量是spark kafka结构化流媒体的计数?
从文档中,我发现了水印方法(支持append):
val windowedCounts = kafka_df
.withWatermark("timestamp", "10 seconds")
.groupBy(window($"timestamp", "10 seconds", "10 seconds"), $"topic")
.count()
val console_query = windowedCounts
.writeStream
.outputMode("append")
.format("console")
.start()
.awaitTermination()
但结果是 console_query
是不准确的,而且看起来很离谱。
热释光;博士-任何关于准确计算spark kafka微批次记录的想法都将不胜感激。
2条答案
按热度按时间1tu0hz3e1#
如果要使用kafka在结构化流式处理应用程序中使用每个触发器仅处理特定数量的记录,请使用
maxOffsetsPerTrigger
```val kafka_df = sparkSession
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host:port")
.option("subscribe", "test-count")
.option("maxOffsetsPerTrigger", 500)
.load()
6bc51xsx2#
“tl;博士-任何关于准确计算spark kafka微批次记录的想法都将不胜感激。”
您可以使用
StreamingQueryListener
(scaladocs)。这允许您打印出从订阅的Kafka主题接收到的确切行数。这个
onQueryProgress
api在每个微批处理过程中都会被调用,并且在查询中包含许多有用的元信息。如果没有数据流入查询,则每隔10秒调用onqueryprogress。下面是一个简单的例子,可以打印出输入消息的数量。如果要验证结构化流式查询的性能,通常最好关注以下两个指标:
queryProgress.progress.inputRowsPerSecond
queryProgress.progress.processedRowsPerSecond
如果输入高于处理的值,您可以增加作业的资源或减少最大限制(通过减少readstream选项maxoffsetspertrigger)。如果“已处理”更高,则可能需要增加此限制。