def main(args: Array[String]): Unit = {
val spark = SparkSession
.builder()
.master("local")
.appName("SparkAndHive")
.config("spark.sql.warehouse.dir", "/tmp/spark-warehouse 2")
.enableHiveSupport()
.getOrCreate()
GeoSparkSQLRegistrator.registerAll(spark.sqlContext)
val sparkConf: SparkConf = new SparkConf().setAppName("Spark RDD foreach Example").setMaster("local[2]").set("spark.executor.memory", "2g")
def displayFiles(files: Array[File], a: util.List[String], b: util.List[String]): Unit = {
for (filename <- files) { // If a sub directory is found,
if (filename.isDirectory) if (filename.getName.contains("fire")) {
rds.add(filename.getAbsolutePath)
println(filename.getAbsolutePath)
}
else if (filename.getName.contains("water")){
rdd.add(filename.getAbsolutePath)
println(filename.getAbsolutePath)
}
else {
displayFiles(filename.listFiles, a, b)
}
}
}
val files = new File("C://folder").listFiles
val list1 = new util.ArrayList[String]
val list2 = new util.ArrayList[String]
displayFiles(files, list1, list2)
val a= Seq(list1)
println(a)
val b= Seq(list2)
println(b)
val rdd1 = spark.sparkContext.parallelize(Seq(a))
rdd1.foreach(println))
val rdd2 = spark.sparkContext.parallelize(Seq(b))
rdd2.foreach(println))
我打印了以_fire
和_water
结尾的子目录路径列表。然后我创建了一个列表,将以_fire
结尾的路径存储在一个列表中,将以_water
结尾的路径存储在另一个列表中。我已经使用foreach
循环为两个列表中存储的所有目录创建了RDD。当我为foreach
循环声明一个变量并打印它时,它显示了一个空列表
问题:如何将所有RDD合并为一个RDD,即一个用于_fire
,另一个用于_water
?
1条答案
按热度按时间jyztefdp1#
您可以更直接地创建它们。代码中的问题是
displayFiles
实际上没有返回任何内容,也没有修改list1
或list2
。因此,这些列表将为空,a
和b
也将为空。取而代之的是,您可以尝试如下操作:
上面的命令将获取与全局/路径匹配的所有文件的所有内容。或者,如果您还需要找出与每个记录对应的文件路径,则可以使用
sc.wholeTextFiles
。该站点有更多示例:https://sparkbyexamples.com/apache-spark-rdd/spark-read-multiple-text-files-into-a-single-rdd/