我试着理解sparksqlshuffle分区,默认设置为200。数据如下所示,后面是为这两种情况创建的分区数。
scala> flightData2015.show(3)
+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
| United States| Romania| 15|
| United States| Croatia| 1|
| United States| Ireland| 344|
+-----------------+-------------------+-----+
scala> println(flightData2015.sort("DEST_COUNTRY_NAME").rdd.getNumPartitions)
104
scala> println(flightData2015.groupBy("DEST_COUNTRY_NAME").count().rdd.getNumPartitions)
200
这两种情况都会导致一个洗牌阶段,该阶段将产生200个分区(默认值)。有人能解释为什么有区别吗?
2条答案
按热度按时间bksxznpy1#
根据spark文档,有两种重新划分数据的方法。一种是通过这种配置
spark.sql.shuffle.partitions
默认值为200,在运行任何连接或聚合时始终应用,如这里所示。当我们谈论
sort()
这并不是那么简单,spark使用一个计划器来确定数据集中的数据有多倾斜。如果不是太歪斜的话,就用sort-merge join
这将导致200个分区,正如您所期望的,它更喜欢做一个broadcast
数据在分区之间的移动,避免了完全的无序移动。这样可以节省排序期间的时间,以减少网络通信量更多详细信息请参见此处。zlhcx6iw2#
这两种情况的区别在于
sort
以及groupBy
在引擎盖下使用不同的分隔器。groupBy
-正在使用hashPartitioning
这意味着它计算密钥的散列,然后计算pmod
设置为200(或者任意设置为随机分区的数量),因此它将始终创建200个分区(即使其中一些分区可能为空)sort
/orderBy
-正在使用rangePartitioning
这意味着它运行一个单独的作业来对数据进行采样,并在此基础上为分区创建边界(尝试使其为200)。现在,基于采样数据分布和实际行数,它可能会创建小于200的边界,这就是为什么只有104行的原因。