我遇到了一个spark作业的困难,大约有一半的时间,它会选择在一个节点上处理所有数据,然后耗尽内存并死掉。
问题:我如何确保这种情况不会发生?
该系统在yarn上使用spark1.6.0,来自hadoop2.6数据存储,所有代码都用java编写。我在一个有十几个ish节点的集群中动态分配资源(amazon)。
dag相对简单:
RDD --> mapToPair \
coGroup --> flatMapToPair --> reduceByKey --> save
RDD --> mapToPair /
当它正确运行时,所有任务都会在集群中得到很好的分配,整个作业大约需要20分钟。我们称之为“良好行为”。然而,有时flatmaptopair阶段在单个执行器中有效地运行。我们称之为“不良行为”
当我为一个“坏行为”作业加载spark ui并深入到flatmaptopair阶段时,我发现实际上,每个节点上都有大约3-4个执行器(与“好行为”情况相同)。但是,除了一个执行器外,其余的执行器都会在一秒钟内完成,剩下的执行器会运行10分钟,然后因为超出内存限制而被Yarn杀死。
我已经尝试过的事情:
网络。搜索“spark run on one node”之类的内容和变体几乎普遍会导致spark shell中以本地模式运行的用户或类似的配置问题。考虑到我至少在某些时候有良好的行为,这种配置问题似乎不太可能发生(而且我已经检查过,我不是偶然地处于本地模式,我有~100个分区,…)。
在同一集群上运行的其他spark作业表现良好。这似乎排除了一些集群范围内的错误配置(见鬼,即使这个作业有时运行得很好)。
集群利用率似乎并不影响我得到的是好的行为还是坏的行为。我看到了这两种行为,一种是当集群被大量利用时,另一种是当集群没有任何其他运行时。
这看起来不像是一个棘手的问题,因为执行者都在集群中得到了很好的分布。当然,我可能错了,但问题似乎真的是执行者之间的工作分配。
数据集中有多个键。我在cogroup和flatmaptopair之间插入了一个countbykey并打印了结果(对于20个左右数量最多的键)。数据相当均匀地分布在这些顶键之间。
我尝试过的回应评论的东西
在flatmaptopair调用强制500个分区之前重新分区rdd。这只会将不良行为转移到重新分区阶段。
增加默认并行度。我确实用这种方式得到了更多的分区,但这种不良行为仍然停留在flatmaptopair阶段。
去掉数据(实际上我在发布之前做了很多,但是没有把它包含在原来的列表中)。我们只有几个10的gb,我已经加载了我需要的最低限度的数据。
这是一个“有趣的”小heisenbug,在添加调试日志记录后,坏行为消失了,在删除日志记录后又消失了,只是在一段时间后再次出现。我没有主意了,所以如果有人有什么建议的诊断步骤,我洗耳恭听。
1条答案
按热度按时间ahy6op9u1#
我遇到了一些非常类似的问题,虽然我对这个解决方案并不完全满意,因为我不能很好地解释为什么它会起作用,但它似乎确实起作用。在我的例子中,它是在一次洗牌之后,洗牌数据的大小非常小。问题是,随后的计算大大增加了数据的大小,以至于在1或2个执行器上进行这些计算成为瓶颈。我最好的猜测是,它与一个启发式算法有关,涉及数据源的首选位置和目标分区大小,可能与不知道后期发生的扩展相结合。
通过添加一个
coalesce(totalCores)
,在哪里totalCores
定义为spark.executor.instances
十spark.executor.cores
. 它似乎也适用于更大倍数的totalCores
,但就我而言,我不需要更多的平行性。注意,可能需要使用repartition
而不是coalesce
取决于用例。另外,这是在spark 2.2.1上,供参考。