如何使用循环在sparkscala中迭代hdfs中的多个文本文件?

z4iuyo4d  于 2021-06-02  发布在  Hadoop
关注(0)|答案(1)|浏览(409)

我在一个集群里工作。我需要对hdfs中包含的每个文本文件运行相同的spark操作。但是我不想从shell命令行为每个文件提交spark job shell命令,因为文件数是90。我该怎么做?
我的一个文件的代码结构如下:

object SparkGraphGen{
def main(args: Array[String]){
      val conf = new SparkConf()
                .setMaster("yarn")
                .setAppName("dataset")
      val sc = new SparkContext(conf)
      val sqlContext = new org.apache.spark.sql.SQLContext(sc)
      import sqlContext.implicits._
      val peopleRDD = sc.textFile("file1.csv")
      ...
      do stuff
      ...
      sc.stop()
      }}
hgb9j2n6

hgb9j2n61#

更新:
怎么样 foreach 回路:

val sc = new SparkContext(conf)
//val files = new File("Data\\files\\").listFiles.map(_.getAbsolutePath).toList 
val files = new File("Data\\files\\").listFiles.map(_.getName).toList           
files.foreach { file =>  
    //val lines = sc.textFile(file)
    val lines = sc.textFile("Data\\files\\" + file)
    println("total lines in file " + file + "  " + lines.count())   
    //do more stuf... for each file
    lines.saveAsTextFile("Data\\output\\" + file + "_output")
        }   
sc.stop()

输出:

total lines in file C:\Users\rpatel\workspaces\Spark\Data\files\file1.txt  4
total lines in file C:\Users\rpatel\workspaces\Spark\Data\files\file2.txt  4

您也可以在shell脚本中编写相同的for循环


# !/bin/bash

for file in $(hadoop fs -ls /hdfs/path/to/files/|awk -F '|' '{print $NF}')
do
  #run spark for each file
  spark-submit <options> $file /path/output/$file
done

或一次性处理所有文件。。。。
您可以将所有文件放在一个目录中,并且只将完整的目录路径传递给spark上下文,spark将处理该目录中的所有文件:

val peopleRDD = sc.textFile("/path/to/csv_files/")

您还可以组合RDD,如:

val file1RDD = sc.textFile("file1.csv") 
    val file2RDD = sc.textFile("file2.csv")
    val allFileRDD = file1RDD ++ file2RDD // ++ nRDD

但是对于90个文件,我会把所有文件放在一个目录中,并使用目录路径在一个作业中处理所有文件。。。

相关问题