Apache Spark 运行大型数据集导致超时

iyfjxgzm  于 2022-11-16  发布在  Apache
关注(0)|答案(2)|浏览(198)

我正在构建一个相对简单的Spark应用程序,其逻辑大致如下:

val file1 = sc.textFile("s3://file1/*")
val file2 = sc.textFile("s3://file2/*")
// map over files
val file1Map = file1.map(word => (word, "val1"))
val file2Map = file2.map(differentword => (differentword, "val2"))
val unionRdd = file1Map.union(file2Map)
val groupedUnion = unionRdd.groupByKey()
val output = groupedUnion.map(tuple => {
    // do something that requires all the values, return new object
    if(oneThingIsTrue) tuple._1 else "null"
}).filter(line => line != "null")
output.saveAsTextFile("s3://newfile/")

这个问题与我在运行较大的数据集时不工作有关。当数据集大约为700GB时,我可以运行它而不出错。当我将它加倍到1.6TB时,作业将在超时前完成一半。下面是错误日志:

INFO MapOutputTrackerWorker: Don't have map outputs for shuffle 0, fetching them
INFO MapOutputTrackerWorker: Doing the fetch; tracker endpoint = NettyRpcEndpointRef(spark://MapOutputTracker@172.31.4.36:39743)
ERROR MapOutputTrackerWorker: Error communicating with MapOutputTracker
    org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [800 seconds]. This timeout is controlled by spark.network.timeout

我试过将网络超时时间增加到800秒和1600秒,但这只是延长了错误时间。我在10r4.2xl上运行代码,每个内核有8个内核,内存为62GB。我将EBS设置为3TB存储空间。我在Amazon EMR中通过Zeppelin运行此代码。
有人能帮我调试一下吗?集群的CPU使用率一直接近90%,直到它达到一半,然后完全降回0。另一个有趣的事情是,它看起来像是在第二阶段洗牌时失败了。正如您从跟踪中看到的,它正在执行获取操作,但从未获得它。
这是一张来自Ganglia的照片。

xuo3flqw

xuo3flqw1#

我仍然不确定是什么导致了这个问题,但是我可以通过合并unionRdd并对结果进行分组来解决这个问题。

...
// union rdd is 30k partitions, coalesce into 8k
val unionRdd = file1Map.union(file2Map)
val col = unionRdd.coalesce(8000) 
val groupedUnion = col.groupByKey()
...

这可能不是有效的,但它的工作。

izj3ouym

izj3ouym2#

请将groupbykey替换为reduceByKey、聚合ByKey或组合ByKey。
groupByKey必须将所有类似的键放到同一个工作线程上,这可能会导致内存不足错误。不确定为什么使用此函数时没有警告

相关问题