PySpark java.lang.OutOfMemoryError with wholeTextFiles

bmp9r5qi asked 12 months ago in Spark

I have 1,160 XML files in data_dir, each around 300 MB in size. I want to count the total number of words. I have a single local machine with 256 cores and 256 GB of RAM. Since the whole dataset is roughly 300 GB, I know I probably cannot process all the files at once. How can I batch or partition the work to avoid the java.lang.OutOfMemoryError: Java heap space error? Here is my code:

from pyspark import SparkConf, SparkContext

if __name__ == "__main__":
    data_dir = '/shared/hm31/xml_data/'

   

    conf = SparkConf().setMaster("local[*]").setAppName("WordCount")
    conf.set("spark.driver.memory", "200g")
    conf.set("spark.executor.memory", "6g")  # Example: increase executor memory

    sc = SparkContext(conf=conf)

    # wholeTextFiles yields (file path, file content) pairs
    rdd = sc.wholeTextFiles(data_dir, 500)
    words = rdd.flatMap(lambda x: x[1].split())
    print("words", words.count())

Update: even when I use 200 GB of RAM and only 3 CPUs, I still get the out-of-memory error.
Here is my code:

from pyspark import SparkConf, SparkContext

data_dir = '/shared/hm31/xml_data/'


conf = SparkConf().setMaster("local[3]").setAppName("WordCount")
conf.set("spark.driver.memory", "200g")

sc = SparkContext(conf=conf)

# wholeTextFiles yields (file path, file content) pairs
rdd = sc.wholeTextFiles(data_dir, 512)
words = rdd.flatMap(lambda x: x[1].split())
print("Words", words.count())

This is the error I get:

23/10/03 13:24:28 ERROR Utils: Uncaught exception in thread stdout writer for python3
java.lang.OutOfMemoryError: Java heap space
        at java.base/java.util.Arrays.copyOf(Arrays.java:3745)
        at java.base/java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:120)
        at java.base/java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:95)
        at java.base/java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:156)
        at org.sparkproject.guava.io.ByteStreams.copy(ByteStreams.java:211)
        at org.sparkproject.guava.io.ByteStreams.toByteArray(ByteStreams.java:252)
        at org.apache.spark.input.WholeTextFileRecordReader.nextKeyValue(WholeTextFileRecordReader.scala:79)
        at org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader.nextKeyValue(CombineFileRecordReader.java:65)
        at org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:247)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
        at scala.collection.Iterator.foreach(Iterator.scala:943)
        at scala.collection.Iterator.foreach$(Iterator.scala:943)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
        at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:307)
        at org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:729)
        at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:435)
        at org.apache.spark.api.python.BasePythonRunner$WriterThread$$Lambda$1407/0x000000084089c040.apply(Unknown Source)
        at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:2048)
        at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:270)
Exception in thread "stdout writer for python3" java.lang.OutOfMemoryError: Java heap space
        at java.base/java.util.Arrays.copyOf(Arrays.java:3745)
        at java.base/java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:120)
        at java.base/java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:95)
        at java.base/java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:156)
        at org.sparkproject.guava.io.ByteStreams.copy(ByteStreams.java:211)
        at org.sparkproject.guava.io.ByteStreams.toByteArray(ByteStreams.java:252)
        at org.apache.spark.input.WholeTextFileRecordReader.nextKeyValue(WholeTextFileRecordReader.scala:79)
        at org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader.nextKeyValue(CombineFileRecordReader.java:65)
        at org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:247)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
        at scala.collection.Iterator.foreach(Iterator.scala:943)
        at scala.collection.Iterator.foreach$(Iterator.scala:943)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
        at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:307)
        at org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:729)
        at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:435)
        at org.apache.spark.api.python.BasePythonRunner$WriterThread$$Lambda$1407/0x000000084089c040.apply(Unknown Source)
        at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:2048)
        at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:270)
(the same java.lang.OutOfMemoryError: Java heap space stack trace repeats for the remaining "stdout writer for python3" threads)

nzk0hqpo1#

If you want to keep it simple, you can follow these steps in order:
1. You are trying to set the spark.driver.memory configuration after the application has already started. That is not possible; you have to pass this parameter in the command that launches the application, for example: pyspark --driver-memory 200g ... or spark-submit --driver-memory 200g .... Without it, you are simply running with the default 1g of driver memory.

  • You are using .setMaster("local[*]"), which means you are running Spark in local mode. In that case spark.executor.memory is ignored. So:
  • Remove spark.executor.memory
  • Make spark.driver.memory as large as possible (something like 240g, depending on what else is running on the machine)
  • If you still get out-of-memory errors after the previous steps, it means you do not have enough RAM per CPU for your workload. This depends entirely on your data (and on the largest file in it). In that case you can lower the number of CPUs you run with, so each one has more RAM available. For example, if you want to limit your computation to 128 CPUs (instead of [*], which means all available CPUs), you can do the following (a fuller sketch follows after this list):
.setMaster("local[128]")
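
Putting the answer's steps together, here is a minimal sketch of what the corrected local-mode script might look like. The file name wordcount.py, the 230g driver memory, and local[128] are illustrative assumptions to adapt to your machine; the essential points from the steps above are that the driver memory is passed on the launch command line, spark.executor.memory is dropped, and the number of CPUs is capped.

# Launch with the driver memory set at startup, e.g.:
#   spark-submit --driver-memory 230g wordcount.py
from pyspark import SparkConf, SparkContext

data_dir = '/shared/hm31/xml_data/'

# Local mode: spark.executor.memory is ignored, so it is not set here.
# local[128] caps the number of concurrent tasks so each one has more heap;
# 128 and the 512 minPartitions below are assumed values, not tuned ones.
conf = SparkConf().setMaster("local[128]").setAppName("WordCount")
sc = SparkContext(conf=conf)

# wholeTextFiles yields (path, content) pairs; split only the content.
rdd = sc.wholeTextFiles(data_dir, 512)
words = rdd.flatMap(lambda x: x[1].split())
print("Words", words.count())

If this still runs out of heap, lower the number in local[N] further: each wholeTextFiles record materializes an entire file in memory (that is what the WholeTextFileRecordReader frames in the stack trace show), so fewer concurrent files means less simultaneous heap pressure.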
