I'm running a PySpark job:
spark-submit --master yarn-client --driver-memory 150G --num-executors 8 --executor-cores 4 --executor-memory 150G benchmark_script_1.py hdfs:///tmp/data/sample150k 128 hdfs:///tmp/output/sample150k | tee ~/output/sample150k.log
The job itself is quite standard: it simply ingests some files and counts them.
print(str(datetime.now()) + " - Ingesting files...")
files = sc.wholeTextFiles(inputFileDir, partitions)
fileCount = files.count()
print(str(datetime.now()) + " - " + str(fileCount) + " files ingested")
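For context, a minimal self-contained version of the script might look like the sketch below. The argument handling is an assumption inferred from the spark-submit call above; the actual benchmark_script_1.py may differ.
import sys
from datetime import datetime
from pyspark import SparkConf, SparkContext

if __name__ == "__main__":
    # Arguments as passed on the spark-submit line: input dir, partition count, output dir.
    inputFileDir = sys.argv[1]
    partitions = int(sys.argv[2])
    outputFileDir = sys.argv[3]          # used later in the real script

    sc = SparkContext(conf=SparkConf().setAppName("benchmark_script_1"))

    print(str(datetime.now()) + " - Ingesting files...")
    # One (path, content) record per file; each file is read whole into memory.
    files = sc.wholeTextFiles(inputFileDir, partitions)
    fileCount = files.count()
    print(str(datetime.now()) + " - " + str(fileCount) + " files ingested")

    sc.stop()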
The source folder contains roughly 150,000 files: about 35 GB without replication, 105 GB with replication. Fairly heavy, but not crazy.
Running the command above produces the following stack trace:
15/08/11 15:39:20 WARN TaskSetManager: Lost task 61.3 in stage 0.0 (TID 76, <NODE>): java.io.IOException: Filesystem closed
at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:794)
at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:833)
at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:897)
at java.io.DataInputStream.read(DataInputStream.java:100)
at org.spark-project.guava.io.ByteStreams.copy(ByteStreams.java:207)
at org.spark-project.guava.io.ByteStreams.toByteArray(ByteStreams.java:252)
at org.apache.spark.input.WholeTextFileRecordReader.nextKeyValue(WholeTextFileRecordReader.scala:83)
at org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader.nextKeyValue(CombineFileRecordReader.java:69)
at org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:143)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:405)
at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:243)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1617)
at org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:205)
More detail can be found in the log of the offending executor:
15/08/11 12:28:18 ERROR executor.CoarseGrainedExecutorBackend: RECEIVED SIGNAL 15: SIGTERM
15/08/11 12:28:18 ERROR util.Utils: Uncaught exception in thread stdout writer for python
java.lang.OutOfMemoryError: Requested array size exceeds VM limit
at java.lang.StringCoding.encode(StringCoding.java:350)
at java.lang.String.getBytes(String.java:939)
at org.apache.spark.api.python.PythonRDD$.writeUTF(PythonRDD.scala:573)
at org.apache.spark.api.python.PythonRDD$.org$apache$spark$api$python$PythonRDD$$write$1(PythonRDD.scala:395)
at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:405)
at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:405)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:405)
at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:243)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1617)
at org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:205)
Traceback (most recent call last):
File "/opt/cloudera/parcels/CDH-5.4.2-1.cdh5.4.2.p0.2/jars/spark-assembly-1.3.0-cdh5.4.2-hadoop2.6.0-cdh5.4.2.jar/pyspark/daemon.py", line 162, in manager
File "/opt/cloudera/parcels/CDH-5.4.2-1.cdh5.4.2.p0.2/jars/spark-assembly-1.3.0-cdh5.4.2-hadoop2.6.0-cdh5.4.2.jar/pyspark/daemon.py", line 60, in worker
File "/opt/cloudera/parcels/CDH-5.4.2-1.cdh5.4.2.p0.2/jars/spark-assembly-1.3.0-cdh5.4.2-hadoop2.6.0-cdh5.4.2.jar/pyspark/worker.py", line 126, in main
if read_int(infile) == SpecialLengths.END_OF_STREAM:
File "/opt/cloudera/parcels/CDH-5.4.2-1.cdh5.4.2.p0.2/jars/spark-assembly-1.3.0-cdh5.4.2-hadoop2.6.0-cdh5.4.2.jar/pyspark/serializers.py", line 528, in read_int
15/08/11 12:28:18 ERROR util.SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[stdout writer for python,5,main]
java.lang.OutOfMemoryError: Requested array size exceeds VM limit
at java.lang.StringCoding.encode(StringCoding.java:350)
at java.lang.String.getBytes(String.java:939)
at org.apache.spark.api.python.PythonRDD$.writeUTF(PythonRDD.scala:573)
at org.apache.spark.api.python.PythonRDD$.org$apache$spark$api$python$PythonRDD$$write$1(PythonRDD.scala:395)
at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:405)
at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:405)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:405)
at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:243)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1617)
at org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:205)
raise EOFError
EOFError
Traceback (most recent call last):
File "/opt/cloudera/parcels/CDH-5.4.2-1.cdh5.4.2.p0.2/jars/spark-assembly-1.3.0-cdh5.4.2-hadoop2.6.0-cdh5.4.2.jar/pyspark/daemon.py", line 162, in manager
File "/opt/cloudera/parcels/CDH-5.4.2-1.cdh5.4.2.p0.2/jars/spark-assembly-1.3.0-cdh5.4.2-hadoop2.6.0-cdh5.4.2.jar/pyspark/daemon.py", line 60, in worker
File "/opt/cloudera/parcels/CDH-5.4.2-1.cdh5.4.2.p0.2/jars/spark-assembly-1.3.0-cdh5.4.2-hadoop2.6.0-cdh5.4.2.jar/pyspark/worker.py", line 126, in main
if read_int(infile) == SpecialLengths.END_OF_STREAM:
File "/opt/cloudera/parcels/CDH-5.4.2-1.cdh5.4.2.p0.2/jars/spark-assembly-1.3.0-cdh5.4.2-hadoop2.6.0-cdh5.4.2.jar/pyspark/serializers.py", line 528, in read_int
15/08/11 12:28:18 ERROR executor.Executor: Exception in task 7.0 in stage 0.0 (TID 5)
java.io.IOException: Filesystem closed
at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:794)
at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:833)
at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:897)
at java.io.DataInputStream.read(DataInputStream.java:100)
at org.spark-project.guava.io.ByteStreams.copy(ByteStreams.java:207)
at org.spark-project.guava.io.ByteStreams.toByteArray(ByteStreams.java:252)
at org.apache.spark.input.WholeTextFileRecordReader.nextKeyValue(WholeTextFileRecordReader.scala:83)
at org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader.nextKeyValue(CombineFileRecordReader.java:69)
at org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:143)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:405)
at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:243)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1617)
at org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:205)
raise EOFError
EOFError
I have already disabled the HDFS client cache:
conf.set("fs.hdfs.impl.disable.cache", True)
Note that the exact same script in Scala runs without any problem at all.
Although this is a large job, there is plenty of memory available. Does anyone know what the problem is?
Update
Gave the JVM more memory:
export set JAVA_OPTS="-Xmx6G -XX:MaxPermSize=2G -XX:+UseCompressedOops"
Sadly, no improvement.
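For reference, a hedged sketch of how JVM flags are usually passed to Spark itself (illustrative values, not taken from the post above). In yarn-client mode the executor JVMs are launched inside YARN containers, so a shell-level JAVA_OPTS generally does not reach them:
from pyspark import SparkConf, SparkContext

# Sketch (assumed values): executor JVM flags go through Spark's own configuration.
conf = SparkConf().set("spark.executor.extraJavaOptions",
                       "-XX:MaxPermSize=2G -XX:+UseCompressedOops")
sc = SparkContext(conf=conf)
# The heap size itself comes from --driver-memory / --executor-memory on spark-submit.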
1 Answer
I ran into a similar problem with spark-submit and Java while saving an 8 GB DataFrame, on a Docker container with 16 cores and 300 GB of RAM. I haven't solved it yet, but I've come across several possible workarounds:
Starting at page 77, Lightbend suggests this is an issue with the shell, and that using @transient or encapsulating things in an object may be a workaround. That does not seem to apply to our case.
Databricks suggests that spark.sql.shuffle.partitions may help; they recommend raising the default "200" to "400". I tried "800" and "2000" in spark-defaults.conf but still got the error. Databricks also suggests calling DataFrame.repartition(400) in the code. Alternatively, increase partitions, the last argument of the sc.wholeTextFiles(inputFileDir, partitions) call; see the sketch below.
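In PySpark terms, those suggestions would look roughly like the following sketch (illustrative values and paths; the original answer uses the Java API):
from pyspark import SparkContext
from pyspark.sql import SQLContext

sc = SparkContext(appName="partition-tuning-sketch")
sqlContext = SQLContext(sc)

sqlContext.setConf("spark.sql.shuffle.partitions", "400")        # or set it in spark-defaults.conf
files = sc.wholeTextFiles("hdfs:///tmp/data/sample150k", 2000)   # larger minPartitions hint
df = sqlContext.createDataFrame(files, ["path", "content"]).repartition(400)  # DataFrame.repartition in code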
The JAVA_OPTS suggestion from Stack Overflow does not apply here, since -XX:+UseCompressedOops is disabled anyway (on Java 8) once the heap is larger than 32 GB.
Edit
Also tried:
spark.default.parallelism=1000 (the default is the number of cores). Still got the error.
dataFrame.repartition(1000) in the code. Still got the error.
Possible workarounds
Using an intermediate RDD<LabeledPoint> lets me create the DataFrame, but Spark's reflection-based schema does not work with MLlib (the numClasses attribute is missing):
DataFrame df = sqlContext.createDataFrame(sc.parallelize(List<LabeledPoint>), LabeledPoint.class)
Using an intermediate JSON file, I can create a DataFrame that does work with MLlib:
saveAsJson(List<Row> /*generated data*/, filename);
DataFrame df = sqlContext.read().json(filename)
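A rough PySpark analogue of that JSON-roundtrip workaround (hypothetical paths and column names; the original answer builds a Java List<Row>):
import json
from pyspark import SparkContext
from pyspark.sql import SQLContext

sc = SparkContext(appName="json-roundtrip-sketch")
sqlContext = SQLContext(sc)

# Generated data stands in for the List<Row> in the answer above.
rows = [{"label": 1.0, "features": [0.0, 1.1, 0.1]},
        {"label": 0.0, "features": [2.0, 1.0, -1.0]}]

# Write line-delimited JSON as the intermediate file, then load it back as a DataFrame.
sc.parallelize(rows).map(json.dumps).saveAsTextFile("hdfs:///tmp/intermediate_json")
df = sqlContext.jsonFile("hdfs:///tmp/intermediate_json")   # sqlContext.read.json(...) on Spark >= 1.4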