在将一个spark Dataframe 写入一个文件后,我尝试使用如下代码重命名该文件:
val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
val file = fs.globStatus(new Path(path + "/part*"))(0).getPath().getName()
fs.rename(new Path(path + "/" + file), new Path(path + "/" + fileName))
在本地运行Spark效果很好...但是当我在Dataproc上运行我的jar时,我得到了如下错误:
Exception in thread "main" java.lang.IllegalArgumentException: Wrong bucket: prj-***, in path: gs://prj-*****/part*, expected bucket: dataproc-temp-***
似乎文件可能不会保存到目标桶,直到作业结束,因此很难重命名它们。我已经尝试使用.option("mapreduce.fileoutputcommitter.algorithm.version", "2")
,因为我读到了一些关于这方面的东西,看起来很有希望。
更新:仍然不走运。spark.sparkContext.hadoopConfiguration
似乎期望基本存储桶是dataproc-temp-*
存储桶。下面是完整的堆栈跟踪:
Exception in thread "main" java.lang.IllegalArgumentException: Wrong bucket: prj-**, in path: gs://p**, expected bucket: dataproc-temp-u***
at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem.checkPath(GoogleHadoopFileSystem.java:95)
at org.apache.hadoop.fs.FileSystem.makeQualified(FileSystem.java:667)
at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.makeQualified(GoogleHadoopFileSystemBase.java:394)
at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem.getGcsPath(GoogleHadoopFileSystem.java:149)
at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.globStatus(GoogleHadoopFileSystemBase.java:1085)
at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.globStatus(GoogleHadoopFileSystemBase.java:1059)
1条答案
按热度按时间6psbrbz91#
FileSystem.get(...)
调用返回的HCFS示例绑定到特定FS(在本例中为GCS存储桶)。默认情况下,通过spark.hadoop.fs.defaultFS
Spark属性将Dataproc Serverless Spark配置为使用gs://daptaproc-temp-*/
存储桶作为默认HCFS。要解决此问题,您需要使用
FileSystem#get(URI uri, Configuration conf)
调用创建HCFS示例: