在具有2个核心的单个工作机Spark集群上给出以下PySpark代码:
df = spark.table('table')
df = df.dropDuplicates(['checksum'])
#
df = df.cache()
...
df.write.save('...)
当存在高速缓存时,它生成并执行具有200个分区的计划,而当不存在df.cache()
时,它仅生成并执行2个分区的计划。
我特别感兴趣的是了解缓存对这种情况下的计划的影响。
带缓存:
无缓存:
df.cache
似乎对AQE(自适应查询执行)有类似的影响,如果DataFrame在代价高昂的shuffle之后被缓存,合并后shuffle分区似乎不会发生。
2条答案
按热度按时间5us2dqdw1#
这是一个AQE的效果。请看第二张图片中的自定义随机播放阅读器节点。
“...它执行CustomShuffleReaderExec而不是ShuffleExchangeExec,并且ShuffledRowRDD是使用基于Map的随机文件执行的,这些文件应该已经存在于本地块管理器(因此是本地随机读取器)中”https://www.waitingforcode.com/apache-spark-sql/what-new-apache-spark-3-local-shuffle-reader/read
200个分区来自configspark.sql.shuffle.partitions的默认值
9jyewag02#
这是一个非常好的问题!
我挖了,我可以确认,在默认情况下,与AQE的Spark行为就像你描述的。关系是缓存不优化的AQE。
这在本通知单中进行了说明:https://issues.apache.org/jira/browse/SPARK-35332
此票证的结果是此分区已更改,现在我们可以将spark.sql.optimizer.canChangeCachedPlanOutputPartitioning设置为true,以允许AQE在缓存期间更改分区
我使用Spark 3.2在Databricks群集上进行了测试
样本代码:
使用默认设置,我有这个阶段:
物理计划:
如您所见,根本没有AQE
现在让我们尝试
分期:
和物理计划:
因此,我可以确认,当你改变这个参数的Spark3.2和以上。AQE是可见的计划和分区合并