我有一个Dataframe df
在spark中,它看起来像这样:
val df = (1 to 10).toList.toDF()
当我检查分区数时,我看到我有10个分区:
df.rdd.getNumPartitions
res0: Int = 10
现在我生成一个id列:
val dfWithID = df.withColumn("id", monotonically_increasing_id())
dfWithID.show()
+-----+---+
|value| id|
+-----+---+
| 1| 0|
| 2| 1|
| 3| 2|
| 4| 3|
| 5| 4|
| 6| 5|
| 7| 6|
| 8| 7|
| 9| 8|
| 10| 9|
+-----+---+
所以所有生成的id都是连续的,尽管我有10个分区。然后我重新划分Dataframe:
val dfp = df.repartition(10)
val dfpWithID = dfp.withColumn("id", monotonically_increasing_id())
dfpWithID.show()
+-----+-----------+
|value| id|
+-----+-----------+
| 10| 0|
| 1| 8589934592|
| 7|17179869184|
| 5|25769803776|
| 4|42949672960|
| 9|42949672961|
| 2|51539607552|
| 8|60129542144|
| 6|68719476736|
| 3|77309411328|
+-----+-----------+
现在我得到的ID不再是连续的了。基于spark文档,它应该将分区id放在上面的31位,在这两种情况下我都有10个分区。为什么它只在调用 repartition()
?
1条答案
按热度按时间bqf10yzr1#
我假设这是因为初始Dataframe中的所有数据都驻留在一个分区中,其他9个分区为空。
为此,请使用这里给出的答案:apachespark:get number of records per partition