被Spark扭曲

sqyvllje  于 2021-06-27  发布在  Hive
关注(0)|答案(1)|浏览(221)

我有一个数据集,我想用一个特定的键(clientid)对它进行分区,但是有些客户机产生的数据远远多于其他客户机。hive中有一个特性叫做“listbucketing”,由“skewed by”调用,专门用来处理这种情况。
但是,我找不到任何迹象表明spark支持这个特性,或者如何(如果它支持的话)使用它。
是否有与之等效的Spark特性?或者,spark是否有其他一些可以复制这种行为的特性?
(作为我实际用例的一个额外要求,您的建议方法是否适用于amazon athena?)

t40tm48m

t40tm48m1#

据我所知,spark中没有这种现成的工具。在数据倾斜的情况下,最常见的是添加一个人工列来进一步压缩数据。
假设您希望按列“y”进行分区,但是数据非常倾斜,就像这个玩具示例(1个分区有5行,其他分区只有一行):

val df = spark.range(8).withColumn("y", when('id < 5, 0).otherwise('id))
df.show()
+---+---+
| id|  y|
+---+---+
|  0|  0|
|  1|  0|
|  2|  0|
|  3|  0|
|  4|  0|
|  5|  5|
|  6|  6|
|  7|  7|
+-------+

现在让我们添加一个人造的随机列并编写Dataframe。

val maxNbOfBuckets = 3
val part_df = df.withColumn("r", floor(rand() * nbOfBuckets))
part_df.show
+---+---+---+
| id|  y|  r|
+---+---+---+
|  0|  0|  2|
|  1|  0|  2|
|  2|  0|  0|
|  3|  0|  0|
|  4|  0|  1|
|  5|  5|  2|
|  6|  6|  2|
|  7|  7|  1|
+---+---+---+

// and writing. We divided the partition with 5 elements into 3 partitions.
part_df.write.partitionBy("y", "r").csv("...")

相关问题