如何从Spark计划(org.apache.spark.sql.execution.SparkPlan)中提取文件扫描位置(或文件源扫描执行对象)?

hc2pp10m  于 2023-02-09  发布在  Apache
关注(0)|答案(1)|浏览(98)

我想获得一个spark计划引用的所有文件路径,这样我就可以将它们与一组预定义的路径进行比较。
在Spark计划上进行简单的字符串搜索可能会遇到问题,因为我发现计划中的路径被截断了。(配置Spark.sql.最大元数据字符串长度和Spark.debug.最大到字符串字段,我还尝试使用.treeString(verbose=true))。('formated ')给出了未截断的输出。但我希望从org.apache.spark.sql.execution.SparkPlan获取它。
我也试过从Spark Plan中提取FileScans,这个article对以下代码有帮助:

def full_file_meta(f: FileSourceScanExec) = {
    val metadataEntries = f.metadata.toSeq.sorted.flatMap {
      case (key, value) if Set(
          "Location", "PartitionCount",
          "PartitionFilters", "PushedFilters"
      ).contains(key) =>
        Some(key + ": " + value.toString)
      case other => None
    }

    val metadataStr = metadataEntries.mkString("[\n  ", ",\n  ", "\n]")
    s"${f.nodeNamePrefix}${f.nodeName}$metadataStr"

}

val ep = data.queryExecution.executedPlan

print(ep.flatMap {
    case f: FileSourceScanExec => full_file_meta(f)::Nil
    case other => Nil
}.mkString(",\n"))

然而,这在简单计划的情况下是有帮助的(例如“select * from path”)。当连接出现在图片中时,它不起作用(查询,例如“select * from path1 join path2”)。在后一种情况下,由于某种原因,没有FileSourceScanExec对象作为FlatMap的键存在。实际上,只有一个键,即org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec,并且它有一个空的子列表。

lx0bsm1f

lx0bsm1f1#

要从Spark计划中提取文件路径,可以尝试以下方法:
1.遍历Spark计划树并在节点中查找FileSourceScanExec或FileScan的示例。
1.对于每个FileSourceScanExec或FileScan节点,提取“位置”元数据,即文件路径。
1.可以使用每个节点的children属性遍历计划树并查找下一个节点。
1.获得所有文件路径后,可以将它们与预定义的路径集进行比较。
下面是实现此方法的代码示例:

def extractFilePaths(node: SparkPlan): List[String] = {
  var filePaths = List[String]()
  node match {
    case f: FileSourceScanExec => filePaths = filePaths :+ 
f.metadata("Location").toString
    case f: FileScan => filePaths = filePaths :+ f.location
     case _ =>
  }
  node.children.flatMap(extractFilePaths) ++ filePaths
}

val filePaths = extractFilePaths(data.queryExecution.executedPlan)

无论Spark计划是否包含连接操作,这都应该有效。

相关问题