How to write a Spark Scala schema for complex data types such as map, struct, and array

3wabscal posted on 2021-05-27 in Spark

Please find the data format below:

c3f36c25-2546-48b2-bd72-1b5e5dcae2ab/1620247529/{6032:{"advertisers":{"Amoma":[{"eurocents":17256,"breakfast":false}]

I tried the following:

val schema = List(
  ("Userid", StringType, true),
  ("unix_time", IntegerType, true),
  ("hotelresults",MapType(IntegerType,true,StructType(("advertisers",MapType(StringType,true,ArrayType(StructType("eurocents",IntegerType,true,"breakfast",BooleanType,true))))))
)
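For comparison, here is a corrected sketch of the schema the question seems to be aiming for. The attempt above does not compile mainly because MapType takes its arguments in the order (keyType, valueType, valueContainsNull), and StructType takes a sequence of StructField values rather than bare tuples. The field names and types come from the question; the object name QuestionSchema and the helper names offerType and hotelType are made up for illustration.

```scala
import org.apache.spark.sql.types._

object QuestionSchema {
  // Innermost element: one offer from an advertiser
  val offerType: StructType = StructType(Seq(
    StructField("eurocents", IntegerType, nullable = true),
    StructField("breakfast", BooleanType, nullable = true)
  ))

  // Value stored under each hotel ID: advertiser name -> list of offers
  val hotelType: StructType = StructType(Seq(
    StructField(
      "advertisers",
      MapType(StringType, ArrayType(offerType, containsNull = true), valueContainsNull = true),
      nullable = true)
  ))

  // Full record; note MapType(keyType, valueType, valueContainsNull)
  val schema: StructType = StructType(Seq(
    StructField("Userid", StringType, nullable = true),
    StructField("unix_time", IntegerType, nullable = true),
    StructField("hotelresults", MapType(IntegerType, hotelType, valueContainsNull = true), nullable = true)
  ))

  def main(args: Array[String]): Unit =
    println(schema.treeString)
}
```

Printing schema.treeString is a quick way to check that the nesting matches the data before using the schema anywhere.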
Answer by ycl3bljg


It may be easier to build the StructType with the add method.
It looks like your whole file is in CSV format with "/" (slash) as the delimiter:

import org.apache.spark.sql.DataFrame

// create initial DataFrame with JSON data inside of 3rd column
val df: DataFrame = spark.read.format("csv")
  .option("delimiter", "/")
  .option("header", "false")
  .option("inferSchema", "false")
  .load("62132524.txt")
  .toDF("UserId", "UnixTime", "HotelResults")

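The JSON in your third column is not valid (the key 6032 is not quoted, and the closing braces are missing), so I changed it to: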

c3f36c25-2546-48b2-bd72-1b5e5dcae2ab/1620247529/{"advertisers":{"Amoma":[{"eurocents":17256,"breakfast":false}]}}

Then parse it with the from_json function (the wildcard imports make sure everything needed is in scope):

import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._

// prepare StructType with inner StructType
val detailType:StructType = new StructType().add("eurocents", IntegerType).add("breakfast", BooleanType)
val jsonSchema:StructType = new StructType().add("advertisers", MapType(StringType, ArrayType(detailType, false)))

// Now, the JSON Schema can be used:
df.select(from_json(col("HotelResults"), jsonSchema) as "HotelResults2").select(col("HotelResults2.*")).show
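The edit above drops the hotel ID level (6032) to make the JSON valid. If you want to keep it, one option is to quote the key and parse the whole column as a map keyed by hotel ID. JSON object keys are always strings, so the map key type here is StringType, and the key can be cast to an integer afterwards. This is a sketch under assumptions: it assumes Spark 2.4 or later (from_json with a MapType root schema), the input line is the question's record with "6032" quoted and the closing braces added, and the names HotelMapParse, offers and hotelsSchema are hypothetical. explode on a map column yields a key column and a value column, which .as(Seq(...)) renames.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, explode, from_json}
import org.apache.spark.sql.types._

object HotelMapParse {
  def offers(spark: SparkSession): DataFrame = {
    import spark.implicits._

    // Assumed input: the question's record with the hotel ID quoted ("6032")
    // and the missing closing braces added, so the JSON is valid.
    val df = Seq((
      "c3f36c25-2546-48b2-bd72-1b5e5dcae2ab",
      "1620247529",
      """{"6032":{"advertisers":{"Amoma":[{"eurocents":17256,"breakfast":false}]}}}"""
    )).toDF("UserId", "UnixTime", "HotelResults")

    // Same inner types as in the answer
    val detailType = new StructType().add("eurocents", IntegerType).add("breakfast", BooleanType)
    val hotelType = new StructType().add("advertisers", MapType(StringType, ArrayType(detailType, false)))

    // Top-level map keyed by hotel ID (JSON keys are strings)
    val hotelsSchema = MapType(StringType, hotelType)

    df.select(col("UserId"), from_json(col("HotelResults"), hotelsSchema).as("hotels"))
      // one row per hotel
      .select(col("UserId"), explode(col("hotels")).as(Seq("hotelId", "hotel")))
      // one row per advertiser
      .select(col("UserId"), col("hotelId"), explode(col("hotel.advertisers")).as(Seq("advertiser", "offers")))
      // one row per offer
      .select(col("UserId"), col("hotelId"), col("advertiser"), explode(col("offers")).as("offer"))
      .select(
        col("UserId"),
        col("hotelId").cast(IntegerType).as("hotelId"),
        col("advertiser"),
        col("offer.eurocents").as("eurocents"),
        col("offer.breakfast").as("breakfast"))
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("hotel-map").getOrCreate()
    offers(spark).show(false)
    spark.stop()
  }
}
```

Flattening with explode is a design choice; if you prefer to keep one row per user, you can stop after the from_json step and work with the map column directly.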

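Edit based on the comments (parsing the flat file directly)
The built-in CSV parser can only handle one delimiter per file. The workaround is to use Scala's String split method, which can split on several delimiter characters at once.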
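A plain-Scala sketch (no Spark needed) of why both split variants in the code below behave the same: split with a String argument treats it as a regex, so a character class matches any one of the listed characters, while the Array[Char] overload splits on any of the given characters. The sample line and its two control-character delimiters are hypothetical.

```scala
object MultiDelimiterSplit {
  // Hypothetical sample line: three fields separated by two different control characters
  val line: String = "6032\u0002Amoma\u000317256"

  // 1) Regex character class: split wherever either control character appears
  val byRegex: Array[String] = line.split("[\u0002\u0003]")

  // 2) Array[Char] overload from Scala's StringOps: split on any listed character
  val byChars: Array[String] = line.split(Array('\u0002', '\u0003'))

  def main(args: Array[String]): Unit = {
    println(byRegex.mkString(" | "))     // prints: 6032 | Amoma | 17256
    println(byRegex.sameElements(byChars)) // prints: true
  }
}
```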

import org.apache.spark.sql.DataFrame
import spark.implicits._ // provides .toDF() on an RDD of case classes (already in scope in spark-shell)

// Read data into a flat RDD of lines
val rawRDD = spark.sparkContext.textFile("rawdata.dat")

// Create case class (I added 3 fields, but you will need to add the rest)
case class Record(id: Int, advertiser: String, euroCents: Int)

// Create function converting an array of values to the case class
def arrToRecord(a: Array[String]): Record = Record(a(0).toInt, a(1), a(2).toInt)

// Use one of the approaches below - they should be equivalent in your case
// 1) split on a regex character class listing every delimiter character
val mappedDF: DataFrame = rawRDD.map((s: String) => arrToRecord(s.split("[\u0006\u0003\u0005\u0007\u0008\u0002]"))).toDF()
// OR
// 2) split on an array of delimiter characters
val mappedDF: DataFrame = rawRDD.map((s: String) => arrToRecord(s.split(Array('\u0006', '\u0003', '\u0005', '\u0007', '\u0008', '\u0002')))).toDF()

Result:

mappedDF.show
+----+----------+---------+
|  id|advertiser|euroCents|
+----+----------+---------+
|6032|     Amoma|    17256|
+----+----------+---------+
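If you prefer to stay in the Dataset API instead of going through an RDD, here is a sketch under the same assumptions as the answer (three fields, same delimiter set). spark.read.textFile returns a Dataset[String], and mapping it to a case class gives a typed Dataset[Record]; the case class is declared at top level so Spark can derive an Encoder for it. The object FlatFileParse and method parse are hypothetical names.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}

// Top-level case class so Spark can derive an Encoder for it (same 3 fields as in the answer)
case class Record(id: Int, advertiser: String, euroCents: Int)

object FlatFileParse {
  // Split each line on any of the control-character delimiters and build a typed Record
  def parse(spark: SparkSession, lines: Dataset[String]): Dataset[Record] = {
    import spark.implicits._
    lines.map { s =>
      val a = s.split("[\u0006\u0003\u0005\u0007\u0008\u0002]")
      Record(a(0).toInt, a(1), a(2).toInt)
    }
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("flat-file").getOrCreate()
    // "rawdata.dat" is the file name used in the answer
    parse(spark, spark.read.textFile("rawdata.dat")).show()
    spark.stop()
  }
}
```

The typed Dataset catches field-type mistakes at compile time; call .toDF() on the result if a DataFrame is needed downstream.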
