运行spark作业时出现奇怪的错误

htrmnn0y  于 2021-06-02  发布在  Hadoop
关注(0)|答案(1)|浏览(477)

我正在运行一个有80台机器的Spark束。每台机器都是一个具有8核和50gb内存的虚拟机(spark似乎有41个可用内存)。
我在几个输入文件夹上运行,我估计输入的大小约为250gbgz压缩。
我在驱动程序日志中遇到错误,我不知道该怎么做。示例(按它们在日志中出现的顺序排列):

240884 [Result resolver thread-0] WARN org.apache.spark.scheduler.TaskSetManager  - Lost task 445.0 in stage 1.0 (TID 445, hadoop-w-59.c.taboola-qa-01.internal): java.net.SocketTimeoutException: Read timed out
        java.net.SocketInputStream.socketRead0(Native Method)
        java.net.SocketInputStream.read(SocketInputStream.java:152)
        java.net.SocketInputStream.read(SocketInputStream.java:122)
        java.io.BufferedInputStream.fill(BufferedInputStream.java:235)
        java.io.BufferedInputStream.read1(BufferedInputStream.java:275)
        java.io.BufferedInputStream.read(BufferedInputStream.java:334)
        sun.net.www.http.HttpClient.parseHTTPHeader(HttpClient.java:687)
        sun.net.www.http.HttpClient.parseHTTP(HttpClient.java:633)
        sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1323)
        org.apache.spark.util.Utils$.fetchFile(Utils.scala:376)
        org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$6.apply(Executor.scala:325)
        org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$6.apply(Executor.scala:323)
        scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
        scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
        scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
        scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
        scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
        scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
        scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
        org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$updateDependencies(Executor.scala:323)
        org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:158)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        java.lang.Thread.run(Thread.java:745)

    271722 [Result resolver thread-3] WARN org.apache.spark.scheduler.TaskSetManager  - Lost task 247.0 in stage 2.0 (TID 883, hadoop-w-79.c.taboola-qa-01.internal): java.lang.NullPointerException: 
            org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:153)
            org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:58)
            org.apache.spark.shuffle.hash.HashShuffleWriter.write(HashShuffleWriter.scala:55)
            org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
            org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
            org.apache.spark.scheduler.Task.run(Task.scala:54)
            org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
            java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
            java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
            java.lang.Thread.run(Thread.java:745)

309052 [Result resolver thread-1] WARN org.apache.spark.scheduler.TaskSetManager  - Lost task 272.0 in stage 2.0 (TID 908, hadoop-w-58.c.taboola-qa-01.internal): java.io.IOException: unexpected exception type
        java.io.ObjectStreamClass.throwMiscException(ObjectStreamClass.java:1538)
        java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1025)
        java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
        java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
        org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
        org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87)
        org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:159)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        java.lang.Thread.run(Thread.java:745)

820940 [connection-manager-thread] INFO org.apache.spark.network.ConnectionManager  - key already cancelled ? sun.nio.ch.SelectionKeyImpl@1c827563
java.nio.channels.CancelledKeyException
    at org.apache.spark.network.ConnectionManager.run(ConnectionManager.scala:386)
    at org.apache.spark.network.ConnectionManager$$anon$4.run(ConnectionManager.scala:139)

由于我的作业类(phase0)不是任何堆栈跟踪的一部分,我不确定从这些错误中可以学到什么。有什么建议吗?
编辑:特别是,即使在处理几个gb的文件夹时,以下异常也会发生在我身上:

271722 [Result resolver thread-3] WARN org.apache.spark.scheduler.TaskSetManager  - Lost task 247.0 in stage 2.0 (TID 883, hadoop-w-79.c.taboola-qa-01.internal): java.lang.NullPointerException: 
org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:153)
            org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:58)
            org.apache.spark.shuffle.hash.HashShuffleWriter.write(HashShuffleWriter.scala:55)
            org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
            org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
            org.apache.spark.scheduler.Task.run(Task.scala:54)
            org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
            java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
            java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
            java.lang.Thread.run(Thread.java:745)
ylamdve6

ylamdve61#

解决方案并不特定于这里提到的例外情况,但最终我能够使用以下准则克服spark中的所有问题:
所有机器都应该在ulimit和进程内存方面进行如下调整:
将以下内容添加到/etc/security/limits.conf:

hadoop soft nofile 900000
root soft nofile 900000
hadoop hard nofile 990000
root hard nofile 990000
hadoop hard memlock unlimited
root hard memlock unlimited
hadoop soft memlock unlimited
root soft memlock unlimited

至/etc/pam.d/common-session和至/etc/pam.d/common-session-noninteractive:

"session required pam_limits.so"

核心用法:如果使用vm,我建议为spark分配n-1个核心,为通信和其他任务保留1个核心。
分区:我建议使用分区数,它是集群中已用核心数的5到10倍。如果您看到“内存不足”错误,您需要增加分区的数量(首先通过增加因子,然后通过向集群添加计算机)
按键输出数组-如果您看到错误,例如:“数组大小超过vm限制”,则每个键的数据可能太多,因此需要减少每个键的数据量。例如,如果按1小时间隔输出文件,请尝试减少到10分钟,甚至1分钟间隔。
如果仍然看到错误,请在spark bug报告中查找,您可能需要确保升级到最新的spark版本。对我来说,目前版本1.2修复了一个错误,这将导致我的工作失败。
使用kryo注册器,将所有rdd转换逻辑转换为在单独的类中运行,并确保使用kryo注册所有这些类。

相关问题