我想获得一个spark计划引用的所有文件路径,这样我就可以将它们与一组预定义的路径进行比较。
在Spark计划上进行简单的字符串搜索可能会遇到问题,因为我发现计划中的路径被截断了。(配置Spark.sql.最大元数据字符串长度和Spark.debug.最大到字符串字段,我还尝试使用.treeString(verbose=true))。('formated ')给出了未截断的输出。但我希望从org.apache.spark.sql.execution.SparkPlan获取它。
我也试过从Spark Plan中提取FileScans,这个article对以下代码有帮助:
def full_file_meta(f: FileSourceScanExec) = {
val metadataEntries = f.metadata.toSeq.sorted.flatMap {
case (key, value) if Set(
"Location", "PartitionCount",
"PartitionFilters", "PushedFilters"
).contains(key) =>
Some(key + ": " + value.toString)
case other => None
}
val metadataStr = metadataEntries.mkString("[\n ", ",\n ", "\n]")
s"${f.nodeNamePrefix}${f.nodeName}$metadataStr"
}
val ep = data.queryExecution.executedPlan
print(ep.flatMap {
case f: FileSourceScanExec => full_file_meta(f)::Nil
case other => Nil
}.mkString(",\n"))
然而,这在简单计划的情况下是有帮助的(例如“select * from path”)。当连接出现在图片中时,它不起作用(查询,例如“select * from path1 join path2”)。在后一种情况下,由于某种原因,没有FileSourceScanExec对象作为FlatMap的键存在。实际上,只有一个键,即org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec,并且它有一个空的子列表。
1条答案
按热度按时间lx0bsm1f1#
要从Spark计划中提取文件路径,可以尝试以下方法:
1.遍历Spark计划树并在节点中查找FileSourceScanExec或FileScan的示例。
1.对于每个FileSourceScanExec或FileScan节点,提取“位置”元数据,即文件路径。
1.可以使用每个节点的children属性遍历计划树并查找下一个节点。
1.获得所有文件路径后,可以将它们与预定义的路径集进行比较。
下面是实现此方法的代码示例:
无论Spark计划是否包含连接操作,这都应该有效。