超出spark内存限制问题

gzszwxb4  于 2021-05-29  发布在  Hadoop
关注(0)|答案(1)|浏览(573)

我有一个在spark上运行的作业,它是用spark rdd用scala im编写的。由于group by操作成本高昂,我得到以下错误: Container killed by YARN for exceeding memory limits. 22.4 GB of 22 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead . 我提高了记忆力,但我得到了同样的结果。我使用10台r4.xlarge机器。我试过使用r4.2xlarge,甚至r4.4xlarge,但也有相同的错误。这些数据是在5gb的gzip数据上进行im测试的(近50个解压缩的数据和近600万条记录)。
一些配置: spark.executor.memory :20480米 spark.driver.memory :21295米 spark.yarn.executor.memoryOverhead :2克 spark.executor.instances : 10
代码如下:

val groupedEntitiesRDD = datasetRDD 
  .groupBy(_.entityId) 
  .map({ case (key, valueIterator) => key -> valueIterator.toList }) 
  .persist(StorageLevel.MEMORY_AND_DISK) 

val deduplicatedRDD = groupedEntitiesRDD 
  .flatMap({ case (_, entities) => deduplication(entities) }) 

def deduplication(entities: List[StreamObject[JsValue]]): List[StreamObject[JsValue]] = { 
  entities 
    .groupBy(_.deduplicationKey) 
    .values 
    .map(duplicates => duplicates.maxBy(_.processingTimestamp.toEpochSecond)) 
    .toList 
}
oaxa6hgo

oaxa6hgo1#

根据我的经验和我在spark2.x发行说明中读到的内容,需要分配更多的堆外内存( spark.yarn.executor.memoryOverhead )而不是spark 1.x。
您只分配了2g内存和20gb内存。我相信你会得到更好的结果,如果你把它改为8g内存和14gb执行器内存。
如果您仍然遇到内存问题(如实际抛出的oom),您将需要查看数据倾斜。尤其是 groupBy 操作经常会导致严重的数据倾斜。
最后一件事,你写你用的 RDDs -我希望你是说 DataFrames 或者 DataSets ? RDDs 性能非常低 groupBy (例如,请参阅这篇博文了解原因)如果你是 RDDs 你应该使用 reduceByKey 相反。但实际上应该使用Dataframe(或数据集),如果 groupBy 确实是正确的选择。
编辑!
你在评论中问过如何转换 groupByreduceByKey . 你可以这样做:

datasetRDD
  .map{case(entityID, streamObject) => (entityID, List(streamObject))}
  .reduceByKey(_++_)
  .flatMap{case(_, entities) => deduplication(entities)

您尚未指定这些实体的数据结构,但看起来您正在寻找某个最大值,实际上是在丢弃不需要的数据。这应该是建立在 reduceByKey -操作,这样您可以过滤掉不必要的数据,同时减少。

相关问题