sparksql shuffle分区中的差异

w9apscun  于 2021-05-29  发布在  Spark
关注(0)|答案(2)|浏览(473)

我试着理解sparksqlshuffle分区,默认设置为200。数据如下所示,后面是为这两种情况创建的分区数。

scala> flightData2015.show(3)
+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|   15|
|    United States|            Croatia|    1|
|    United States|            Ireland|  344|
+-----------------+-------------------+-----+

scala> println(flightData2015.sort("DEST_COUNTRY_NAME").rdd.getNumPartitions)
104

scala> println(flightData2015.groupBy("DEST_COUNTRY_NAME").count().rdd.getNumPartitions)
200

这两种情况都会导致一个洗牌阶段,该阶段将产生200个分区(默认值)。有人能解释为什么有区别吗?

bksxznpy

bksxznpy1#

根据spark文档,有两种重新划分数据的方法。一种是通过这种配置 spark.sql.shuffle.partitions 默认值为200,在运行任何连接或聚合时始终应用,如这里所示。
当我们谈论 sort() 这并不是那么简单,spark使用一个计划器来确定数据集中的数据有多倾斜。如果不是太歪斜的话,就用 sort-merge join 这将导致200个分区,正如您所期望的,它更喜欢做一个 broadcast 数据在分区之间的移动,避免了完全的无序移动。这样可以节省排序期间的时间,以减少网络通信量更多详细信息请参见此处。

zlhcx6iw

zlhcx6iw2#

这两种情况的区别在于 sort 以及 groupBy 在引擎盖下使用不同的分隔器。 groupBy -正在使用 hashPartitioning 这意味着它计算密钥的散列,然后计算 pmod 设置为200(或者任意设置为随机分区的数量),因此它将始终创建200个分区(即使其中一些分区可能为空) sort / orderBy -正在使用 rangePartitioning 这意味着它运行一个单独的作业来对数据进行采样,并在此基础上为分区创建边界(尝试使其为200)。现在,基于采样数据分布和实际行数,它可能会创建小于200的边界,这就是为什么只有104行的原因。

相关问题