我在一个集群里工作。我需要对hdfs中包含的每个文本文件运行相同的spark操作。但是我不想从shell命令行为每个文件提交spark job shell命令,因为文件数是90。我该怎么做?
我的一个文件的代码结构如下:
object SparkGraphGen{
def main(args: Array[String]){
val conf = new SparkConf()
.setMaster("yarn")
.setAppName("dataset")
val sc = new SparkContext(conf)
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._
val peopleRDD = sc.textFile("file1.csv")
...
do stuff
...
sc.stop()
}}
1条答案
按热度按时间hgb9j2n61#
更新:
怎么样
foreach
回路:输出:
您也可以在shell脚本中编写相同的for循环
或一次性处理所有文件。。。。
您可以将所有文件放在一个目录中,并且只将完整的目录路径传递给spark上下文,spark将处理该目录中的所有文件:
您还可以组合RDD,如:
但是对于90个文件,我会把所有文件放在一个目录中,并使用目录路径在一个作业中处理所有文件。。。