Split Spark DataFrame rows by JSON keys to create a new DataFrame output

z0qdvdin asked on 2021-05-27 in Spark

I am working with a Spark DataFrame:

scala> val df_input = Seq( ("p1", """{"a": 1, "b": 2}"""), ("p2", """{"c": 3}""") ).toDF("p_id", "p_meta")
df_input: org.apache.spark.sql.DataFrame = [p_id: string, p_meta: string]

scala> df_input.show()
+----+----------------+
|p_id|          p_meta|
+----+----------------+
|  p1|{"a": 1, "b": 2}|
|  p2|        {"c": 3}|
+----+----------------+

Given this input df, is it possible to split it by JSON key to create a new df_output like the one below?

df_output =

p_id    p_meta_key      p_meta_value
 p1         a                1
 p1         b                2
 p2         c                3

I am using Spark 3.0.0 / Scala 2.12.x. I would prefer to do this with spark.sql.functions._

hec6srdp1#

Another option: from_json + explode

import org.apache.spark.sql.functions._

val df_input = Seq( ("p1", """{"a": 1, "b": 2}"""), ("p2", """{"c": 3}""") )
  .toDF("p_id", "p_meta")
df_input.show(false)
/**
  * +----+----------------+
  * |p_id|p_meta          |
  * +----+----------------+
  * |p1  |{"a": 1, "b": 2}|
  * |p2  |{"c": 3}        |
  * +----+----------------+
  */

df_input.withColumn("p_meta", from_json($"p_meta", "map<string, string>", Map.empty[String, String]))
  .selectExpr("p_id", "explode(p_meta) as (p_meta_key, p_meta_value)")
  .show(false)
/**
  * +----+----------+------------+
  * |p_id|p_meta_key|p_meta_value|
  * +----+----------+------------+
  * |p1  |a         |1           |
  * |p1  |b         |2           |
  * |p2  |c         |3           |
  * +----+----------+------------+
  */
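
For reference, the same transformation can also be written with an explicit MapType schema and the typed column API instead of selectExpr. This is a minimal sketch, assuming the df_input defined above:

// Parse p_meta into a map<string,string> with an explicit schema,
// then explode each key/value pair into its own row.
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{MapType, StringType}

df_input
  .withColumn("p_meta", from_json($"p_meta", MapType(StringType, StringType)))
  .select($"p_id", explode($"p_meta").as(Seq("p_meta_key", "p_meta_value")))
  .show(false)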
rqqzpn5f2#

The code below will solve your problem; I have tested it on Spark 3.0.0 / Scala 2.12.10.

import org.apache.spark.sql.functions._

val df_input = Seq( ("p1", """{"a": 1, "b": 2}"""), ("p2", """{"c": 3}""") ).toDF("p_id", "p_meta")
df_input.show()

/*
+----+----------------+
|p_id|          p_meta|
+----+----------------+
|  p1|{"a": 1, "b": 2}|
|  p2|        {"c": 3}|
+----+----------------+

*/

// UDF to convert the JSON-like string to a Map
def convert(str: String): Map[String, String] = {
  "(\\w+): (\\w+)".r.findAllIn(str).matchData.map(i => {
    (i.group(1), i.group(2))
  }).toMap
}
val udfConvert = spark.udf.register("udfConvert", convert _)
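
As a side note, spark.udf.register both registers the function for use in SQL expressions and returns a UserDefinedFunction handle, so the UDF can be called either way (a minimal sketch; df here is the quote-stripped DataFrame created in the next step):

// Column API, via the returned handle:
df.withColumn("new_col", udfConvert($"p_meta"))

// SQL expression, via the registered name:
df.selectExpr("p_id", "udfConvert(p_meta) as new_col")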

//Remove double quotes 
val df=df_input.withColumn("p_meta", regexp_replace($"p_meta", "\"", ""))
df.show()

/*
+----+------------+
|p_id|      p_meta|
+----+------------+
|  p1|{a: 1, b: 2}|
|  p2|      {c: 3}|
+----+------------+

*/

val df1=df.withColumn("new_col",udfConvert($"p_meta"))

/*
+----+------------+----------------+
|p_id|      p_meta|         new_col|
+----+------------+----------------+
|  p1|{a: 1, b: 2}|[a -> 1, b -> 2]|
|  p2|      {c: 3}|        [c -> 3]|
+----+------------+----------------+

*/

df1.select($"p_id", explode($"new_col"))
  .withColumnRenamed("key", "p_meta_key")
  .withColumnRenamed("value", "p_meta_value")
  .show()
/*
+----+----------+------------+
|p_id|p_meta_key|p_meta_value|
+----+----------+------------+
|  p1|         a|           1|
|  p1|         b|           2|
|  p2|         c|           3|
+----+----------+------------+
*/
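
As a quick sanity check on the regex-based convert helper: it handles the sample rows, but the (\w+): (\w+) pattern only captures word characters, so values containing spaces or nested JSON would be silently truncated. A small sketch:

convert("{a: 1, b: 2}")      // Map(a -> 1, b -> 2)
convert("{c: 3}")            // Map(c -> 3)
convert("{d: hello world}")  // Map(d -> hello)  <- " world" is dropped by the regex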
