我有一个在spark上运行的作业,它是用spark rdd用scala im编写的。由于group by操作成本高昂,我得到以下错误: Container killed by YARN for exceeding memory limits. 22.4 GB of 22 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead
. 我提高了记忆力,但我得到了同样的结果。我使用10台r4.xlarge机器。我试过使用r4.2xlarge,甚至r4.4xlarge,但也有相同的错误。这些数据是在5gb的gzip数据上进行im测试的(近50个解压缩的数据和近600万条记录)。
一些配置: spark.executor.memory
:20480米 spark.driver.memory
:21295米 spark.yarn.executor.memoryOverhead
:2克 spark.executor.instances
: 10
代码如下:
val groupedEntitiesRDD = datasetRDD
.groupBy(_.entityId)
.map({ case (key, valueIterator) => key -> valueIterator.toList })
.persist(StorageLevel.MEMORY_AND_DISK)
val deduplicatedRDD = groupedEntitiesRDD
.flatMap({ case (_, entities) => deduplication(entities) })
def deduplication(entities: List[StreamObject[JsValue]]): List[StreamObject[JsValue]] = {
entities
.groupBy(_.deduplicationKey)
.values
.map(duplicates => duplicates.maxBy(_.processingTimestamp.toEpochSecond))
.toList
}
1条答案
按热度按时间oaxa6hgo1#
根据我的经验和我在spark2.x发行说明中读到的内容,需要分配更多的堆外内存(
spark.yarn.executor.memoryOverhead
)而不是spark 1.x。您只分配了2g内存和20gb内存。我相信你会得到更好的结果,如果你把它改为8g内存和14gb执行器内存。
如果您仍然遇到内存问题(如实际抛出的oom),您将需要查看数据倾斜。尤其是
groupBy
操作经常会导致严重的数据倾斜。最后一件事,你写你用的
RDDs
-我希望你是说DataFrames
或者DataSets
?RDDs
性能非常低groupBy
(例如,请参阅这篇博文了解原因)如果你是RDDs
你应该使用reduceByKey
相反。但实际上应该使用Dataframe(或数据集),如果groupBy
确实是正确的选择。编辑!
你在评论中问过如何转换
groupBy
至reduceByKey
. 你可以这样做:您尚未指定这些实体的数据结构,但看起来您正在寻找某个最大值,实际上是在丢弃不需要的数据。这应该是建立在
reduceByKey
-操作,这样您可以过滤掉不必要的数据,同时减少。