我有一个包含10万个以上文件的输入文件夹。
我想对它们执行批处理操作,即以某种方式重命名所有这些文件,或者根据每个文件名中的信息将它们移动到新路径。
我想使用spark来实现这一点,但不幸的是,当我尝试以下代码时:
final org.apache.hadoop.fs.FileSystem ghfs = org.apache.hadoop.fs.FileSystem.get(new java.net.URI(args[0]), new org.apache.hadoop.conf.Configuration());
org.apache.hadoop.fs.FileStatus[] paths = ghfs.listStatus(new org.apache.hadoop.fs.Path(args[0]));
List<String> pathsList = new ArrayList<>();
for (FileStatus path : paths) {
pathsList.add(path.getPath().toString());
}
JavaRDD<String> rddPaths = sc.parallelize(pathsList);
rddPaths.foreach(new VoidFunction<String>() {
@Override
public void call(String path) throws Exception {
Path origPath = new Path(path);
Path newPath = new Path(path.replace("taboola","customer"));
ghfs.rename(origPath,newPath);
}
});
我得到一个错误,hadoop.fs.filesystem是不可序列化的(因此可能不能用于并行操作)
你知道我该怎么解决这个问题还是换个方法?
4条答案
按热度按时间oxf4rvwz1#
问题是您正在尝试序列化ghfs对象。如果您使用mappartitions并在每个分区中重新创建ghfs对象,您将能够运行您的代码,只需做一些小的更改。
rkue9o1l2#
你需要做什么
FileSystem.get
内部VoidFunction
我也是。驱动程序需要一个文件系统来获取文件列表,但每个工作程序也需要一个文件系统来进行重命名。驱动程序无法将其文件系统传递给工作程序,因为它不可序列化。但是工人们可以得到他们自己的文件系统。
在scalaapi中,您可以使用
RDD.foreachPartition
以一种你只需要FileSystem.get
每个分区一次,而不是每行一次。它可能也可以在javaapi中使用。0s0u357o3#
当我的hdfs归档目录达到最大项目限制时,我也遇到了类似的问题
我决定将上一年(2015年)的所有项目移到单独的子文件夹中。这是纯壳溶液
评论:
需要客户端选项覆盖
hdfs dfs -ls
因为文件太多了。请参阅此处了解更多详细信息。hdfs dfs
客户端对参数列表长度有限制:关于131000
(2^17
)查尔斯。移动420k个文件花了几分钟时间。
a7qyws3x4#
我建议重命名它们,就像在非map reduce上下文中重命名文件系统类一样(就在驱动程序中),重命名100k文件没什么大不了的,它太慢了,然后可以尝试多线程处理。做点像这样的事
顺便说一句,您在spark中遇到的错误是因为spark需要它用来实现可序列化的对象,而文件系统不需要。
我无法确认这一点,但hdfs中的每个重命名似乎都会涉及namenode,因为它跟踪文件的完整目录结构和节点位置(确认链接),这意味着无法高效地并行完成。根据这个答案,重命名是一个仅元数据的操作,所以它应该是非常快速的串行运行。