def main(args: Array[String]): Unit = {
val spark = SparkSession
.builder()
.master("local")
.appName("SparkAndHive")
.config("spark.sql.warehouse.dir", "/tmp/spark-warehouse 2")
.enableHiveSupport()
.getOrCreate()
GeoSparkSQLRegistrator.registerAll(spark.sqlContext)
val sparkConf: SparkConf = new SparkConf().setAppName("Spark RDD foreach Example").setMaster("local[2]").set("spark.executor.memory", "2g")
def displayFiles(files: Array[File], a: util.List[String], b: util.List[String]): Unit = {
for (filename <- files) { // If a sub directory is found,
if (filename.isDirectory) if (filename.getName.contains("fire")) {
rds.add(filename.getAbsolutePath)
println(filename.getAbsolutePath)
}
else if (filename.getName.contains("water")){
rdd.add(filename.getAbsolutePath)
println(filename.getAbsolutePath)
}
else {
displayFiles(filename.listFiles, a, b)
}
}
}
val files = new File("C://folder").listFiles
val list1 = new util.ArrayList[String]
val list2 = new util.ArrayList[String]
displayFiles(files, list1, list2)
val a= Seq(list1)
println(a)
val b= Seq(list2)
println(b)
val rdd1 = spark.sparkContext.parallelize(Seq(a))
rdd1.foreach(rrd)
val rdd2 = spark.sparkContext.parallelize(Seq(a))
rdd1.foreach(rrd2)
val dfSeq1 = Seq(rdd1)
println(dfSeq1)
val mergeSeqDf1 = dfSeq1.reduce(_ union _)
mergeSeqDf1.show()
val dfSeq2 = Seq(rdd2)
println(dfSeq2)
val mergeSeqDf2 = dfSeq2.reduce(_ union _)
mergeSeqDf2.show()
我已经创建了一个包含子文件夹路径的列表,其中包含“Fire”List Look List(“C//1_Fire”、“C//2_Fire”、“C//3_Fire”)
并创建了其他列表,该列表包含子文件夹路径,其中包含类似List(“C//1_water”,“C//2_water”,“C//3_water”)
我已经为列表创建了RDD并打印出来,然后它显示了火灾的列表(“C//1_fire”、“C//2_fire”、“C//3_fire”)和水的列表(“C//1_water”、“C//2_water”、“C//3_water”)。
然后,我合并了rdd1中的所有火灾rdd和rdd2中的所有water rdd,但是我收到了Show的错误消息,因为“Value Show不是org.apache.spark.rdd.RDD[java.util.ArrayList[String]]mergeSeqDf1.show()的成员”
如何将RDD转换为 Dataframe 以显示 Dataframe
Dataframe 结构
>
>>person1
>>>a_fire
>>>>a_fire
>>>>>1_fire
>>>>>2_fire
>>>>>3_fire
>>>>>4_fire
>>>>a_water
>>>>>1_water
>>>>>2_water
>>>>>3_fire
>>>>>4_fire
>>person2
>>>b_fire
>>>>b_fire
>>>>>1_fire
>>>>>2_fire
>>>>>3_fire
>>>>>4_fire
>>>>b_water
>>>>>1_water
>>>>>2_water
>>>>>3_fire
>>>>>4_fire
1条答案
按热度按时间8yoxcaq71#
Spark有三个主要概念--
RDD
、DataSet
和DataFrame
。假设您有一个简单的元组列表
RDD
API是最容易获得的,并且可以通过SparkContext
获得。您只需要将spark-core
作为项目中的依赖项。对于
DataSet
和DataFrame
,您还需要在项目中添加spark-sql
作为依赖项。而SparkContext
是不够的,您需要一个SparkSession
。现在,
DataFrame
实际上只是DataSet[Row]
的另一个名称,其中Row
是另一个包含列的Spark数据结构。但是,您也可以提供列名
您还可以使用特定于域的数据结构,而不是使用
DataFrame
(即DataSet[Row]
)。