需要解析json文件

dtcbnfnu  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(424)
root
 |-- eid: string (nullable = true)
 |-- keys: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- type: string (nullable = true)
 |-- values: array (nullable = true)
 |    |-- element: array (containsNull = true)
 |    |    |-- element: string (containsNull = true)

需要使用spark dataframe将具有上述模式的jsonfile解析为结构化格式。“键”列的列名在“值”列中有值。
示例数据文件:{'type':'logs','eid':'1','keys':['crt_ts','id','upd_ts','km','pivl','distance','speed'],'values':['12343.0000.012','aaga1567','1333.333.333','565656','10.5','121','64']}
预期产量:

eid crt_ts id  upd_ts km  pivl distance speed type
  1  12343.0000.012 AAGA1567 1333.333.333 565656 10.5 121 64 logs
n3schb8v

n3schb8v1#

请检查下面的代码,我用过 groupBy , pivot & agg :

scala> val js = Seq(""" {'type': 'logs', 'eid': '1', 'keys': ['crt_ts', 'id', 'upd_ts', 'km', 'pivl', 'distance', 'speed'], 'values': [['12343.0000.012', 'AAGA1567', '1333.333.333', '565656', '10.5', '121', '64']]}""").toDS
js: org.apache.spark.sql.Dataset[String] = [value: string]

scala> val jdf = spark.read.json(js)
jdf: org.apache.spark.sql.DataFrame = [eid: string, keys: array<string> ... 2 more fields]

scala> jdf.printSchema
root
 |-- eid: string (nullable = true)
 |-- keys: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- type: string (nullable = true)
 |-- values: array (nullable = true)
 |    |-- element: array (containsNull = true)
 |    |    |-- element: string (containsNull = true)

scala> jdf.show(false)
+---+-----------------------------------------------+----+-----------------------------------------------------------------+
|eid|keys                                           |type|values                                                           |
+---+-----------------------------------------------+----+-----------------------------------------------------------------+
|1  |[crt_ts, id, upd_ts, km, pivl, distance, speed]|logs|[[12343.0000.012, AAGA1567, 1333.333.333, 565656, 10.5, 121, 64]]|
+---+-----------------------------------------------+----+-----------------------------------------------------------------+

scala> :paste
// Entering paste mode (ctrl-D to finish)

jdf.select($"eid",$"keys",explode($"values").as("values"),$"type")
.select($"eid",$"type",explode(arrays_zip($"keys",$"values")).as("azip"))
.select($"eid",$"azip.*",$"type")
.groupBy($"type",$"eid")
.pivot($"keys")
.agg(first("values"))
.show(false)

// Exiting paste mode, now interpreting.

+----+---+--------------+--------+--------+------+----+-----+------------+
|type|eid|crt_ts        |distance|id      |km    |pivl|speed|upd_ts      |
+----+---+--------------+--------+--------+------+----+-----+------------+
|logs|1  |12343.0000.012|121     |AAGA1567|565656|10.5|64   |1333.333.333|
+----+---+--------------+--------+--------+------+----+-----+------------+

相关问题