编辑:答案很有帮助,但我在:spark中的memoryoverhead问题中描述了我的解决方案。
我有一个202092分区的rdd,它读取其他人创建的数据集。我可以手动看到分区之间的数据不平衡,例如,其中一些分区有0个图像,其他分区有4k,而平均值为432。在处理数据时,出现以下错误:
Container killed by YARN for exceeding memory limits. 16.9 GB of 16 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
而记忆头已经被提升了。我觉得有些尖刺正在发生,使Yarn杀死我的容器,因为尖刺溢出指定的边界。
那么我应该怎么做才能确保我的数据在分区之间(大致)平衡呢?
我的想法是repartition()可以工作,它调用洗牌:
dataset = dataset.repartition(202092)
但我还是犯了同样的错误,尽管编程指南上有说明:
重新分区(numpartitions)
随机地重新排列rdd中的数据,创建更多或更少的分区,并在它们之间保持平衡。这总是在网络上洗牌所有数据。
看看我的玩具例子:
data = sc.parallelize([0,1,2], 3).mapPartitions(lambda x: range((x.next() + 1) * 1000))
d = data.glom().collect()
len(d[0]) # 1000
len(d[1]) # 2000
len(d[2]) # 3000
repartitioned_data = data.repartition(3)
re_d = repartitioned_data.glom().collect()
len(re_d[0]) # 1854
len(re_d[1]) # 1754
len(re_d[2]) # 2392
repartitioned_data = data.repartition(6)
re_d = repartitioned_data.glom().collect()
len(re_d[0]) # 422
len(re_d[1]) # 845
len(re_d[2]) # 1643
len(re_d[3]) # 1332
len(re_d[4]) # 1547
len(re_d[5]) # 211
repartitioned_data = data.repartition(12)
re_d = repartitioned_data.glom().collect()
len(re_d[0]) # 132
len(re_d[1]) # 265
len(re_d[2]) # 530
len(re_d[3]) # 1060
len(re_d[4]) # 1025
len(re_d[5]) # 145
len(re_d[6]) # 290
len(re_d[7]) # 580
len(re_d[8]) # 1113
len(re_d[9]) # 272
len(re_d[10]) # 522
len(re_d[11]) # 66
1条答案
按热度按时间66bbxpm51#
内存开销限制超过这个问题,我认为是由于在获取过程中使用了directmemory缓冲区。我想它是在2.0.0中修复的(我们也遇到了同样的问题,但是当我们发现升级到2.0.0解决了这个问题时,我们就不再深入挖掘了。不幸的是,我没有spark发行号来支持我。)
不平整的隔墙
repartition
令人惊讶。对比https://github.com/apache/spark/blob/v2.0.0/core/src/main/scala/org/apache/spark/rdd/rdd.scala#l443. spark甚至会在中生成随机键repartition
,因此不能使用可能有偏差的散列。我尝试了你的例子,得到了与spark 1.6.2和spark 2.0.0完全相同的结果。但不是来自斯卡拉
spark-shell
:好漂亮的隔板!
(抱歉,这不是完整答案。我只是想分享我目前的发现。)