结构化Spark流度量检索

gojuced7  于 2023-06-24  发布在  Apache
关注(0)|答案(1)|浏览(116)

我有一个使用结构化Spark流的应用程序,我想获得一些指标,如调度延迟,延迟等。通常,这样的指标可以在Spark UI Streaming选项卡中找到,但是,据我所知,结构化流不存在这样的功能。那么,我如何才能获得这些指标值呢?
现在,我尝试使用查询进度,但并非所有必需的指标都可以在结果中找到:

QueryProgress {
  "timestamp" : "2019-11-19T20:14:07.011Z",
  "batchId" : 1,
  "numInputRows" : 8,
  "inputRowsPerSecond" : 0.8429038036034138,
  "processedRowsPerSecond" : 1.1210762331838564,
  "durationMs" : {
    "addBatch" : 6902,
    "getBatch" : 1,
    "getEndOffset" : 0,
    "queryPlanning" : 81,
    "setOffsetRange" : 20,
    "triggerExecution" : 7136,
    "walCommit" : 41
  },
  "stateOperators" : [ {
    "numRowsTotal" : 2,
    "numRowsUpdated" : 2,
    "memoryUsedBytes" : 75415,
    "customMetrics" : {
      "loadedMapCacheHitCount" : 400,
      "loadedMapCacheMissCount" : 0,
      "stateOnCurrentVersionSizeBytes" : 17815
    }
  } ],
  "sources" : [ {
    "description" : "KafkaV2[Subscribe[tweets]]",
    "startOffset" : {
      "tweets" : {
        "0" : 579
      }
    },
    "endOffset" : {
      "tweets" : {
        "0" : 587
      }
    },
    "numInputRows" : 8,
    "inputRowsPerSecond" : 0.8429038036034138,
    "processedRowsPerSecond" : 1.1210762331838564
  } ]
fd3cxomn

fd3cxomn1#

你是对的,这些指标在某些时候在结构化流中不可用**(直到Spark 3的某些版本,现在,有“结构化流”选项卡,在Spark 2中此选项卡不可用)**,但它们确实在Spark UI中可用,当您在“Streaming”选项卡中使用DStream s时。
如果您使用StreamingContext注册,也可以从StreamingListener获取它们,但您不能再使用SparkSession,因此只能使用StreamingQueryListener

**但是!**计算这些指标的所有信息都可以在json的durationMsMap中找到。

如果将该Map中的所有值相加,就会得到微批处理的总处理时间(或者称之为latency),单位为毫秒。
至于scheduling delay,如果我理解正确的话,你必须把triggerExecution阶段之前发生的所有事情加起来。但你必须做一些挖掘,以了解它之前发生了什么。
所以,你有这些操作:
addBatchgetBatchgetEndOffsetqueryPlanningsetOffsetRangetriggerExecutionwalCommit(顺便说一下,这些名称来自Spark 2,其中一些在Spark 3中更改)。
你必须把在triggerExecution之前发生的操作加起来,然后得到scheduling delay,就像我说的,但是你必须研究Spark的MicroBatchExecution类才能弄清楚。因此,这是scheduling delay的“类型”,基本上是在微批处理实际执行开始之前所使用的类型数量。
MicroBatchExecution类中搜索reportTimeTaken,您将看到与durationMsMap中相同的操作。现在,你只需要弄清楚他们的顺序。
P.S.我快速看了一下MicroBatchExecution,似乎在triggerExecution之后的唯一操作是addBatch,它属于Sink trait,负责将数据写入sink。因此,似乎scheduling delay可以通过将durationMs中除了triggerExecutionaddBatch之外的所有内容相加来计算。
另一种选择是从源代码构建Spark,在每个reportTimeTaken语句之后添加println语句,并在运行Spark结构化流作业时查看这些操作的顺序。

相关问题