spark在计算大文件时崩溃了

zbq4xfa0  于 2021-05-27  发布在  Spark
关注(0)|答案(3)|浏览(522)

我在scala中有一个程序,它读取一个csv文件,在数据框中添加一个新列,并将结果保存为Parquet文件。它在小文件(<5 go)上运行得非常好,但是当我尝试使用较大的文件(~80 go)时,当它应该使用以下stacktrace写入parquet文件时总是失败:

16/10/20 10:03:37 WARN scheduler.TaskSetManager: Lost task 14.0 in stage 4.0 (TID 886, 10.0.0.10): java.io.EOFException: reached end of stream after reading 136445 bytes; 1245184 bytes expected
  at org.spark_project.guava.io.ByteStreams.readFully(ByteStreams.java:735)
  at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$3$$anon$1.next(UnsafeRowSerializer.scala:127)
  at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$3$$anon$1.next(UnsafeRowSerializer.scala:110)
  at scala.collection.Iterator$$anon$12.next(Iterator.scala:444)
  at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
  at org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:30)
  at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43)
  at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
  at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
  at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
  at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
  at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:161)
  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
  at org.apache.spark.scheduler.Task.run(Task.scala:86)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:745)

如果有人知道是什么导致了这一切,那对我会有很大帮助!

使用的系统

Spark2.0.1
斯卡拉2.11
hadoop hdfs 2.7.3版
在6个机群中的docker中运行(每个机群有4个核和16个ram)

示例代码

var df = spark.read.option("header", "true").option("inferSchema", "true").option("nullValue", "NULL").csv(hdfsFileURLIn)
df = df.withColumn("ipix", a2p(df.col(deName), df.col(raName)))
df.repartition(nPartitions, $"ipix").write.mode("overwrite").parquet(hdfsFileURLOut)
0s0u357o

0s0u357o1#

以下几点可能对您有所帮助:
我认为应该检查ipix列数据的分布,可能会发生数据倾斜,因此1个或几个分区可能比其他分区大得多。这些fat分区可能会导致一个正在处理fat分区的任务失败。它可能与函数a2p的输出有关。我会先测试运行这个作业,即使没有重新分区(只要删除这个调用,并尝试看看它是否成功-没有重新分区调用,它将使用默认分区,可能是按输入csv文件的大小分割)
我还希望您输入的csv不是gzip-ed(因为gzip-ed数据是不可拆分的,所以所有数据都在一个分区中)

v8wbuo2f

v8wbuo2f2#

你能提供代码吗?也许你写的代码正在驱动程序上运行?如何处理文件?
处理大数据有一个特殊的spark功能,例如rdd。一旦你这样做了:

someRdd.collect()

您将rdd带到驱动程序内存中,因此不使用spark的功能。处理大数据的代码应该在从属服务器上运行。
请检查:区分apachespark中的驱动程序代码和工作代码

ncgqoxb0

ncgqoxb03#

这个问题看起来像是在yarn模式下解压一个乱序数据流时读取失败。
尝试下面的代码,看看它是如何运行的。

var df = spark.read.option("header", "true").option("inferSchema", "true").option("nullValue", "NULL").csv(hdfsFileURLIn)
df = df.withColumn("ipix", a2p(df.col(deName), df.col(raName))).persist(StorageLevel.MEMORY_AND_DISK)
df.repartition(nPartitions, $"ipix").write.mode("overwrite").parquet(hdfsFileURLOut)

还有一个类似的问题Spark作业失败在Yarn模式

相关问题