Spark fails on big shuffle jobs with java.io.IOException: Filesystem closed

v1l68za4 · posted 2021-06-02 · in Hadoop

I regularly have Spark fail on big jobs with a fairly meaningless exception. The worker logs look normal, with no errors, but their status is "KILLED". This is extremely common for large shuffles, e.g. after `.distinct`.
My question is: how do I diagnose what is going wrong, and ideally, how do I fix it?
Given that a lot of these operations are monoidal, I have been working around the problem by splitting the data into, say, 10 chunks, running the app on each chunk, then running the app once more on all of the resulting outputs. In other words: meta map-reduce.

14/06/04 12:56:09 ERROR client.AppClient$ClientActor: Master removed our application: FAILED; stopping client
14/06/04 12:56:09 WARN cluster.SparkDeploySchedulerBackend: Disconnected from Spark cluster! Waiting for reconnection...
14/06/04 12:56:09 WARN scheduler.TaskSetManager: Loss was due to java.io.IOException
java.io.IOException: Filesystem closed
    at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:703)
    at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:779)
    at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:840)
    at java.io.DataInputStream.read(DataInputStream.java:149)
    at org.apache.hadoop.io.compress.DecompressorStream.getCompressedData(DecompressorStream.java:159)
    at org.apache.hadoop.io.compress.DecompressorStream.decompress(DecompressorStream.java:143)
    at org.apache.hadoop.io.compress.DecompressorStream.read(DecompressorStream.java:85)
    at java.io.InputStream.read(InputStream.java:101)
    at org.apache.hadoop.util.LineReader.fillBuffer(LineReader.java:180)
    at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:216)
    at org.apache.hadoop.util.LineReader.readLine(LineReader.java:174)
    at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:209)
    at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:47)
    at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:164)
    at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:149)
    at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:27)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ListBuffer.$plus$plus$eq(ListBuffer.scala:176)
    at scala.collection.mutable.ListBuffer.$plus$plus$eq(ListBuffer.scala:45)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
    at scala.collection.AbstractIterator.to(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toList(TraversableOnce.scala:257)
    at scala.collection.AbstractIterator.toList(Iterator.scala:1157)
    at $line5.$read$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(<console>:13)
    at $line5.$read$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(<console>:13)
    at org.apache.spark.rdd.RDD$$anonfun$1.apply(RDD.scala:450)
    at org.apache.spark.rdd.RDD$$anonfun$1.apply(RDD.scala:450)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:34)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
    at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:34)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:161)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:102)
    at org.apache.spark.scheduler.Task.run(Task.scala:53)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:213)
    at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42)
    at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:41)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
    at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:41)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:744)
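The "meta map-reduce" workaround described in the question can be sketched as follows. This is a plain-Python illustration of the idea, not real Spark code; `run_job` and `combine` are hypothetical stand-ins for the actual application and the final pass over its outputs:

```python
def run_job(records):
    # Stand-in for the real Spark application run on one chunk
    # (e.g. a .distinct() pass over that chunk alone).
    return sorted(set(records))

def combine(partial_results):
    # Final pass over all per-chunk outputs; works because the
    # operation is monoidal (results can be merged associatively).
    merged = set()
    for part in partial_results:
        merged.update(part)
    return sorted(merged)

def chunked(data, n_chunks):
    # Split the input into n_chunks roughly equal pieces.
    size = (len(data) + n_chunks - 1) // n_chunks
    return [data[i:i + size] for i in range(0, len(data), size)]

data = [3, 1, 2, 3, 2, 1, 4, 4, 5, 6] * 3
partials = [run_job(chunk) for chunk in chunked(data, 10)]
print(combine(partials))  # → [1, 2, 3, 4, 5, 6]
```

The two-level structure keeps each individual shuffle small, at the cost of an extra pass over the intermediate outputs.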
kqlmhetl #1

As of September 1, 2014, this is an "open improvement" in Spark. See https://issues.apache.org/jira/browse/spark-3052. As syrza pointed out in that link, when an executor fails, the shutdown hooks are likely executed in the wrong order, which produces this message. I suppose you will need more investigation to find the root cause of the problem (i.e. why your executor failed in the first place). If it is a large shuffle, it might be an out-of-memory error that causes the executor to fail, which then causes the Hadoop filesystem to be closed in its shutdown hook. So the RecordReaders of the tasks still running on that executor throw the "java.io.IOException: Filesystem closed" exception. I guess it will be fixed in a subsequent release, and then you will get a more helpful error message :)

6gpjuf90 #2

Something is calling DFSClient.close() or DFSClient.abort(), closing the client. The next file operation then results in the exception above.
I would try to figure out what calls close()/abort(). You could use a breakpoint in your debugger, or modify the Hadoop source code to throw an exception in these methods, which would give you a stack trace.

nkcskrwz #3

The "Filesystem closed" exception can be resolved if the Spark job is running on a cluster. You can set properties like spark.executor.cores, spark.driver.cores and spark.akka.threads to their maximum values with respect to resource availability. I had the same problem when my dataset was pretty large, with JSON data of about 20 million records. I fixed it with the above properties and it worked like a charm. In my case I set those properties to 25, 25 and 20 respectively. Hope it helps!!
Reference link:
http://spark.apache.org/docs/latest/configuration.html
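For reference, the settings this answer mentions would go into spark-defaults.conf (or be passed via --conf to spark-submit). The values below are the ones the answer reports; note that spark.akka.threads only applies to the Akka-based Spark 1.x releases this thread is about, and was removed in later versions:

```
spark.executor.cores  25
spark.driver.cores    25
spark.akka.threads    20
```

As the answer says, the right values depend on your cluster's available resources, so treat these numbers as an example rather than a recommendation.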
