我有一个文本文件,由大量由空格分隔的随机浮动值组成。我正在将这个文件加载到scala中的rdd中。这个rdd是如何分区的?
另外,是否有任何方法来生成自定义分区,以便所有分区都具有相同数量的元素以及每个分区的索引?
val dRDD = sc.textFile("hdfs://master:54310/Data/input*")
keyval=dRDD.map(x =>process(x.trim().split(' ').map(_.toDouble),query_norm,m,r))
这里我从hdfs加载多个文本文件,process是我调用的一个函数。我能用mappartionswithindex解决这个问题吗?我怎样才能在process函数中访问这个索引?Map将分区无序排列。
3条答案
按热度按时间3bygqnnd1#
可以使用coalesce函数生成自定义分区:
vcirk6k62#
加载的rdd由默认的partitioner:hash代码进行分区。要指定自定义分区器,请使用您自己的分区器提供的can check rdd.partitionby()。
我不认为在这里使用coalesce()是可以的,就像api文档一样,coalesce()只能在我们减少分区数量时使用,甚至我们不能用coalesce()指定自定义分区器。
e7arh2l63#
rdd是如何分区的?
默认情况下,为每个hdfs分区创建一个分区,默认情况下为64mb。在这里阅读更多。
如何在分区间平衡数据?
首先,看一下重新划分数据的三种方法:
1) 将第二个参数(rdd所需的最小分区数)传递到textfile()中,但要小心:
如你所见,
[16]
没有达到预期的效果,因为rdd的分区数已经大于我们请求的最小分区数。2) 使用repartition(),如下所示:
警告:这将调用shuffle,当您想增加rdd的分区数时应该使用它。
从文档中:
shuffle是spark重新分发数据的机制,以便在分区之间对数据进行不同的分组。这通常涉及到跨执行器和机器复制数据,使洗牌成为一个复杂而昂贵的操作。
3) 使用coalesce(),如下所示:
在这里,spark知道您将缩小rdd并从中获益。阅读有关repartition()和coalesce()的更多信息。
但所有这些都能保证您的数据在分区间得到完美的平衡吗?不是真的,正如我在如何跨分区平衡数据方面所经历的那样?