hadoop 如何在Dataproc Serverless上运行的Spark中重命名GCS文件?

4uqofj5v  于 2022-11-01  发布在  Hadoop
关注(0)|答案(1)|浏览(154)

在将一个spark Dataframe 写入一个文件后,我尝试使用如下代码重命名该文件:

val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
val file = fs.globStatus(new Path(path + "/part*"))(0).getPath().getName()
fs.rename(new Path(path + "/" + file), new Path(path + "/" + fileName))

在本地运行Spark效果很好...但是当我在Dataproc上运行我的jar时,我得到了如下错误:

Exception in thread "main" java.lang.IllegalArgumentException: Wrong bucket: prj-***, in path: gs://prj-*****/part*, expected bucket: dataproc-temp-***

似乎文件可能不会保存到目标桶,直到作业结束,因此很难重命名它们。我已经尝试使用.option("mapreduce.fileoutputcommitter.algorithm.version", "2"),因为我读到了一些关于这方面的东西,看起来很有希望。
更新:仍然不走运。spark.sparkContext.hadoopConfiguration似乎期望基本存储桶是dataproc-temp-*存储桶。下面是完整的堆栈跟踪:

Exception in thread "main" java.lang.IllegalArgumentException: Wrong bucket: prj-**, in path: gs://p**, expected bucket: dataproc-temp-u***
    at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem.checkPath(GoogleHadoopFileSystem.java:95)
    at org.apache.hadoop.fs.FileSystem.makeQualified(FileSystem.java:667)
    at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.makeQualified(GoogleHadoopFileSystemBase.java:394)
    at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem.getGcsPath(GoogleHadoopFileSystem.java:149)
    at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.globStatus(GoogleHadoopFileSystemBase.java:1085)
    at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.globStatus(GoogleHadoopFileSystemBase.java:1059)
6psbrbz9

6psbrbz91#

FileSystem.get(...)调用返回的HCFS示例绑定到特定FS(在本例中为GCS存储桶)。默认情况下,通过spark.hadoop.fs.defaultFS Spark属性将Dataproc Serverless Spark配置为使用gs://daptaproc-temp-*/存储桶作为默认HCFS。
要解决此问题,您需要使用FileSystem#get(URI uri, Configuration conf)调用创建HCFS示例:

val fs = FileSystem.get(path.toUri, spark.sparkContext.hadoopConfiguration)

相关问题