SparkOutOfMemoryError when loading data into an S3 bucket

envsm3lx · posted 2021-05-29 in Spark

I have a DataFrame that I write to a target location in an S3 bucket. The code uses coalesce for the load and fails with a SparkOutOfMemoryError. The same coalesce pattern is used across several projects. I have seen many answers suggesting repartition, and switching to repartition does work for me, but coalesce keeps failing even when there are no records. Is there any other way to resolve this without changing coalesce to repartition?
Code:

empsql = 'Select * From Employee'
df = spark.sql(empsql) ##Spark is configured
df.coalesce(2).write.mode('overwrite').format("parquet").option("delimiter",'|').save(s3_path, header = True)

Error:
org.apache.spark.SparkException: Task failed while writing rows.
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply
    at org.apache.spark.scheduler.ResultTask.runTask
    at org.apache.spark.scheduler.Task.run
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply
    at org.apache.spark.util.Utils$.tryWithSafeFinally
    at org.apache.spark.executor.Executor$TaskRunner.run
    at java.util.concurrent.ThreadPoolExecutor$Worker.run
    at java.lang.Thread.run
Caused by: org.apache.spark.memory.SparkOutOfMemoryError: Unable to acquire 44 bytes of memory, got 0
    at org.apache.spark.memory.MemoryConsumer.throwOom(MemoryConsumer.java)
    at org.apache.spark.memory.MemoryConsumer.allocatePage
    at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPageIfNecessary(UnsafeExternalSorter.java:383)
    at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.insertRecord(UnsafeExternalSorter.java:407)
    at org.apache.spark.sql.execution.UnsafeExternalRowSorter.insertRow(UnsafeExternalRowSorter.java:135)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage29.sort_addToSorter_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage29.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
    at org.apache.spark.sql.execution.RowIteratorFromScala.advanceNext(RowIterator.scala:83)
    at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:100)
    at org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:99)
    at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:241)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:239)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:245)
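
For reference, the repartition version that those answers suggest does finish for me; a rough sketch of it (reusing df and s3_path from above, with coalesce simply swapped for repartition):

# repartition(2) inserts a shuffle, so the stages before the write keep their
# full parallelism and only the final write runs with 2 tasks
df.repartition(2).write.mode('overwrite').format("parquet").option("delimiter", '|').save(s3_path, header=True)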

lymnna71 (answer 1)

Not sure whether this will work for you, but try it like this:

df.coalesce(2, shuffle=True).write.mode('overwrite').format("parquet").option("delimiter",'|').save(s3_path, header = True)

shuffle=True adds a shuffle step, so the partitions are processed in parallel before being reduced. This behaviour is similar to using repartition.
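
If the DataFrame coalesce in your PySpark version does not accept the shuffle keyword (the DataFrame API normally takes only a partition count; the shuffle flag belongs to the RDD-level coalesce), a roughly equivalent shuffled coalesce can be sketched through the RDD API, reusing spark, df and s3_path from the question:

# Drop to the RDD API, whose coalesce exposes a shuffle flag; shuffle=True
# forces a full shuffle before reducing to 2 partitions, so the upstream
# work is not squeezed into 2 tasks.
shuffled_rdd = df.rdd.coalesce(2, shuffle=True)
shuffled_df = spark.createDataFrame(shuffled_rdd, schema=df.schema)

shuffled_df.write.mode('overwrite').format("parquet").save(s3_path)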
