Using FileUtil.copy in Spark to upload a file from HDFS to S3 fails with DiskErrorException: Directory is not writable?

l0oc07j2 · posted 2022-12-09 · in HDFS

I am trying to write a parquet file to HDFS and then copy it to S3.
I wrote the code in a Zeppelin notebook and it runs fine there: no problems, and the files end up under the S3 path.

  import org.apache.hadoop.fs.{FileStatus, FileSystem, FileUtil, Path}
  import org.apache.spark.sql.SaveMode
  import org.apache.spark.sql.functions.{col, dayofmonth, month, to_date, year}

  val outputFolder = "bucket_name/path"
  println("\n ---- TASK 1 ----- \n writing with path " + outputFolder)

  // Write the DataFrame to HDFS, partitioned by year/month/day.
  wholeParquetFile
    .withColumn("date_col", to_date(col("timestamp"), "yyyyMMdd"))
    .withColumn("year", year(col("date_col")))
    .withColumn("month", month(col("date_col")))
    .withColumn("day", dayofmonth(col("date_col")))
    .drop("date_col")
    .repartition(1)
    .write.mode(SaveMode.Overwrite)
    .partitionBy("year", "month", "day")
    .parquet(outputFolder)

  val sc = spark.sparkContext
  val fs = FileSystem.get(sc.hadoopConfiguration)
  val allTheFilesThatBennCreated: Array[FileStatus] =
    fs.globStatus(new Path(outputFolder + "/year=*/month=*/day=*/*"))
  println("------- allTheFilesThatBennCreated -------" + allTheFilesThatBennCreated.mkString("Array(", ", ", ")"))

  // Right now each file path is outputFolder + "/year=2021/month=5/day=17/part-....c000.snappy.parquet";
  // convert it to outputFolder + "/2021/5/17/part-....c000.snappy.parquet"
  // (a hypothetical sketch of generateOutputFilePathString follows this block).
  allTheFilesThatBennCreated.foreach(path => {
    val newPathString = generateOutputFilePathString(path.getPath.toString)
    val outputFilePath = new Path(newPathString)
    val destinationFileSystem = FileSystem.get(outputFilePath.toUri, sc.hadoopConfiguration)
    val sourceFileSystem = FileSystem.get(path.getPath.toUri, sc.hadoopConfiguration)
    println("-------- source filesystem ------------------" + sourceFileSystem)
    println("-------- path.getPath --------------" + path.getPath)
    println("-------- destinationFileSystem ------------- " + destinationFileSystem)
    println("-------- S3 path for Output File ------------" + outputFilePath)
    // Upload from HDFS to S3 (deleteSource = true removes the HDFS copy after the upload).
    FileUtil.copy(sourceFileSystem, path.getPath, destinationFileSystem, outputFilePath, true, sc.hadoopConfiguration)
  })
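The helper generateOutputFilePathString is not shown in the post; based on the comment above, it only rewrites the Hive-style partition directories (year=/month=/day=) into plain date directories on the destination prefix. A hypothetical sketch of what it might look like, where the s3a://bucket_name/path destination is an assumed placeholder, not something taken from the original code:

  // Hypothetical helper (not from the original post): turns
  //   .../year=2021/month=5/day=17/part-....snappy.parquet
  // into
  //   <destinationPrefix>/2021/5/17/part-....snappy.parquet
  def generateOutputFilePathString(sourcePath: String,
                                   destinationPrefix: String = "s3a://bucket_name/path"): String = {
    // Keep only the partitioned tail of the HDFS path.
    val partitionedPart = sourcePath.substring(sourcePath.indexOf("year="))
    // Strip the partition-column prefixes, leaving 2021/5/17/part-....
    val datePart = partitionedPart
      .replace("year=", "")
      .replace("month=", "")
      .replace("day=", "")
    destinationPrefix + "/" + datePart
  }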

But when I run the same code in spark-shell, or via a jar with spark-submit, I get this error:

  22/05/17 09:57:28 WARN LocalDirAllocator$AllocatorPerContext: /mnt/var/lib/hadoop/tmp/s3a is not writable
  org.apache.hadoop.util.DiskChecker$DiskErrorException: Directory is not writable: /mnt/var/lib/hadoop/tmp/s3a
      at org.apache.hadoop.util.DiskChecker.checkAccessByFileMethods(DiskChecker.java:167)
      at org.apache.hadoop.util.DiskChecker.checkDirInternal(DiskChecker.java:100)
      at org.apache.hadoop.util.DiskChecker.checkDir(DiskChecker.java:77)
      at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.confChanged(LocalDirAllocator.java:315)
      at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathForWrite(LocalDirAllocator.java:378)
      at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.createTmpFileForWrite(LocalDirAllocator.java:461)
      at org.apache.hadoop.fs.LocalDirAllocator.createTmpFileForWrite(LocalDirAllocator.java:200)
      at org.apache.hadoop.fs.s3a.S3AFileSystem.createTmpFileForWrite(S3AFileSystem.java:501)
      at org.apache.hadoop.fs.s3a.S3AOutputStream.<init>(S3AOutputStream.java:66)
      at org.apache.hadoop.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:690)
      at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1075)
      at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1056)
      at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:945)
      at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:393)
      at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:366)
      at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:316)
      at com.propellyr.driver.ApplicationMain$$anonfun$2.apply(ApplicationMain.scala:86)
      at com.propellyr.driver.ApplicationMain$$anonfun$2.apply(ApplicationMain.scala:70)
      at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
      at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
      at com.propellyr.driver.ApplicationMain$.delayedEndpoint$com$propellyr$driver$ApplicationMain$1(ApplicationMain.scala:70)
      at com.propellyr.driver.ApplicationMain$delayedInit$body.apply(ApplicationMain.scala:7)
      at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
      at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
      at scala.App$$anonfun$main$1.apply(App.scala:76)
      at scala.App$$anonfun$main$1.apply(App.scala:76)
      at scala.collection.immutable.List.foreach(List.scala:392)
      at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
      at scala.App$class.main(App.scala:76)
      at com.propellyr.driver.ApplicationMain$.main(ApplicationMain.scala:7)
      at com.propellyr.driver.ApplicationMain.main(ApplicationMain.scala)
      at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
      at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      at java.lang.reflect.Method.invoke(Method.java:498)
      at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
      at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:863)
      at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161)
      at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184)
      at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
      at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:938)
      at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:947)
      at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
  Exception in thread "main" org.apache.hadoop.util.DiskChecker$DiskErrorException: No space available in any of the local directories.
      at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathForWrite(LocalDirAllocator.java:400)
      at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.createTmpFileForWrite(LocalDirAllocator.java:461)
      at org.apache.hadoop.fs.LocalDirAllocator.createTmpFileForWrite(LocalDirAllocator.java:200)
      at org.apache.hadoop.fs.s3a.S3AFileSystem.createTmpFileForWrite(S3AFileSystem.java:501)
      at org.apache.hadoop.fs.s3a.S3AOutputStream.<init>(S3AOutputStream.java:66)
      at org.apache.hadoop.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:690)
      at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1075)
      at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1056)
      at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:945)
      at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:393)
      at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:366)
      at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:316)
      at com.propellyr.driver.ApplicationMain$$anonfun$2.apply(ApplicationMain.scala:86)
      at com.propellyr.driver.ApplicationMain$$anonfun$2.apply(ApplicationMain.scala:70)
      at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
      at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
      at com.propellyr.driver.ApplicationMain$.delayedEndpoint$com$propellyr$driver$ApplicationMain$1(ApplicationMain.scala:70)
      at com.propellyr.driver.ApplicationMain$delayedInit$body.apply(ApplicationMain.scala:7)
      at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
      at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
      at scala.App$$anonfun$main$1.apply(App.scala:76)
      at scala.App$$anonfun$main$1.apply(App.scala:76)
      at scala.collection.immutable.List.foreach(List.scala:392)
      at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
      at scala.App$class.main(App.scala:76)
      at com.propellyr.driver.ApplicationMain$.main(ApplicationMain.scala:7)
      at com.propellyr.driver.ApplicationMain.main(ApplicationMain.scala)
      at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
      at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      at java.lang.reflect.Method.invoke(Method.java:498)
      at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
      at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:863)
      at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161)
      at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184)
      at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
      at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:938)
      at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:947)
      at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Does anyone know how to fix this?

bwleehnv1#

You haven't mentioned the environments for Zeppelin and the CLI, i.e. whether both submit to the same cluster or whether your CLI run uses local mode.
That said, the clue is in the stack trace:
No space available in any of the local directories.
Looking further, the error occurs at FileUtil.copy(), which tries to write its temporary output to the path configured by the property mapred.local.dir; check that property.
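If that local temp location is the culprit, one option is to point the S3A upload buffer at a directory the Spark user can write to and that has free space. A minimal sketch, assuming an EMR-style disk layout: fs.s3a.buffer.dir is the standard S3A setting (its default of ${hadoop.tmp.dir}/s3a matches the /mnt/var/lib/hadoop/tmp/s3a path in your trace), while the /mnt1/s3a-tmp target path is only an illustrative choice, not a value from your cluster.

  // Sketch: inspect and redirect the local buffer directory S3A uses for uploads,
  // before the FileUtil.copy loop runs. The target path below is just an example.
  val hadoopConf = spark.sparkContext.hadoopConfiguration
  println("hadoop.tmp.dir    = " + hadoopConf.get("hadoop.tmp.dir"))
  println("mapred.local.dir  = " + hadoopConf.get("mapred.local.dir"))
  println("fs.s3a.buffer.dir = " + hadoopConf.get("fs.s3a.buffer.dir"))
  // Point the S3A buffer at a writable directory with enough free space.
  hadoopConf.set("fs.s3a.buffer.dir", "/mnt1/s3a-tmp")

The same setting can also be passed at submit time with --conf spark.hadoop.fs.s3a.buffer.dir=/mnt1/s3a-tmp, which avoids changing the code.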
