delta lake:找不到文件异常

waxmsbnn  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(542)

我正在使用delta lake执行合并操作,为此,我正在尝试将我的Parquet地板文件转换为delta格式,并随时间进行分区:

val source = spark.read.parquet("s3a://data-lake/source/")
       source
      .write
      .option("maxRecordsPerFile",20000)
      .mode("overwrite")
      .partitionBy("time")
      //.option("fs.s3a.committer.name", "partitioned") (I even tried using s3a committers)
      .format("delta")
      .save("s3a://data-lake/target/")

数据超过250g,我的spark配置如下:

spark.cores.max 420
spark.default.parallelism   10000
spark.delta.logStore.class  org.apache.spark.sql.delta.storage.S3SingleDriverLogStore
spark.driver.extraJavaOptions   -Xms20g
spark.driver.memory 28g
spark.executor.cores    2
spark.executor.memory 28G

在日志中,它显示文件未找到错误,并在运行一段时间后最终杀死执行器:

20/05/16 11:51:02 WARN TaskSetManager: Lost task 208.0 in stage 2.0 (TID 3294, 172.16.145.25, executor 14): org.apache.spark.SparkException: Task failed while writing rows.
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:257)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:170)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.FileNotFoundException: No such file or directory: s3a://data-lake/target/time=20190101/part-00208-2c9b5ddd-f2c1-4b8c-9d77-eacb0055ff82.c000.snappy.parquet

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1889)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1876)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1876)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2110)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:167)
    ... 53 more
Caused by: org.apache.spark.SparkException: Task failed while writing rows.
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:257)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:170)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

我试着将它保存为Parquet格式,它确实按预期工作。但这需要更多的时间来处理。

vs3odd8k

vs3odd8k1#

如果它已经在parquet中,您可以将s3中的文件夹复制到目标位置而不使用spark,并使用以下行将其初始化为delta表:

import io.delta.DeltaTable
DeltaTable.forPath("s3a://data-lake/target/"))

更多信息请点击此处。

相关问题