Different number of partitions when unioning Spark DataFrames with the Scala and Python APIs

gg0vcinb · asked 2021-05-29 · Spark

I am checking the number of partitions of the union of two identical Spark DataFrames, and I noticed that the result differs between the Scala and Python APIs.
With Python, the number of partitions of the union is the sum of the partition counts of the two DataFrames, which is the expected behaviour.
Python:

from pyspark.sql.types import IntegerType

df1 = spark.createDataFrame(range(100000), IntegerType()).repartition(10)
print("df1 partitions: %d" %df1.rdd.getNumPartitions())

df2 = spark.createDataFrame(range(100000), IntegerType()).repartition(10)
print("df2 partitions: %d" %df2.rdd.getNumPartitions())

df3 = df1.union(df2)
print("df3 partitions: %d" %df3.rdd.getNumPartitions())

Result:

df1 partitions: 10
df2 partitions: 10
df3 partitions: 20

However, with Scala the number of partitions of the union does not change.
Scala:

val df1 = (1 to 100000).toDF.repartition(10)
println(s"df1 partitions: ${df1.rdd.getNumPartitions}")

val df2 = (1 to 100000 by 1).toDF.repartition(10)
println(s"df2 partitions: ${df2.rdd.getNumPartitions}")

val df3 = df1.union(df2)
println(s"df3 partitions: ${df3.rdd.getNumPartitions}")

Result:

df1 partitions: 10
df2 partitions: 10
df3 partitions: 10

This only happens when the two DataFrames are built in exactly the same way.
When they are not:

val df1 = (1 to 100000).toDF.repartition(10)
println(s"df1 partitions: ${df1.rdd.getNumPartitions}")

val df2 = (1 to 100000 by 2).toDF.repartition(10)
println(s"df2 partitions: ${df2.rdd.getNumPartitions}")

val df3 = df1.union(df2)
println(s"df3 partitions: ${df3.rdd.getNumPartitions}")

I get the expected result (the sum):

df1 partitions: 10
df2 partitions: 10
df3 partitions: 20

My understanding is that with the Scala API, Spark is able to optimize the union in some cases. Is this true? Does it mean that the execution plan of a union can differ between the Scala and Python APIs?
I am asking because I have noticed that unions perform better in Scala than in Python, especially with multiple chained unions.
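For reference, the plans can be compared directly with explain(); a minimal Scala sketch (the same explain() call exists on a PySpark DataFrame):

// Sketch: inspect the physical plan of the union to see whether Catalyst
// reuses one exchange (ReusedExchange) or keeps a separate shuffle per input.
val a = (1 to 100000).toDF.repartition(10)
val b = (1 to 100000).toDF.repartition(10)
val u = a.union(b)

u.explain()                                              // physical plan
println(s"union partitions: ${u.rdd.getNumPartitions}")  // partition count after the union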

ui7jx7zq 1#

The clue is in what the Scala engine's explain shows:

Union
:- Exchange RoundRobinPartitioning(10), [id=#757]
:  +- LocalTableScan [value#154]
+- ReusedExchange [value#159], Exchange RoundRobinPartitioning(10), [id=#757]

The ReusedExchange is a form of optimization: Catalyst detects that the two inputs are identical and reuses the same exchange instead of shuffling twice.
If one side has 10000 entries and the other has 10001, you do get 20 partitions. Spark is being somewhat clever here.
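One way to confirm that the reused exchange is what collapses the partition count is to switch exchange reuse off and repeat the experiment; a sketch, assuming the internal setting spark.sql.exchange.reuse is honoured by your Spark version:

// Sketch: disable exchange reuse, rebuild the two identical DataFrames and
// union them; without ReusedExchange the union should expose 20 partitions.
// spark.sql.exchange.reuse is an internal Spark SQL setting, so behaviour
// may vary across versions.
spark.conf.set("spark.sql.exchange.reuse", "false")

val df1 = (1 to 100000).toDF.repartition(10)
val df2 = (1 to 100000).toDF.repartition(10)
val df3 = df1.union(df2)

df3.explain()                                           // no ReusedExchange expected
println(s"df3 partitions: ${df3.rdd.getNumPartitions}") // expected: 20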

3pmvbmvn 2#

I will keep updating this answer if I find anything else interesting.
Observation 1: the physical plans differ between Scala and Python.

== Physical Plan PySpark ==
Union
:- Exchange RoundRobinPartitioning(10), [id=#1318]
:  +- *(1) Scan ExistingRDD[value#148]
+- Exchange RoundRobinPartitioning(10), [id=#1320]
   +- *(2) Scan ExistingRDD[value#154]

== Physical Plan Scala ==
Union
:- Exchange RoundRobinPartitioning(10), [id=#1012]
:  +- LocalTableScan [value#122]
+- ReusedExchange [value#131], Exchange RoundRobinPartitioning(10), [id=#1012]

== Physical Plan Scala, with val df2 = (1 to 10 by 2).toDF.repartition(10) ==
Union
:- Exchange RoundRobinPartitioning(10), [id=#1644]
:  +- LocalTableScan [value#184]
+- Exchange RoundRobinPartitioning(10), [id=#1646]
   +- LocalTableScan [value#193]

Observation 2: a union in Spark does not by itself trigger a shuffle, so it is a very cheap operation. I believe it is the explicit repartition of df1 and df2 that changes the partition count of the unioned df3. If you do not explicitly repartition your input DataFrames, you end up with a union DataFrame whose number of partitions is the sum of df1 and df2. I tried a few permutations on the same data and got consistent results.
Case 1

from pyspark.sql.types import IntegerType

df1 = spark.createDataFrame(range(100000), IntegerType())
print("df1 partitions: %d" %df1.rdd.getNumPartitions())
print("df1 partitioner: %s" %df1.rdd.partitioner)

df2 = spark.createDataFrame(range(100000), IntegerType())
print("df2 partitions: %d" %df2.rdd.getNumPartitions())
print("df2 partitioner: %s" %df2.rdd.partitioner)

df3 = df1.union(df2)
print("df3 partitions: %d" %df3.rdd.getNumPartitions())
print("df3 partitioner: %s" %df3.rdd.partitioner)

Output:

df1 partitions: 8
df1 partitioner: None
df2 partitions: 8
df2 partitioner: None

df3 partitions: 16
df3 partitioner: None

Case 2

val df1 = (1 to 100000).toDF
println(s"df1 partitions: ${df1.rdd.getNumPartitions}")
println(s"df1 partitioner: ${df1.rdd.partitioner}")

val df2 = (1 to 100000).toDF
println(s"df2 partitions: ${df2.rdd.getNumPartitions}")
println(s"df2 partitioner: ${df2.rdd.partitioner}")

df1.union(df2).explain()

val df3 = df1.union(df2)
println(s"df3 partitions: ${df3.rdd.getNumPartitions}")
println(s"df3 partitioner: ${df3.rdd.partitioner}")

Output:

df1 partitions: 8
df1 partitioner: None
df2 partitions: 8
df2 partitioner: None
df3 partitions: 16
df3 partitioner: None

Case 3

val df1 = (1 to 100000).toDF
println(s"df1 partitions: ${df1.rdd.getNumPartitions}")
println(s"df1 partitioner: ${df1.rdd.partitioner}")
val df2 = (1 to 100000 by 2).toDF
println(s"df2 partitions: ${df2.rdd.getNumPartitions}")
println(s"df2 partitioner: ${df2.rdd.partitioner}")
val df3 = df1.union(df2)
println(s"df3 partitions: ${df3.rdd.getNumPartitions}")
println(s"df3 partitioner: ${df3.rdd.partitioner}")

Output:

df1 partitions: 8
df1 partitioner: None
df2 partitions: 8
df2 partitioner: None
df3 partitions: 16
df3 partitioner: None

56lgkhnf 3#

The definition of union in Spark (Scala):

def union(other: Dataset[T]): Dataset[T] = withSetOperator {
    // This breaks caching, but it's usually ok because it addresses a very specific use case:
    // using union to union many files or partitions.
    CombineUnions(Union(logicalPlan, other.logicalPlan))
  }

The definition of union in PySpark:

def union(self, other):
    """Return a new :class:`DataFrame` containing union of rows in this and another
    :class:`DataFrame`.

    This is equivalent to `UNION ALL` in SQL. To do a SQL-style set union
    (that does deduplication of elements), use this function followed by :func:`distinct`.

    Also as standard in SQL, this function resolves columns by position (not by name).
    """
    return DataFrame(self._jdf.union(other._jdf), self.sql_ctx)

See the code here for the difference:
https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
https://github.com/apache/spark/blob/master/python/pyspark/sql/dataframe.py
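The CombineUnions rule flattens adjacent Union operators into a single Union node, which is why chains of unions stay cheap. A minimal Scala sketch to observe this; the flattened Union should appear in the optimized logical plan printed by explain(true):

// Sketch: chain several unions; CombineUnions should collapse the nested
// Union operators into one Union with four children in the optimized plan.
val parts = Seq(
  (1 to 100).toDF,
  (101 to 200).toDF,
  (201 to 300).toDF,
  (301 to 400).toDF
)
val chained = parts.reduce(_ union _)

chained.explain(true)   // look at the "Optimized Logical Plan" section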
