使用globstatus和google云存储桶作为输入时无法运行spark作业

jgwigjjp  于 2021-06-02  发布在  Hadoop
关注(0)|答案(1)|浏览(397)

我用的是spark 1.1。我有一个spark工作,它只在bucket下寻找特定模式的文件夹(即以…)开头的文件夹,并且应该只处理这些文件夹。我通过以下步骤来实现这一点:

FileSystem fs = FileSystem.get(new Configuration(true));
FileStatus[] statusArr = fs.globStatus(new Path(inputPath));
List<FileStatus> statusList = Arrays.asList(statusArr);

List<String> pathsStr = convertFileStatusToPath(statusList);

JavaRDD<String> paths = sc.parallelize(pathsStr);

但是,在google云存储路径gs://rsync-1/2014\u 07\u 31*(使用最新的google cloud storage connector 1.2.9)上运行此作业时,出现以下错误:

4/10/13 10:28:38 INFO slf4j.Slf4jLogger: Slf4jLogger started    
14/10/13 10:28:38 INFO util.Utils: Successfully started service 'Driver' on port 60379.    
14/10/13 10:28:38 INFO worker.WorkerWatcher: Connecting to worker akka.tcp://sparkWorker@hadoop-w-9.c.taboola-qa-01.internal:45212/user/Worker    
Exception in thread "main" java.lang.reflect.InvocationTargetException    
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)    
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)    
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)    
    at java.lang.reflect.Method.invoke(Method.java:606)    
    at org.apache.spark.deploy.worker.DriverWrapper$.main(DriverWrapper.scala:40)    
    at org.apache.spark.deploy.worker.DriverWrapper.main(DriverWrapper.scala)    
Caused by: java.lang.IllegalArgumentException: Wrong bucket: rsync-1, in path: gs://rsync-1/2014_07_31*, expected bucket: hadoop-config    
    at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem.checkPath(GoogleHadoopFileSystem.java:100)    
    at org.apache.hadoop.fs.FileSystem.makeQualified(FileSystem.java:294)    
    at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.makeQualified(GoogleHadoopFileSystemBase.java:457)    
    at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem.getGcsPath(GoogleHadoopFileSystem.java:163)    
    at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.globStatus(GoogleHadoopFileSystemBase.java:1052)    
    at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.globStatus(GoogleHadoopFileSystemBase.java:1027)    
    at com.doit.customer.dataconverter.Phase0.main(Phase0.java:578)    
... 6 more

当我在本地文件夹上运行此作业时,一切正常。
hadoop config是我用来在google计算引擎上部署spark集群的一个bucket(使用bdutil 0.35.2工具)

2fjabf4q

2fjabf4q1#

简短的回答
而不是使用:

FileSystem fs = FileSystem.get(new Configuration(true));
    FileStatus[] statusArr = fs.globStatus(new Path(inputPath));
    List<FileStatus> statusList = Arrays.asList(statusArr);

你需要做什么

Path inputPathObj = new Path(inputPath);
    FileSystem fs = FileSystem.get(inputPathObj.toUri(), new Configuration(true));
    FileStatus[] statusArr = fs.globStatus(inputPathObj);
    List<FileStatus> statusList = Arrays.asList(statusArr);

因为在hadoop中,文件系统示例是基于 scheme 以及 authority uri的组件(以及更高级设置中的潜在用户组信息),并且这样的示例在方案和权限之间不可互换。
冗长的回答
这与 hostname 以及 path 组件 URI 在[scheme]://[authority]/[path]中,这在hdfs用例中可能更明显,但也适用于gcs。基本上,有几个 get org.apache.hadoop.fs.filesystem中的方法,这里最适用的方法是:

public static FileSystem get(Configuration conf)

public static FileSystem get(URI uri, Configuration conf)

前者实际上只是用:

return get(getDefaultUri(conf), conf);

哪里 getDefaultUri(conf) 定义为 fs.default.name 或者 fs.defaultFS . 第二个需要考虑的问题是 hosthname 或者 authority 组件被认为是本质上不同的文件系统;在hdfs的情况下,这是有意义的,因为:

FileSystem.get("hdfs://foo-cluster-namenode/", conf);
    FileSystem.get("hdfs://bar-cluster-namenode/", conf);

每个点位于不同集群上可能完全不同的文件系统示例上,允许在两个不同的hdfs示例上使用相同的路径名来引用不同的存储名称空间。虽然在机器的“主机名”方面不那么透明,但是 bucket 在地面军事系统中,确实扮演着 authority gce uri的组件——在hadoop中,这意味着 FileSystem.getbucket 是相同的,但是对于不同的bucket有不同的示例。正如您不能创建hdfs示例并将其指向不同的权限一样:

// Can't mix authorities!
    FileSystem.get("hdfs://foo/", conf).listStatus(new Path("hdfs://bar/"));

当你打电话的时候 FileSystem.get(conf) 你实际上得到了一个缓存示例 gs://hadoop-config/ ,然后用它来列出 gs://rsync-1 .
相反,当您知道要对其进行操作的路径时,应该是获取文件系统示例的时间:

FileSystem fs = FileSystem.get(myPath.toUri(), new Configuration(true));
    fs.globStatus(myPath);

相关问题