在spark java api中递归读取hdfs中的所有文件

6pp0gazn  于 2021-05-30  发布在  Hadoop
关注(0)|答案(3)|浏览(679)

我正在使用spark从一个目录及其子目录的单个rdd中读取hdfs中所有文件的数据。我找不到任何有效的方法来做那件事。因此,我尝试编写一些定制代码,如下所示:

public Object fetch(String source,String sink) {

    //reading data
    boolean isDir=new File(source).isDirectory();
    System.out.println("isDir="+isDir);
    JavaRDD<String> lines;
    if(isDir)
    {

        lines=readFiles(new File(source).listFiles(), null);
    }
    else
        lines= sc.textFile(source);

    lines.saveAsTextFile(sink);
    return true;
}

public static JavaRDD<String> readFiles(File[] files,JavaRDD<String> lines) {
    for (File file : files) {
        if (file.isDirectory()) {
            readFiles(file.listFiles(),lines); // Calls same method again.
        } 
        else {
            if(lines==null)
                lines=sc.textFile(file.getPath());
            else
            {
                JavaRDD<String> r=sc.textFile(file.getPath());
                lines.union(r);
            }
        }
    }
    return lines;
}

但这并不是我期望的工作,因为isdir包含错误的信息,告诉我它不是一个目录。你能告诉我怎么了吗?有什么有效的方法来做这项工作吗?多谢了

pbpqsu0x

pbpqsu0x1#

所以我终于找到了解决办法。我错了,因为我使用的是文件对象,用于从本地文件系统读取文件。为了读/写hdfs,我们需要使用org.apache.hadoop.fs*
这就是解决办法

public Object fetch(String source,String sink) {

    //reading data
    Path src=new Path(source);
    try {
        if(fs.exists(src))
        {
            FileStatus[] lists=fs.listStatus(src);
            readFiles(lists);
        }
    } catch (IOException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
    return true;
}

public void readFiles(FileStatus[] files) {
        for(int i=0;i<files.length;i++)
        {
            if(files[i].isDirectory())
            {
                try {
                    readFiles(fs.listStatus(files[i].getPath()));
                } catch (IOException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
            else
            {
                    if(lines==null)
                    {
                       Path p=files[i].getPath();
                       JavaRDD<String> lines=sc.textFile(p.toString());
                    }
                    else
                   {
                      JavaRDD<String> r=sc.textFile(file.getPath());
                      lines.union(r);
                   }
        }
  }
  return lines;
 }
gojuced7

gojuced72#

由于spark可以基于hadoop作业配置读取数据,因此可以使用 FileInputFormat#setInputDirRecursive 方法。

JavaSparkContext context = new JavaSparkContext();

Job job;

try {
  job = Job.getInstance();
  FileInputFormat.setInputPaths(job, new Path("/path/to/input/directory));
  FileInputFormat.setInputDirRecursive(job, true);
} catch (IOException e1) {
  e1.printStackTrace();
  System.exit(1);
}

JavaRDD<Text> sourceData = context.newAPIHadoopRDD(job.getConfiguration(), TextInputFormat.class, LongWritable.class, Text.class)
  .values();

很明显,最终将使用文本数据类型而不是字符串。

z5btuh9x

z5btuh9x3#

“*”字符递归读取文件夹

JavaSparkContext sc = new JavaSparkContext(conf);
sc.textFile("/my/directory/*");

阅读此链接了解更多信息:
http://spark.apache.org/docs/latest/programming-guide.html#external-数据集

相关问题