斯卡拉;递归遍历父hadoop目录中的所有目录

8e2ybdfx  于 2021-05-27  发布在  Hadoop
关注(0)|答案(2)|浏览(418)

我一直在尝试创建一个递归函数来遍历hadoop父路径中的所有目录。我有下面的函数,但输出是一堆嵌套的对象数组,所以不完全是我要找的,但它确实走hadoop的道路。如有任何建议,我们将不胜感激。我的目标是让返回类型为array[path]。
获取给定父目录的底层分区路径示例:parent /hadoop/parent/path 带隔板 month , day 因此,在本例中,我们期望一个具有365条路径的数组。

import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
val parentPath = "/hadoop/parent/path"
val hdfsPath: Path = new Path(parentPath)

def recursiveWalk(hdfsPath: Path): Array[Object] = {
    val fs: FileSystem = hdfsPath.getFileSystem(spark.sessionState.newHadoopConf())
    val fileIterable = fs.listStatus(hdfsPath)
    val res = for (f <- fileIterable) yield {
        if (f.isDirectory) {
            recursiveWalk(f.getPath).distinct
        }
        else {
            hdfsPath
        }
    }
    res.distinct
}
bbmckpt7

bbmckpt71#

您定义了一个递归函数,该函数生成一个数组(for循环):
如果项是一个目录,则函数的输出,目录是一个对象数组。
Path 如果它是一个简单的文件。
这解释了获得嵌套数组(数组数组)的事实。
你可以用 flatMap 为了避免这个问题。它将对象列表转换(或“展平”)为对象列表。另外,要获得期望的类型,需要在停止条件和递归之间有匹配的类型( ArrayPath ). 所以你需要包起来 hdfsPath 在数组中。
以下是如何根据我刚才写的内容快速解决您的问题:

def recursiveWalk(hdfsPath: Path): Array[Path] = {
    val fs: FileSystem = hdfsPath.getFileSystem(spark.sessionState.newHadoopConf())
    val fileIterable = fs.listStatus(hdfsPath)
    val res = fileIterable.flatMap(f => {
        if (f.isDirectory) {
            recursiveWalk(f.getPath).distinct
        }
        else {
            Array(hdfsPath)
        }
    })
    res.distinct
}

上面的代码解决了这个问题,但是为了避免使用distinct,您可以将条件放在输入文件上,而不是像下面这样的子文件夹上。您还可以在函数之外一次性地定义文件系统。

val conf = new org.apache.hadoop.conf.Configuration()
val hdfs = org.apache.hadoop.fs.FileSystem(conf)

def recursiveWalk(path : Path): Array[Path] = {
    if(hdfs.isDirectory(path))
        hdfs.listStatus(path).map(_.getPath).flatMap(rec _) :+ path
    else Array()
}
xxhby3vn

xxhby3vn2#

试着用这个:

def recursiveWalk(hdfsPath: Path): Array[Path] = {
    val fs: FileSystem = hdfsPath.getFileSystem(spark.sessionState.newHadoopConf())
    if (fs.isDirectory(hdfsPath)) {
      fs.listStatus(hdfsPath).flatMap(innerPath => recursiveWalk(innerPath.getPath))
    } else Array.empty[Path]
  }

或者如果您需要目录中的文件,也可以使用:

def getDirsWithFiles(hdfsPath: Path): Array[Path] = {
    val fs: FileSystem = hdfsPath.getFileSystem(spark.sessionState.newHadoopConf())
    if (fs.isDirectory(hdfsPath)) {
      fs.listStatus(hdfsPath).flatMap(innerPath => getDirsWithFiles(innerPath.getPath))
    } else Array(hdfsPath)
  }

相关问题