我正在使用spark从一个目录及其子目录的单个rdd中读取hdfs中所有文件的数据。我找不到任何有效的方法来做那件事。因此,我尝试编写一些定制代码,如下所示:
public Object fetch(String source,String sink) {
//reading data
boolean isDir=new File(source).isDirectory();
System.out.println("isDir="+isDir);
JavaRDD<String> lines;
if(isDir)
{
lines=readFiles(new File(source).listFiles(), null);
}
else
lines= sc.textFile(source);
lines.saveAsTextFile(sink);
return true;
}
public static JavaRDD<String> readFiles(File[] files,JavaRDD<String> lines) {
for (File file : files) {
if (file.isDirectory()) {
readFiles(file.listFiles(),lines); // Calls same method again.
}
else {
if(lines==null)
lines=sc.textFile(file.getPath());
else
{
JavaRDD<String> r=sc.textFile(file.getPath());
lines.union(r);
}
}
}
return lines;
}
但这并不是我期望的工作,因为isdir包含错误的信息,告诉我它不是一个目录。你能告诉我怎么了吗?有什么有效的方法来做这项工作吗?多谢了
3条答案
按热度按时间pbpqsu0x1#
所以我终于找到了解决办法。我错了,因为我使用的是文件对象,用于从本地文件系统读取文件。为了读/写hdfs,我们需要使用org.apache.hadoop.fs*
这就是解决办法
gojuced72#
由于spark可以基于hadoop作业配置读取数据,因此可以使用
FileInputFormat#setInputDirRecursive
方法。很明显,最终将使用文本数据类型而不是字符串。
z5btuh9x3#
“*”字符递归读取文件夹
阅读此链接了解更多信息:
http://spark.apache.org/docs/latest/programming-guide.html#external-数据集