显然dbutils不能在cmd行spark submits中使用,您必须使用jar作业,但是由于其他要求,我必须使用spark submit样式的作业,但是仍然需要列出dbfs中的文件键并对其进行迭代,以决定将哪些文件用作进程的输入。。。
使用scala,我可以使用spark或hadoop中的哪个lib来检索 dbfs:/filekeys
一种特殊的模式?
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.SparkSession
def ls(sparkSession: SparkSession, inputDir: String): Seq[String] = {
println(s"FileUtils.ls path: $inputDir")
val path = new Path(inputDir)
val fs = path.getFileSystem(sparkSession.sparkContext.hadoopConfiguration)
val fileStatuses = fs.listStatus(path)
fileStatuses.filter(_.isFile).map(_.getPath).map(_.getName).toSeq
}
使用上面的方法,如果我传入一个部分键前缀 dbfs:/mnt/path/to/folder
当所述“文件夹”中存在以下键时:
/mnt/path/to/folder/file1.csv /mnt/path/to/folder/file2.csv
我明白了 dbfs:/mnt/path/to/folder is not a directory
当它击中 val path = new Path(inputDir)
1条答案
按热度按时间xzlaal3s1#
需要使用sparksession来完成。
我们是这样做的: