Task failures after upgrading to AWS Graviton-based EC2 instances for Apache Spark on EMR

Posted by zbdgwd5y on 2023-04-12 in Apache

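I have a Spark Scala job running on EMR that I'm trying to improve. It currently runs on m5.8xlarge without any issues. I recently tried moving to the Graviton-based EC2 instance type m6g.8xlarge, and although the job succeeds, I'm seeing some strange behavior: tasks fail because of timeouts, stages run in an odd order, and memory looks tight. The stages that run out of order are the ones with the failing tasks: stage 6 runs and fails, then stages 4 and 5 complete, and then stage 6 is retried and succeeds. In the current, working m5.8xlarge run, stages 4 and 5 are skipped.

I don't know why this is happening, since the only change I made was the instance type (m5 to m6g), so I'd like to know whether anyone has run into something similar or has a solution. I'm also posting the error from some of the failed tasks, though I suspect it is OOM-related.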
Here is the main error I'm seeing:

ERROR TransportClientFactory:261 - Exception while bootstrapping client after 60041 ms
java.lang.RuntimeException: java.util.concurrent.TimeoutException: Timeout waiting for task.
    at org.spark_project.guava.base.Throwables.propagate(Throwables.java:160)
    at org.apache.spark.network.client.TransportClient.sendRpcSync(TransportClient.java:263)
    at org.apache.spark.network.sasl.SaslClientBootstrap.doBootstrap(SaslClientBootstrap.java:70)
    at org.apache.spark.network.crypto.AuthClientBootstrap.doSaslAuth(AuthClientBootstrap.java:116)
    at org.apache.spark.network.crypto.AuthClientBootstrap.doBootstrap(AuthClientBootstrap.java:89)
    at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:257)
    at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:187)
    at org.apache.spark.network.shuffle.ExternalShuffleClient.lambda$fetchBlocks$0(ExternalShuffleClient.java:100)
    at org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:141)
    at org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:121)
    at org.apache.spark.network.shuffle.ExternalShuffleClient.fetchBlocks(ExternalShuffleClient.java:109)
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.sendRequest(ShuffleBlockFetcherIterator.scala:264)
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.org$apache$spark$storage$ShuffleBlockFetcherIterator$$send$1(ShuffleBlockFetcherIterator.scala:614)
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.fetchUpToMaxBytes(ShuffleBlockFetcherIterator.scala:609)
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.initialize(ShuffleBlockFetcherIterator.scala:442)
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.<init>(ShuffleBlockFetcherIterator.scala:160)
    at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:66)
    at org.apache.spark.sql.execution.ShuffledRowRDD.compute(ShuffledRowRDD.scala:173)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
    at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:105)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
    at org.apache.spark.scheduler.Task.run(Task.scala:123)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1405)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
Caused by: java.util.concurrent.TimeoutException: Timeout waiting for task.
    at org.spark_project.guava.util.concurrent.AbstractFuture$Sync.get(AbstractFuture.java:276)
    at org.spark_project.guava.util.concurrent.AbstractFuture.get(AbstractFuture.java:96)
    at org.apache.spark.network.client.TransportClient.sendRpcSync(TransportClient.java:259)
    ... 39 more
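The "Caused by" section of the trace shows how the bootstrap timed out: TransportClient.sendRpcSync blocks on a future (AbstractFuture.get), and when no reply arrives before the deadline, the TimeoutException is wrapped in a RuntimeException by Throwables.propagate. The following is a minimal sketch of that pattern, assuming a plain CompletableFuture in place of Spark's shaded Guava future; RpcSyncSketch and its sendRpcSync are hypothetical names, not Spark's implementation.

```scala
import java.util.concurrent.{CompletableFuture, ExecutionException, TimeUnit, TimeoutException}

// Hypothetical model of a synchronous RPC wait; not Spark's TransportClient code.
object RpcSyncSketch {
  // Blocks until the reply future completes or timeoutMs elapses.
  def sendRpcSync(reply: CompletableFuture[String], timeoutMs: Long): String =
    try reply.get(timeoutMs, TimeUnit.MILLISECONDS)
    catch {
      // The reply itself failed: wrap its cause in a RuntimeException.
      case e: ExecutionException => throw new RuntimeException(e.getCause)
      // No reply in time: wrap the checked TimeoutException, as Throwables.propagate does.
      case e: TimeoutException => throw new RuntimeException(e)
    }
}
```

Calling it with a future that never completes, e.g. `RpcSyncSketch.sendRpcSync(new CompletableFuture[String](), 50)`, throws a RuntimeException whose cause is a TimeoutException, the same shape as the top of the trace.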

Answer #1 (ikfrs5lh):

I don't think this is an out-of-memory problem. According to their specifications, m6g.8xlarge and m5.8xlarge both have 128 GiB of memory: https://aws.amazon.com/ec2/instance-types/m6g and https://aws.amazon.com/ec2/instance-types/m5
From the traceback, the timeout happens during authentication:
First, it fails to authenticate with Spark's auth protocol in doBootstrap (AuthClientBootstrap.java:89): https://github.com/apache/spark/blob/master/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthClientBootstrap.java#L99
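Bootstraps a {@link TransportClient} by performing authentication using Spark's auth protocol. This bootstrap falls back to using the SASL bootstrap if the server throws an error during authentication, and the configuration allows it. This is used for backwards compatibility with external shuffle services that do not support the new protocol.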
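The fallback described in that Javadoc can be modelled as "try the primary protocol, and only if it throws and fallback is enabled, run SASL instead". This is a minimal sketch under that assumption: bootstrap, sparkAuth and saslAuth are hypothetical stand-ins, not Spark's AuthClientBootstrap API, and the saslFallbackAllowed flag plays the role of Spark's spark.network.crypto.saslFallback setting.

```scala
// Hypothetical model of "try Spark's auth protocol, fall back to SASL";
// not Spark's AuthClientBootstrap.
object AuthFallbackSketch {
  type AuthStep = () => Unit

  // Returns which protocol authenticated the connection.
  def bootstrap(sparkAuth: AuthStep, saslAuth: AuthStep, saslFallbackAllowed: Boolean): String =
    try {
      sparkAuth()
      "spark-auth"
    } catch {
      // Fall back only when the configuration allows it; otherwise the
      // original exception propagates unchanged.
      case _: RuntimeException if saslFallbackAllowed =>
        saslAuth() // if SASL also fails, its exception propagates to the caller
        "sasl"
    }
}
```

Note that when the SASL step fails too, the caller sees the SASL exception, which is consistent with the trace, where the failure surfaces inside SaslClientBootstrap called from AuthClientBootstrap.doSaslAuth.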
Then it also fails to authenticate with SASL in doBootstrap (SaslClientBootstrap.java:70): https://github.com/apache/spark/blob/master/common/network-common/src/main/java/org/apache/spark/network/sasl/SaslClientBootstrap.java#L54

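  • Bootstraps a {@link TransportClient} by performing SASL authentication on the connection. The server should be set up with a {@link SaslRpcHandler} with matching keys for the given appId.
  • Performs SASL authentication by sending a token, and then proceeding with the SASL challenge-response tokens until we either successfully authenticate or throw an exception due to a mismatch.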
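The loop in the second bullet (send a token, evaluate the server's reply, repeat until complete or a mismatch throws) can be illustrated with a toy HMAC-based exchange. This is an assumption-laden sketch: ToyClient, ToyServer and authenticate are hypothetical, and Spark's real SASL bootstrap uses DIGEST-MD5 through its SparkSaslClient/SparkSaslServer classes rather than this mechanism. Each server.evaluate call stands in for one sendRpcSync round trip, which is where the trace above timed out.

```scala
import java.nio.charset.StandardCharsets.UTF_8
import java.security.{MessageDigest, SecureRandom}
import javax.crypto.Mac
import javax.crypto.spec.SecretKeySpec

// Toy challenge-response exchange shaped like the SASL loop; hypothetical,
// not Spark's DIGEST-MD5 based SparkSaslClient/SparkSaslServer.
object SaslLoopSketch {
  private def hmac(key: Array[Byte], data: Array[Byte]): Array[Byte] = {
    val mac = Mac.getInstance("HmacSHA256")
    mac.init(new SecretKeySpec(key, "HmacSHA256"))
    mac.doFinal(data)
  }

  final class ToyClient(appId: String, secret: Array[Byte]) {
    private var answered = false
    private var complete = false
    def isComplete: Boolean = complete
    // Initial token: identify the application.
    def firstToken(): Array[Byte] = appId.getBytes(UTF_8)
    // The first server reply is a challenge; the second is the final acknowledgement.
    def response(serverMsg: Array[Byte]): Array[Byte] =
      if (!answered) { answered = true; hmac(secret, serverMsg) }
      else { complete = true; Array.emptyByteArray }
  }

  final class ToyServer(secret: Array[Byte]) {
    private val nonce: Array[Byte] = {
      val b = new Array[Byte](16)
      new SecureRandom().nextBytes(b)
      b
    }
    private var challenged = false
    // Round 1: ignore the app id and send a random challenge.
    // Round 2: verify the client's HMAC over that challenge, or throw.
    def evaluate(token: Array[Byte]): Array[Byte] =
      if (!challenged) { challenged = true; nonce }
      else if (MessageDigest.isEqual(token, hmac(secret, nonce))) "ok".getBytes(UTF_8)
      else throw new SecurityException("challenge-response mismatch")
  }

  // Loop until the client reports completion or the server throws on a mismatch.
  def authenticate(client: ToyClient, server: ToyServer): Unit = {
    var payload = client.firstToken()
    while (!client.isComplete) {
      val reply = server.evaluate(payload) // one RPC round trip in the real protocol
      payload = client.response(reply)
    }
  }
}
```

With matching secrets the loop finishes after two round trips; with different secrets the server throws on the second round, which corresponds to the "exception due to a mismatch" case in the Javadoc.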
