sortwithinpartitions如何排序?

nqwrtyyt  于 2021-07-12  发布在  Spark
关注(0)|答案(1)|浏览(1191)

在对df应用sortwithinpartitions并将输出写入表之后,我得到了一个不确定如何解释的结果。

  1. df
  2. .select($"type", $"id", $"time")
  3. .sortWithinPartitions($"type", $"id", $"time")

结果文件看起来有点像

  1. 1 a 5
  2. 2 b 1
  3. 1 a 6
  4. 2 b 2
  5. 1 a 7
  6. 2 b 3
  7. 1 a 8
  8. 2 b 4

它实际上不是随机的,但也不是像我期望的那样排序的。也就是说,先按类型,然后按id,然后按时间。如果我在排序之前尝试使用重新分区,那么我会得到想要的结果。但由于某些原因,这些文件的重量是原来的5倍(100gb比20gb)。
我正在给一个压缩设置为snappy的hive orc表写信。
有人知道为什么它是这样排序的吗?为什么重新分区的顺序正确,但是大小更大?
使用spark 2.2。

rxztt3cl

rxztt3cl1#

sortwithinpartition状态的文档
返回一个新的数据集,每个分区按给定的表达式排序
考虑这个函数最简单的方法是设想第四列(分区id)作为主要排序标准。函数spark\u partition\u id()打印分区。
例如,如果您只有一个大分区(作为spark用户,您永远不会这么做!), sortWithinPartition 正常排序:

  1. df.repartition(1)
  2. .sortWithinPartitions("type","id","time")
  3. .withColumn("partition", spark_partition_id())
  4. .show();

印刷品

  1. +----+---+----+---------+
  2. |type| id|time|partition|
  3. +----+---+----+---------+
  4. | 1| a| 5| 0|
  5. | 1| a| 6| 0|
  6. | 1| a| 7| 0|
  7. | 1| a| 8| 0|
  8. | 2| b| 1| 0|
  9. | 2| b| 2| 0|
  10. | 2| b| 3| 0|
  11. | 2| b| 4| 0|
  12. +----+---+----+---------+

如果有更多分区,则结果仅在每个分区内排序:

  1. df.repartition(4)
  2. .sortWithinPartitions("type","id","time")
  3. .withColumn("partition", spark_partition_id())
  4. .show();

印刷品

  1. +----+---+----+---------+
  2. |type| id|time|partition|
  3. +----+---+----+---------+
  4. | 2| b| 1| 0|
  5. | 2| b| 3| 0|
  6. | 1| a| 5| 1|
  7. | 1| a| 6| 1|
  8. | 1| a| 8| 2|
  9. | 2| b| 2| 2|
  10. | 1| a| 7| 3|
  11. | 2| b| 4| 3|
  12. +----+---+----+---------+

为什么要用 sortWithPartition 而不是排序? sortWithPartition 不会触发洗牌,因为数据只在执行器中移动。 sort 但是会触发一次洗牌。因此 sortWithPartition 执行速度更快。如果数据被一个有意义的列分区,那么在每个分区内排序就足够了。

展开查看全部

相关问题