如何划分rdd

bfrts1fy  于 2021-06-03  发布在  Hadoop
关注(0)|答案(3)|浏览(441)

我有一个文本文件,由大量由空格分隔的随机浮动值组成。我正在将这个文件加载到scala中的rdd中。这个rdd是如何分区的?
另外,是否有任何方法来生成自定义分区,以便所有分区都具有相同数量的元素以及每个分区的索引?

val dRDD = sc.textFile("hdfs://master:54310/Data/input*")
keyval=dRDD.map(x =>process(x.trim().split(' ').map(_.toDouble),query_norm,m,r))

这里我从hdfs加载多个文本文件,process是我调用的一个函数。我能用mappartionswithindex解决这个问题吗?我怎样才能在process函数中访问这个索引?Map将分区无序排列。

3bygqnnd

3bygqnnd1#

可以使用coalesce函数生成自定义分区:

coalesce(numPartitions: Int, shuffle: Boolean = false): RDD[T]
vcirk6k6

vcirk6k62#

加载的rdd由默认的partitioner:hash代码进行分区。要指定自定义分区器,请使用您自己的分区器提供的can check rdd.partitionby()。
我不认为在这里使用coalesce()是可以的,就像api文档一样,coalesce()只能在我们减少分区数量时使用,甚至我们不能用coalesce()指定自定义分区器。

e7arh2l6

e7arh2l63#

rdd是如何分区的?
默认情况下,为每个hdfs分区创建一个分区,默认情况下为64mb。在这里阅读更多。
如何在分区间平衡数据?
首先,看一下重新划分数据的三种方法:
1) 将第二个参数(rdd所需的最小分区数)传递到textfile()中,但要小心:

In [14]: lines = sc.textFile("data")

In [15]: lines.getNumPartitions()
Out[15]: 1000

In [16]: lines = sc.textFile("data", 500)

In [17]: lines.getNumPartitions()
Out[17]: 1434

In [18]: lines = sc.textFile("data", 5000)

In [19]: lines.getNumPartitions()
Out[19]: 5926

如你所见, [16] 没有达到预期的效果,因为rdd的分区数已经大于我们请求的最小分区数。
2) 使用repartition(),如下所示:

In [22]: lines = lines.repartition(10)

In [23]: lines.getNumPartitions()
Out[23]: 10

警告:这将调用shuffle,当您想增加rdd的分区数时应该使用它。
从文档中:
shuffle是spark重新分发数据的机制,以便在分区之间对数据进行不同的分组。这通常涉及到跨执行器和机器复制数据,使洗牌成为一个复杂而昂贵的操作。
3) 使用coalesce(),如下所示:

In [25]: lines = lines.coalesce(2)

In [26]: lines.getNumPartitions()
Out[26]: 2

在这里,spark知道您将缩小rdd并从中获益。阅读有关repartition()和coalesce()的更多信息。
但所有这些都能保证您的数据在分区间得到完美的平衡吗?不是真的,正如我在如何跨分区平衡数据方面所经历的那样?

相关问题