为什么SparkQueryPlan在使用缓存(持久化)时会显示更多的分区

4bbkushb  于 2022-12-19  发布在  Apache
关注(0)|答案(2)|浏览(98)

在具有2个核心的单个工作机Spark集群上给出以下PySpark代码:

df = spark.table('table')

df = df.dropDuplicates(['checksum'])

#
df = df.cache()

...

df.write.save('...)

当存在高速缓存时,它生成并执行具有200个分区的计划,而当不存在df.cache()时,它仅生成并执行2个分区的计划。
我特别感兴趣的是了解缓存对这种情况下的计划的影响。
带缓存:

无缓存:

df.cache似乎对AQE(自适应查询执行)有类似的影响,如果DataFrame在代价高昂的shuffle之后被缓存,合并后shuffle分区似乎不会发生。

5us2dqdw

5us2dqdw1#

这是一个AQE的效果。请看第二张图片中的自定义随机播放阅读器节点。
“...它执行CustomShuffleReaderExec而不是ShuffleExchangeExec,并且ShuffledRowRDD是使用基于Map的随机文件执行的,这些文件应该已经存在于本地块管理器(因此是本地随机读取器)中”https://www.waitingforcode.com/apache-spark-sql/what-new-apache-spark-3-local-shuffle-reader/read
200个分区来自configspark.sql.shuffle.partitions的默认值

9jyewag0

9jyewag02#

这是一个非常好的问题!
我挖了,我可以确认,在默认情况下,与AQE的Spark行为就像你描述的。关系是缓存不优化的AQE。
这在本通知单中进行了说明:https://issues.apache.org/jira/browse/SPARK-35332
此票证的结果是此分区已更改,现在我们可以将spark.sql.optimizer.canChangeCachedPlanOutputPartitioning设置为true,以允许AQE在缓存期间更改分区
我使用Spark 3.2在Databricks群集上进行了测试
样本代码:

import org.apache.spark.sql.functions._

spark.conf.set("spark.sql.optimizer.canChangeCachedPlanOutputPartitioning", false)

val data = Seq(("1", "Frankfurt am main", "Germany"),("1", "Frankfurt am main", "Germany"))
val df = data.toDF("Id", "City", "Country")
val uniqueRecords = df.dropDuplicates("City").cache()
uniqueRecords.show()

使用默认设置,我有这个阶段:

物理计划:

== Physical Plan ==
CollectLimit (9)
+- InMemoryTableScan (1)
      +- InMemoryRelation (2)
            +- SortAggregate (8)
               +- Sort (7)
                  +- Exchange (6)
                     +- SortAggregate (5)
                        +- * Sort (4)
                           +- * LocalTableScan (3)

如您所见,根本没有AQE
现在让我们尝试

spark.conf.set("spark.sql.optimizer.canChangeCachedPlanOutputPartitioning", true)

分期:

和物理计划:

== Physical Plan ==
CollectLimit (10)
+- InMemoryTableScan (1)
      +- InMemoryRelation (2)
            +- AdaptiveSparkPlan (9)
               +- SortAggregate (8)
                  +- Sort (7)
                     +- Exchange (6)
                        +- SortAggregate (5)
                           +- Sort (4)
                              +- LocalTableScan (3)

因此,我可以确认,当你改变这个参数的Spark3.2和以上。AQE是可见的计划和分区合并

相关问题