org.apache.spark.shuffle.fetchfailedexception:来自server1/..x的连接:7337 closed

s2j5cfk0  于 2021-05-11  发布在  Spark
关注(0)|答案(1)|浏览(845)

突出
我升级了spark,并尝试在yarn上运行已经存在的spark流应用程序(通过stream接受文件名,然后从hdfs读取文件名,使用rdd和dataframes操作进行转换,最后将分析的数据集持久化到hbase),但该应用程序失败,无法解决问题。
环境详情如下
使用版本
平台
操作系统:rhel 6.6,128gb ram,42tb hdd,32核
java:1.8.0\u 25
斯卡拉:2.11
hadoop:2.7.7版本
spark:2.4.6和Hadoop2.7二进制文件
hbase:1.4.12版
升级后不工作
spark:3.0.0和Hadoop2.7二进制文件
按照spark 3.0.0的要求,使用scala 2.12编译相同的代码,spark 3.0.0根据版本的变化有一些小的变化,没有任何逻辑变化。
所需Yarn配置

<property>
  <name>yarn.nodemanager.aux-services</name>
  <value>mapreduce_shuffle,spark_shuffle</value>
</property>
<property>
  <name>yarn.nodemanager.aux-services.mapreduce.shuffle.class</name>
  <value>org.apache.hadoop.mapred.ShuffleHandler</value>
</property>
<property>
  <name>yarn.nodemanager.aux-services.spark_shuffle.class</name>
  <value>org.apache.spark.network.yarn.YarnShuffleService</value>
</property>
<property>
  <name>spark.shuffle.service.port</name>
  <value>7337</value>
</property>

启动作业时传递了spark配置

spark.app.name=Ingestion
spark.eventLog.enabled=true
spark.yarn.historyServer.address=${hadoopconf-yarn.resourcemanager.hostname}:18088
spark.eventLog.dir=hdfs:///user/hduser/applicationHistory
spark.submit.deployMode=cluster
spark.driver.memory=1GB
spark.driver.cores=1
spark.executor.memory=5GB
spark.executor.cores=5
spark.dynamicAllocation.enabled=true
spark.shuffle.service.enabled=true
spark.dynamicAllocation.minExecutors=1
spark.sql.shuffle.partitions=2001
spark.logging.level=INFO
spark.serializer=org.apache.spark.serializer.KryoSerializer
spark.yarn.archive=hdfs:///spark-3.0.0-bin-hadoop2.7-jars.zip
spark.ui.killEnabled=false
spark.driver.memoryOverhead=512
spark.executor.memoryOverhead=1024
spark.yarn.maxAppAttempts=4
spark.yarn.am.attemptFailuresValidityInterval=1h

问题
同样的代码也适用于spark版本2.4.4、2.4.5、2.4.6和hadoop、yarn、spark配置集。当我升级到spark 3.0.0时,代码开始失败,出现以下异常。已经尝试过多次调优,比如增加资源、减少分区等等,但是没有成功。我们已经通过telnet检查了7337端口,它是开放的,并且正在监听。经过一个星期的调试,我们无法找到任何解决方案,而且似乎没有理由关闭shuffle端口连接。
处理几乎不到50MB的数据集。同样的代码能够处理超过300MB的数据,其配置与spark 2.4.x完全相同。这太奇怪了!!
例外

org.apache.spark.shuffle.FetchFailedException: Connection from server1/xxx.xxx.x.xxx:7337 closed
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:748)
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:663)
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:70)
    at org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:29)
    at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:484)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:490)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
    at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
    at org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.processInputs(ObjectAggregationIterator.scala:155)
    at org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.<init>(ObjectAggregationIterator.scala:78)
    at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec.$anonfun$doExecute$2(ObjectHashAggregateExec.scala:129)
    at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec.$anonfun$doExecute$2$adapted(ObjectHashAggregateExec.scala:107)
    at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec$$Lambda$597/1323895653.apply(Unknown Source)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2(RDD.scala:859)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2$adapted(RDD.scala:859)
    at org.apache.spark.rdd.RDD$$Lambda$584/1207730390.apply(Unknown Source)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
    at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
    at org.apache.spark.sql.execution.SQLExecutionRDD.compute(SQLExecutionRDD.scala:55)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:127)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:444)
    at org.apache.spark.executor.Executor$TaskRunner$$Lambda$421/1364680867.apply(Unknown Source)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:447)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Connection from server1/xxx.xxx.x.xxx:7337 closed
    at org.apache.spark.network.client.TransportResponseHandler.channelInactive(TransportResponseHandler.java:146)
    at org.apache.spark.network.server.TransportChannelHandler.channelInactive(TransportChannelHandler.java:117)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
    at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81)
    at io.netty.handler.timeout.IdleStateHandler.channelInactive(IdleStateHandler.java:277)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
    at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81)
    at org.apache.spark.network.util.TransportFrameDecoder.channelInactive(TransportFrameDecoder.java:225)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1405)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
    at io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:901)
    at io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:818)
    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:497)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    ... 1 more

还有其他人面临这个问题吗?如果是,请告诉我你是如何解决的。我不知道还有什么要查。任何帮助都将不胜感激
谢谢您

aydmsdu9

aydmsdu91#

Spark3.x不能与旧的shuffle服务一起使用。
如果要保留旧的shuffle服务,请尝试以下配置更改。

spark.shuffle.useOldFetchProtocol=true

参考https://issues.apache.org/jira/browse/spark-29435

相关问题