RDD to DataFrame in Spark and Scala

rqcrx0a6 · posted 2022-11-09 · in Scala
import java.io.File
import java.util

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.datasyslab.geosparksql.utils.GeoSparkSQLRegistrator

def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .master("local")
      .appName("SparkAndHive")
      .config("spark.sql.warehouse.dir", "/tmp/spark-warehouse 2")
      .enableHiveSupport()
      .getOrCreate()

    GeoSparkSQLRegistrator.registerAll(spark.sqlContext)

    val sparkConf: SparkConf = new SparkConf().setAppName("Spark RDD foreach Example").setMaster("local[2]").set("spark.executor.memory", "2g")

    def displayFiles(files: Array[File], a: util.List[String], b: util.List[String]): Unit = {
      for (filename <- files) {
        if (filename.isDirectory) {
          if (filename.getName.contains("fire")) {
            // collect the "fire" folder paths
            a.add(filename.getAbsolutePath)
            println(filename.getAbsolutePath)
          } else if (filename.getName.contains("water")) {
            // collect the "water" folder paths
            b.add(filename.getAbsolutePath)
            println(filename.getAbsolutePath)
          } else {
            // if any other sub-directory is found, recurse into it
            displayFiles(filename.listFiles, a, b)
          }
        }
      }
    }

    val files = new File("C://folder").listFiles

    val list1 = new util.ArrayList[String]
    val list2 = new util.ArrayList[String]

    displayFiles(files, list1, list2)

    val a = Seq(list1)
    println(a)
    val b = Seq(list2)
    println(b)

    val rdd1 = spark.sparkContext.parallelize(a)
    rdd1.foreach(println)
    val rdd2 = spark.sparkContext.parallelize(b)
    rdd2.foreach(println)

    val dfSeq1 = Seq(rdd1)
    println(dfSeq1)
    val mergeSeqDf1 = dfSeq1.reduce(_ union _)
    mergeSeqDf1.show()

    val dfSeq2 = Seq(rdd2)
    println(dfSeq2)
    val mergeSeqDf2 = dfSeq2.reduce(_ union _)
    mergeSeqDf2.show()
  }

I have created one list with the paths of the sub-folders whose names contain "fire", which looks like List("C//1_fire", "C//2_fire", "C//3_fire"),
and another list with the paths of the sub-folders whose names contain "water", like List("C//1_water", "C//2_water", "C//3_water").
I created an RDD for each list and printed it, and it shows List("C//1_fire", "C//2_fire", "C//3_fire") for fire and List("C//1_water", "C//2_water", "C//3_water") for water.
Then I merged all the fire RDDs into rdd1 and all the water RDDs into rdd2, but on show() I get the error "value show is not a member of org.apache.spark.rdd.RDD[java.util.ArrayList[String]]" at mergeSeqDf1.show().
How can I convert the RDD to a DataFrame so that I can display it?
Folder structure:

person1
  a_fire
    a_fire
      1_fire
      2_fire
      3_fire
      4_fire
    a_water
      1_water
      2_water
      3_fire
      4_fire
person2
  b_fire
    b_fire
      1_fire
      2_fire
      3_fire
      4_fire
    b_water
      1_water
      2_water
      3_fire
      4_fire

8yoxcaq7 · 1#

Spark has three main concepts: RDD, DataSet, and DataFrame.
Suppose you have a simple list of tuples:

// a list of (String, String) tuples
// each tuple contains the id and name of a person

val list: List[(String, String)] =
  List(
    ("1", "abc"),
    ("2", "def")
  )

The RDD API is the most basic one and is available through the SparkContext. You only need spark-core as a dependency in your project.

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf().setAppName("appName").setMaster("local[*]")

// people generally use `sc` variable to refer to `SparkContext`
val sc = new SparkContext(conf)

val rdd: RDD[(String, String)] = sc.parallelize(list)

For DataSet and DataFrame, you also need to add spark-sql as a dependency to your project, and a SparkContext is not enough: you need a SparkSession.

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

// people generally use `spark` variable to refer to `SparkSession`
val spark = SparkSession.builder().master("local[*]").appName("CustomerAddress").getOrCreate()

// you can get the SparkContext from SparkSession
val sc = spark.sparkContext

// then you import the implicits required for working with DataSet API
import spark.implicits._

// rdd of tuple (String, String)
val rdd: RDD[(String, String)] = sc.parallelize(list)

// you can get a DataSet of tuple (String, String)
val ds1: Dataset[(String, String)] = rdd.toDS()

ds1.show()
//+---+---+
//| _1| _2|
//+---+---+
//|  1|abc|
//|  2|def|
//+---+---+

Now, DataFrame is really just another name for DataSet[Row], where Row is another Spark data structure that holds the columns.

// convert to df without giving specific column names
// the Rows will use the tuple index as column names
val df1: DataFrame = rdd.toDF()

df1.show()
//+---+---+
//| _1| _2|
//+---+---+
//|  1|abc|
//|  2|def|
//+---+---+

// remember DataFrame is just a name for DataSet[Row]
val df11: Dataset[Row] = rdd.toDF()

df11.show()
//+---+---+
//| _1| _2|
//+---+---+
//|  1|abc|
//|  2|def|
//+---+---+

However, you can also provide the column names:

val df2: DataFrame = rdd.toDF("id", "name")

df2.show()
//+---+----+
//| id|name|
//+---+----+
//|  1| abc|
//|  2| def|
//+---+----+
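
Another common route from an RDD to a DataFrame, if you want to control the schema explicitly rather than relying on the implicits, is spark.createDataFrame with an RDD[Row] and a StructType. A minimal sketch reusing the same rdd (the nullable flags are just an illustrative choice):

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// build an RDD[Row] and describe the columns explicitly
val rowRdd: RDD[Row] = rdd.map { case (id, name) => Row(id, name) }

val schema = StructType(Seq(
  StructField("id", StringType, nullable = true),
  StructField("name", StringType, nullable = true)
))

val df3: DataFrame = spark.createDataFrame(rowRdd, schema)

df3.show()
//+---+----+
//| id|name|
//+---+----+
//|  1| abc|
//|  2| def|
//+---+----+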

You can also use a domain-specific data structure instead of DataFrame (i.e. DataSet[Row]):

case class Person(id: String, name: String)

val ds2: Dataset[Person] = rdd.map(t => Person(t._1, t._2)).toDS()

ds2.show()
//+---+----+
//| id|name|
//+---+----+
//|  1| abc|
//|  2| def|
//+---+----+
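
Applied back to the question, the same toDF approach works for the lists of folder paths once each java.util.List[String] is converted to a Scala collection. A sketch, assuming the list1 and list2 variables and the spark session from the question (the column names are made up for illustration):

import scala.collection.JavaConverters._  // on Scala 2.13+ use scala.jdk.CollectionConverters._
import spark.implicits._

// list1 / list2 hold the "fire" / "water" folder paths collected by displayFiles
val fireDf = list1.asScala.toSeq.toDF("fire_path")
val waterDf = list2.asScala.toSeq.toDF("water_path")

fireDf.show(false)
waterDf.show(false)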
