在Scala和Spark中创建RDD

ie3xauqp  于 2022-11-09  发布在  Scala
关注(0)|答案(1)|浏览(243)
def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .master("local")
      .appName("SparkAndHive")
      .config("spark.sql.warehouse.dir", "/tmp/spark-warehouse 2")
      .enableHiveSupport()
      .getOrCreate()

    GeoSparkSQLRegistrator.registerAll(spark.sqlContext)

    val sparkConf: SparkConf = new SparkConf().setAppName("Spark RDD foreach Example").setMaster("local[2]").set("spark.executor.memory", "2g")

    def displayFiles(files: Array[File], a: util.List[String], b: util.List[String]): Unit = { 
      for (filename <- files) { // If a sub directory is found,

        if (filename.isDirectory) if (filename.getName.contains("fire")) {
          rds.add(filename.getAbsolutePath)
          println(filename.getAbsolutePath)
        }
        else if (filename.getName.contains("water")){
          rdd.add(filename.getAbsolutePath)
          println(filename.getAbsolutePath)
        }
        else {                     
          displayFiles(filename.listFiles, a, b)
        }
      }
    }

    val files = new File("C://folder").listFiles

    val list1 = new util.ArrayList[String]
    val list2 = new util.ArrayList[String]

    displayFiles(files, list1, list2)

    val a= Seq(list1)
    println(a)
    val b= Seq(list2)
    println(b)

    val rdd1 = spark.sparkContext.parallelize(Seq(a))
    rdd1.foreach(println))

    val rdd2 = spark.sparkContext.parallelize(Seq(b))
    rdd2.foreach(println))

我打印了以_fire_water结尾的子目录路径列表。然后我创建了一个列表,将以_fire结尾的路径存储在一个列表中,将以_water结尾的路径存储在另一个列表中。我已经使用foreach循环为两个列表中存储的所有目录创建了RDD。当我为foreach循环声明一个变量并打印它时,它显示了一个空列表

问题:如何将所有RDD合并为一个RDD,即一个用于_fire,另一个用于_water

jyztefdp

jyztefdp1#

您可以更直接地创建它们。代码中的问题是displayFiles实际上没有返回任何内容,也没有修改list1list2。因此,这些列表将为空,ab也将为空。
取而代之的是,您可以尝试如下操作:

val sc = spark.sparkContext
val basePath = "C://folder/"
val rddWater = sc.textFile(basePath + "*_water")
val rddFire = sc.textFile(basePath + "*_fire")

上面的命令将获取与全局/路径匹配的所有文件的所有内容。或者,如果您还需要找出与每个记录对应的文件路径,则可以使用sc.wholeTextFiles

val rddWater = sc.wholeTextFiles("*_water")
val rddFire = sc.wholeTextFiles("*_fire")
// inspect contents using rddWater.collect() or rddFire.collect()

该站点有更多示例:https://sparkbyexamples.com/apache-spark-rdd/spark-read-multiple-text-files-into-a-single-rdd/

相关问题