I am running a Spark cluster of 80 machines. Each machine is a VM with 8 cores and 50 GB of memory (of which Spark seems to see about 41 GB as available).
I am running over several input folders, and I estimate the input size at roughly 250 GB, gz-compressed.
I am getting errors in the driver log and I am not sure what to make of them. Examples (in the order they appear in the log):
240884 [Result resolver thread-0] WARN org.apache.spark.scheduler.TaskSetManager - Lost task 445.0 in stage 1.0 (TID 445, hadoop-w-59.c.taboola-qa-01.internal): java.net.SocketTimeoutException: Read timed out
java.net.SocketInputStream.socketRead0(Native Method)
java.net.SocketInputStream.read(SocketInputStream.java:152)
java.net.SocketInputStream.read(SocketInputStream.java:122)
java.io.BufferedInputStream.fill(BufferedInputStream.java:235)
java.io.BufferedInputStream.read1(BufferedInputStream.java:275)
java.io.BufferedInputStream.read(BufferedInputStream.java:334)
sun.net.www.http.HttpClient.parseHTTPHeader(HttpClient.java:687)
sun.net.www.http.HttpClient.parseHTTP(HttpClient.java:633)
sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1323)
org.apache.spark.util.Utils$.fetchFile(Utils.scala:376)
org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$6.apply(Executor.scala:325)
org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$6.apply(Executor.scala:323)
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$updateDependencies(Executor.scala:323)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:158)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
java.lang.Thread.run(Thread.java:745)
271722 [Result resolver thread-3] WARN org.apache.spark.scheduler.TaskSetManager - Lost task 247.0 in stage 2.0 (TID 883, hadoop-w-79.c.taboola-qa-01.internal): java.lang.NullPointerException:
org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:153)
org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:58)
org.apache.spark.shuffle.hash.HashShuffleWriter.write(HashShuffleWriter.scala:55)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
org.apache.spark.scheduler.Task.run(Task.scala:54)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
java.lang.Thread.run(Thread.java:745)
309052 [Result resolver thread-1] WARN org.apache.spark.scheduler.TaskSetManager - Lost task 272.0 in stage 2.0 (TID 908, hadoop-w-58.c.taboola-qa-01.internal): java.io.IOException: unexpected exception type
java.io.ObjectStreamClass.throwMiscException(ObjectStreamClass.java:1538)
java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1025)
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:159)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
java.lang.Thread.run(Thread.java:745)
820940 [connection-manager-thread] INFO org.apache.spark.network.ConnectionManager - key already cancelled ? sun.nio.ch.SelectionKeyImpl@1c827563
java.nio.channels.CancelledKeyException
at org.apache.spark.network.ConnectionManager.run(ConnectionManager.scala:386)
at org.apache.spark.network.ConnectionManager$$anon$4.run(ConnectionManager.scala:139)
Since my job class (phase0) is not part of any of these stack traces, I am not sure what I can learn from the errors. Any suggestions?
EDIT: Specifically, the following exception happens to me even when working on folders of just a few GB:
271722 [Result resolver thread-3] WARN org.apache.spark.scheduler.TaskSetManager - Lost task 247.0 in stage 2.0 (TID 883, hadoop-w-79.c.taboola-qa-01.internal): java.lang.NullPointerException:
org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:153)
org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:58)
org.apache.spark.shuffle.hash.HashShuffleWriter.write(HashShuffleWriter.scala:55)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
org.apache.spark.scheduler.Task.run(Task.scala:54)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
java.lang.Thread.run(Thread.java:745)
1 Answer
The solution is not specific to the exceptions quoted here, but in the end I was able to overcome all of my Spark problems by following these guidelines:
All machines should be tuned in terms of ulimit and process memory, as follows:
Add the following to /etc/security/limits.conf:
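(The exact lines from the original answer were not preserved in this copy. As an illustration only, raising the open-file and process limits for all users typically looks like the entries below; the 500000 figure is an assumption to adjust for your machines.)
* soft nofile 500000
* hard nofile 500000
* soft nproc 500000
* hard nproc 500000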
And add the following to /etc/pam.d/common-session and /etc/pam.d/common-session-noninteractive:
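(Again, the original line was not preserved here; the standard entry that makes PAM apply the limits.conf settings to login sessions is the one below.)
session required pam_limits.so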
Core usage: if you are using VMs, I recommend allocating N-1 cores to Spark and leaving 1 core for communication and other tasks (a sketch follows below).
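A minimal sketch of that, assuming a Spark standalone deployment and the 8-core VMs described in the question (the value is an assumption to adapt to your hardware): in conf/spark-env.sh on each worker, leave one core free by setting
export SPARK_WORKER_CORES=7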
Partitioning: I recommend using a number of partitions that is 5 to 10 times the number of cores in use in the cluster. If you see "out of memory" errors, you need to increase the number of partitions (first by increasing the factor, then by adding machines to the cluster). A sketch is shown below.
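A minimal sketch of sizing the partition count, assuming the 80-machine, 7-cores-per-machine setup above; the input path and the 5x starting factor are assumptions, not values from the original answer:
import org.apache.spark.{SparkConf, SparkContext}

object PartitioningSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("PartitioningSketch"))

    // 80 machines * 7 cores used per machine = 560 cores; aim for 5-10x that many partitions.
    val coresInUse = 80 * 7
    val numPartitions = coresInUse * 5 // raise the factor first if "out of memory" errors appear

    // Hypothetical input path; .gz files are not splittable, so repartition right after reading.
    val lines = sc.textFile("hdfs:///input/folder/*.gz").repartition(numPartitions)

    println(lines.count())
    sc.stop()
  }
}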
Output arrays per key: if you see an error such as "array size exceeds VM limit", you probably have too much data per key and need to reduce the amount of data per key. For example, if you output files per 1-hour interval, try reducing that to 10-minute or even 1-minute intervals. A sketch of the idea follows below.
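A sketch of the bucketing idea; the record type and its timestamp field are hypothetical placeholders, not part of the original answer:
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD

// Hypothetical record type; substitute your own.
case class Event(timestampMs: Long, payload: String)

object BucketingSketch {
  // Key by a 10-minute bucket instead of a 1-hour one, so each key collects ~6x less data.
  def bucketEvents(events: RDD[Event]): RDD[(Long, Iterable[Event])] = {
    val tenMinutesMs = 10 * 60 * 1000L
    events.map(e => (e.timestampMs / tenMinutesMs, e)).groupByKey()
  }
}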
If you still see errors, search the Spark bug reports; you may need to make sure you upgrade to the latest Spark version. For me, the current version, 1.2, fixed a bug that had been causing my job to fail.
Use a Kryo registrator, move all your RDD transformation logic into separate classes, and make sure to register all of those classes with Kryo. A sketch follows below.
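A minimal sketch of wiring up a Kryo registrator; the class names and the record type are assumptions standing in for your own job classes:
import com.esotericsoftware.kryo.Kryo
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.serializer.KryoRegistrator

// Hypothetical classes holding the RDD transformation logic; substitute your own.
case class Record(key: String, value: Long)
class Phase0Transformations extends Serializable {
  def toPair(r: Record): (String, Long) = (r.key, r.value)
}

class MyKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    // Register every class whose instances travel through shuffles or task closures.
    kryo.register(classOf[Record])
    kryo.register(classOf[Phase0Transformations])
  }
}

object KryoSetupSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("Phase0")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.kryo.registrator", "MyKryoRegistrator")
    val sc = new SparkContext(conf)
    // ... build and run the job as usual ...
    sc.stop()
  }
}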