我在scala中有一个程序,它读取一个csv文件,在数据框中添加一个新列,并将结果保存为Parquet文件。它在小文件(<5 go)上运行得非常好,但是当我尝试使用较大的文件(~80 go)时,当它应该使用以下stacktrace写入parquet文件时总是失败:
16/10/20 10:03:37 WARN scheduler.TaskSetManager: Lost task 14.0 in stage 4.0 (TID 886, 10.0.0.10): java.io.EOFException: reached end of stream after reading 136445 bytes; 1245184 bytes expected
at org.spark_project.guava.io.ByteStreams.readFully(ByteStreams.java:735)
at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$3$$anon$1.next(UnsafeRowSerializer.scala:127)
at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$3$$anon$1.next(UnsafeRowSerializer.scala:110)
at scala.collection.Iterator$$anon$12.next(Iterator.scala:444)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:30)
at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:161)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
at org.apache.spark.scheduler.Task.run(Task.scala:86)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
如果有人知道是什么导致了这一切,那对我会有很大帮助!
使用的系统
Spark2.0.1
斯卡拉2.11
hadoop hdfs 2.7.3版
在6个机群中的docker中运行(每个机群有4个核和16个ram)
示例代码
var df = spark.read.option("header", "true").option("inferSchema", "true").option("nullValue", "NULL").csv(hdfsFileURLIn)
df = df.withColumn("ipix", a2p(df.col(deName), df.col(raName)))
df.repartition(nPartitions, $"ipix").write.mode("overwrite").parquet(hdfsFileURLOut)
3条答案
按热度按时间0s0u357o1#
以下几点可能对您有所帮助:
我认为应该检查ipix列数据的分布,可能会发生数据倾斜,因此1个或几个分区可能比其他分区大得多。这些fat分区可能会导致一个正在处理fat分区的任务失败。它可能与函数a2p的输出有关。我会先测试运行这个作业,即使没有重新分区(只要删除这个调用,并尝试看看它是否成功-没有重新分区调用,它将使用默认分区,可能是按输入csv文件的大小分割)
我还希望您输入的csv不是gzip-ed(因为gzip-ed数据是不可拆分的,所以所有数据都在一个分区中)
v8wbuo2f2#
你能提供代码吗?也许你写的代码正在驱动程序上运行?如何处理文件?
处理大数据有一个特殊的spark功能,例如rdd。一旦你这样做了:
您将rdd带到驱动程序内存中,因此不使用spark的功能。处理大数据的代码应该在从属服务器上运行。
请检查:区分apachespark中的驱动程序代码和工作代码
ncgqoxb03#
这个问题看起来像是在yarn模式下解压一个乱序数据流时读取失败。
尝试下面的代码,看看它是如何运行的。
还有一个类似的问题Spark作业失败在Yarn模式