我有一个使用结构化Spark流的应用程序,我想获得一些指标,如调度延迟,延迟等。通常,这样的指标可以在Spark UI Streaming选项卡中找到,但是,据我所知,结构化流不存在这样的功能。那么,我如何才能获得这些指标值呢?
现在,我尝试使用查询进度,但并非所有必需的指标都可以在结果中找到:
QueryProgress {
"timestamp" : "2019-11-19T20:14:07.011Z",
"batchId" : 1,
"numInputRows" : 8,
"inputRowsPerSecond" : 0.8429038036034138,
"processedRowsPerSecond" : 1.1210762331838564,
"durationMs" : {
"addBatch" : 6902,
"getBatch" : 1,
"getEndOffset" : 0,
"queryPlanning" : 81,
"setOffsetRange" : 20,
"triggerExecution" : 7136,
"walCommit" : 41
},
"stateOperators" : [ {
"numRowsTotal" : 2,
"numRowsUpdated" : 2,
"memoryUsedBytes" : 75415,
"customMetrics" : {
"loadedMapCacheHitCount" : 400,
"loadedMapCacheMissCount" : 0,
"stateOnCurrentVersionSizeBytes" : 17815
}
} ],
"sources" : [ {
"description" : "KafkaV2[Subscribe[tweets]]",
"startOffset" : {
"tweets" : {
"0" : 579
}
},
"endOffset" : {
"tweets" : {
"0" : 587
}
},
"numInputRows" : 8,
"inputRowsPerSecond" : 0.8429038036034138,
"processedRowsPerSecond" : 1.1210762331838564
} ]
1条答案
按热度按时间fd3cxomn1#
你是对的,这些指标在某些时候在结构化流中不可用**(直到Spark 3的某些版本,现在,有“结构化流”选项卡,在Spark 2中此选项卡不可用)**,但它们确实在Spark UI中可用,当您在“Streaming”选项卡中使用
DStream
s时。如果您使用
StreamingContext
注册,也可以从StreamingListener
获取它们,但您不能再使用SparkSession
,因此只能使用StreamingQueryListener
。**但是!**计算这些指标的所有信息都可以在json的
durationMs
Map中找到。如果将该Map中的所有值相加,就会得到微批处理的总处理时间(或者称之为
latency
),单位为毫秒。至于
scheduling delay
,如果我理解正确的话,你必须把triggerExecution
阶段之前发生的所有事情加起来。但你必须做一些挖掘,以了解它之前发生了什么。所以,你有这些操作:
addBatch
,getBatch
,getEndOffset
,queryPlanning
,setOffsetRange
,triggerExecution
,walCommit
(顺便说一下,这些名称来自Spark 2,其中一些在Spark 3中更改)。你必须把在
triggerExecution
之前发生的操作加起来,然后得到scheduling delay
,就像我说的,但是你必须研究Spark的MicroBatchExecution
类才能弄清楚。因此,这是scheduling delay
的“类型”,基本上是在微批处理实际执行开始之前所使用的类型数量。在
MicroBatchExecution
类中搜索reportTimeTaken
,您将看到与durationMs
Map中相同的操作。现在,你只需要弄清楚他们的顺序。P.S.我快速看了一下
MicroBatchExecution
,似乎在triggerExecution
之后的唯一操作是addBatch
,它属于Sink
trait,负责将数据写入sink。因此,似乎scheduling delay
可以通过将durationMs
中除了triggerExecution
和addBatch
之外的所有内容相加来计算。另一种选择是从源代码构建Spark,在每个
reportTimeTaken
语句之后添加println
语句,并在运行Spark结构化流作业时查看这些操作的顺序。