并行处理—如何在hadoop/spark中重命名大量文件?

ffscu2ro  于 2021-06-03  发布在  Hadoop
关注(0)|答案(4)|浏览(536)

我有一个包含10万个以上文件的输入文件夹。
我想对它们执行批处理操作,即以某种方式重命名所有这些文件,或者根据每个文件名中的信息将它们移动到新路径。
我想使用spark来实现这一点,但不幸的是,当我尝试以下代码时:

final org.apache.hadoop.fs.FileSystem ghfs = org.apache.hadoop.fs.FileSystem.get(new java.net.URI(args[0]), new org.apache.hadoop.conf.Configuration());
        org.apache.hadoop.fs.FileStatus[] paths = ghfs.listStatus(new org.apache.hadoop.fs.Path(args[0]));
        List<String> pathsList = new ArrayList<>();
        for (FileStatus path : paths) {
            pathsList.add(path.getPath().toString());
        }
        JavaRDD<String> rddPaths = sc.parallelize(pathsList);

        rddPaths.foreach(new VoidFunction<String>() {
            @Override
            public void call(String path) throws Exception {
                Path origPath = new Path(path);
                Path newPath = new Path(path.replace("taboola","customer"));
                ghfs.rename(origPath,newPath);
            }
        });

我得到一个错误,hadoop.fs.filesystem是不可序列化的(因此可能不能用于并行操作)
你知道我该怎么解决这个问题还是换个方法?

oxf4rvwz

oxf4rvwz1#

问题是您正在尝试序列化ghfs对象。如果您使用mappartitions并在每个分区中重新创建ghfs对象,您将能够运行您的代码,只需做一些小的更改。

rkue9o1l

rkue9o1l2#

你需要做什么 FileSystem.get 内部 VoidFunction 我也是。
驱动程序需要一个文件系统来获取文件列表,但每个工作程序也需要一个文件系统来进行重命名。驱动程序无法将其文件系统传递给工作程序,因为它不可序列化。但是工人们可以得到他们自己的文件系统。
在scalaapi中,您可以使用 RDD.foreachPartition 以一种你只需要 FileSystem.get 每个分区一次,而不是每行一次。它可能也可以在javaapi中使用。

0s0u357o

0s0u357o3#

当我的hdfs归档目录达到最大项目限制时,我也遇到了类似的问题

Request error: org.apache.hadoop.hdfs.protocol.FSLimitException$MaxDirectoryItemsExceededException
The directory item limit of /my/archive is exceeded: limit=1048576 items=1048576

我决定将上一年(2015年)的所有项目移到单独的子文件夹中。这是纯壳溶液

export HADOOP_CLIENT_OPTS="-XX:-UseGCOverheadLimit -Xmx4096m"
hdfs dfs -ls /my/archive \
    | grep 2015- \
    | awk '{print $8}' \
    | gnu-parallel -X -s 131000 hdfs dfs -mv {} /my/archive/2015

评论:
需要客户端选项覆盖 hdfs dfs -ls 因为文件太多了。请参阅此处了解更多详细信息。 hdfs dfs 客户端对参数列表长度有限制:关于 131000 ( 2^17 )查尔斯。
移动420k个文件花了几分钟时间。

a7qyws3x

a7qyws3x4#

我建议重命名它们,就像在非map reduce上下文中重命名文件系统类一样(就在驱动程序中),重命名100k文件没什么大不了的,它太慢了,然后可以尝试多线程处理。做点像这样的事

FileSystem fileSystem = new Path("").getFileSystem(new Configuration());
File [] files =  FileUtil.listFiles(directory)
for (File file : files) {
    fileSystem.rename(new Path(file.getAbsolutePath()),new Path("renamed"));
}

顺便说一句,您在spark中遇到的错误是因为spark需要它用来实现可序列化的对象,而文件系统不需要。
我无法确认这一点,但hdfs中的每个重命名似乎都会涉及namenode,因为它跟踪文件的完整目录结构和节点位置(确认链接),这意味着无法高效地并行完成。根据这个答案,重命名是一个仅元数据的操作,所以它应该是非常快速的串行运行。

相关问题