I have 1160 XML files in data_dir, each about 300 MB in size, and I want to count the total number of words. My local machine has 256 cores and 256 GB of RAM. Since the whole dataset is roughly 300 GB, I know I probably cannot process all the files at once. How should I batch or partition the work to avoid the java.lang.OutOfMemoryError: Java heap space error? Here is my code:
from pyspark import SparkConf, SparkContext

if __name__ == "__main__":
    data_dir = '/shared/hm31/xml_data/'
    conf = SparkConf().setMaster("local[*]").setAppName("WordCount")
    conf.set("spark.driver.memory", "200g")
    conf.set("spark.executor.memory", "6g")  # Example: increase executor memory
    sc = SparkContext(conf=conf)
    rdd = sc.wholeTextFiles(data_dir, 500)
    words = rdd.flatMap(lambda x: x.split())
    print("words", words.count())
Update: even when I use 200 GB of RAM and only 3 CPUs, I still get the out-of-memory error. Here is my code:
data_dir = '/shared/hm31/xml_data/'
conf = SparkConf().setMaster("local[3]").setAppName("WordCount")
conf.set("spark.driver.memory", "200g")
sc = SparkContext(conf=conf)
rdd = sc.wholeTextFiles(data_dir, 512)
words = rdd.flatMap(lambda x: x.split())
print("Words", words.count())
Here is the error I get:
23/10/03 13:24:28 ERROR Utils: Uncaught exception in thread stdout writer for python3
java.lang.OutOfMemoryError: Java heap space
at java.base/java.util.Arrays.copyOf(Arrays.java:3745)
at java.base/java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:120)
at java.base/java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:95)
at java.base/java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:156)
at org.sparkproject.guava.io.ByteStreams.copy(ByteStreams.java:211)
at org.sparkproject.guava.io.ByteStreams.toByteArray(ByteStreams.java:252)
at org.apache.spark.input.WholeTextFileRecordReader.nextKeyValue(WholeTextFileRecordReader.scala:79)
at org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader.nextKeyValue(CombineFileRecordReader.java:65)
at org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:247)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:307)
at org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:729)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:435)
at org.apache.spark.api.python.BasePythonRunner$WriterThread$$Lambda$1407/0x000000084089c040.apply(Unknown Source)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:2048)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:270)
Exception in thread "stdout writer for python3" java.lang.OutOfMemoryError: Java heap space
1 Answer
If you want to keep it simple, work through the following steps:

1. You are trying to set the spark.driver.memory configuration after the application has already started. That has no effect: this parameter must be set on the command line that launches the application, for example pyspark --driver-memory 200g ... or spark-submit --driver-memory 200g .... Without it, you are running with the default driver memory of 1g.
2. .setMaster("local[*]") means you are running Spark in local mode. In local mode, spark.executor.memory is ignored, because the executors run inside the driver JVM.

So:

- remove the spark.executor.memory setting
- set spark.driver.memory as large as possible at launch time (something like 240g, depending on what else is running on the machine)
- keep [*], which means all available CPUs

With that, you can do the following:
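Putting those steps together, a minimal sketch of what the corrected script might look like (the names split_words, main, and word_count.py, and the 240g figure, are illustrative assumptions, not from the question; note that wholeTextFiles yields (path, contents) pairs, so the original lambda x: x.split() would also fail on the tuple once reading succeeded):

```python
def split_words(pair):
    # wholeTextFiles yields (file_path, file_contents) pairs, so split the
    # contents on whitespace, not the pair itself.
    _path, contents = pair
    return contents.split()


def main(data_dir="/shared/hm31/xml_data/"):
    # Import deferred into the function so the helper above is usable
    # without Spark installed.
    from pyspark import SparkConf, SparkContext

    # No spark.driver.memory here: it must be supplied at launch, e.g.
    #   spark-submit --driver-memory 240g word_count.py
    # No spark.executor.memory either: it is ignored in local mode.
    conf = SparkConf().setMaster("local[*]").setAppName("WordCount")
    sc = SparkContext(conf=conf)

    rdd = sc.wholeTextFiles(data_dir, 512)
    words = rdd.flatMap(split_words)
    print("words", words.count())
    sc.stop()
```

Keep in mind that wholeTextFiles still materializes each ~300 MB file as a single in-memory record (that buffering is exactly where the stack trace shows the OOM), which is why the driver memory given at launch is what actually makes room for the read.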