我正在检查两个相同sparkDataframe的并集的分区数,我注意到scala和pyhtonapi之间的结果不一样。
对于python,联合的分区数是2个Dataframe的分区数之和,这是预期的行为。
python
from pyspark.sql.types import IntegerType
df1 = spark.createDataFrame(range(100000), IntegerType()).repartition(10)
print("df1 partitions: %d" %df1.rdd.getNumPartitions())
df2 = spark.createDataFrame(range(100000), IntegerType()).repartition(10)
print("df2 partitions: %d" %df2.rdd.getNumPartitions())
df3 = df1.union(df2)
print("df3 partitions: %d" %df3.rdd.getNumPartitions())
结果:
df1 partitions: 10
df2 partitions: 10
df3 partitions: 20
但是,使用scala,联合的分区数不会改变。
斯卡拉
val df1 = (1 to 100000).toDF.repartition(10)
println(s"df1 partitions: ${df1.rdd.getNumPartitions}")
val df2 = (1 to 100000 by 1).toDF.repartition(10)
println(s"df2 partitions: ${df2.rdd.getNumPartitions}")
val df3 = df1.union(df2)
println(s"df3 partitions: ${df3.rdd.getNumPartitions}")
结果:
df1 partitions: 10
df2 partitions: 10
df3 partitions: 10
只有当两个Dataframe以相同的方式构建时,才会出现这种情况。
何时不是:
val df1 = (1 to 100000).toDF.repartition(10)
println(s"df1 partitions: ${df1.rdd.getNumPartitions}")
val df2 = (1 to 100000 by 2).toDF.repartition(10)
println(s"df2 partitions: ${df2.rdd.getNumPartitions}")
val df3 = df1.union(df2)
println(s"df3 partitions: ${df3.rdd.getNumPartitions}")
我得到了预期的结果(总和):
df1 partitions: 10
df2 partitions: 10
df3 partitions: 20
我的理解是,使用scalaapi,spark在某些情况下能够优化联合。这是真的吗?这意味着在scala和pythonapi之间,联合的执行计划可能不同?
我问这个问题是因为我注意到scala的联合比python的联合性能更好,特别是在多重联合的情况下。
3条答案
按热度按时间ui7jx7zq1#
线索是通过scala引擎的解释:
重用交换是一种优化形式。catalyst发现它们是相同的。
如果一个有10000个条目,另一个有10001个条目,那么得到20个分区。斯帕克有些聪明。
3pmvbmvn2#
如果我发现有什么有趣的事会不断更新
观察1——物理计划在scala和python之间有区别
观察结果2——spark中的联合在很大程度上不会引起洗牌操作,这是一个非常有效的操作。我相信是对df1和df2的显式重新分区导致了联合的df3的分区数发生变化。如果你没有明确划分你的输入
Dataframes
最后得到一个并集df,其分区号等于df1和df2之和。我试着在同一个数据上进行排列,得到了满意的结果案例1
o/p公司*
案例2
o/p公司*
案例3
o/p公司
56lgkhnf3#
spark-scala中并的定义
pyspark中并的定义
请参阅此处的代码以了解区别https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/optimizer.scala
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/dataset.scala
https://github.com/apache/spark/blob/master/python/pyspark/sql/dataframe.py