Split Spark DataFrame rows by JSON key to create a new DataFrame output

z0qdvdin · posted 2021-05-27 in Spark

Working with a Spark DataFrame:

```
scala> val df_input = Seq( ("p1", """{"a": 1, "b": 2}"""), ("p2", """{"c": 3}""") ).toDF("p_id", "p_meta")
df_input: org.apache.spark.sql.DataFrame = [p_id: string, p_meta: string]

scala> df_input.show()
+----+----------------+
|p_id|          p_meta|
+----+----------------+
|  p1|{"a": 1, "b": 2}|
|  p2|        {"c": 3}|
+----+----------------+
```

Given this input DataFrame, is it possible to split it by JSON key to create a new df_output like the one below?

df_output =

```
p_id  p_meta_key  p_meta_value
p1    a           1
p1    b           2
p2    c           3
```

I am using Spark 3.0.0 / Scala 2.12.x, and I would prefer a solution based on spark.sql.functions._

hec6srdp 1#

Another option: from_json + explode

```
val df_input = Seq( ("p1", """{"a": 1, "b": 2}"""), ("p2", """{"c": 3}""") )
  .toDF("p_id", "p_meta")
df_input.show(false)
/**
  * +----+----------------+
  * |p_id|p_meta          |
  * +----+----------------+
  * |p1  |{"a": 1, "b": 2}|
  * |p2  |{"c": 3}        |
  * +----+----------------+
  */

df_input.withColumn("p_meta", from_json($"p_meta", "map<string, string>", Map.empty[String, String]))
  .selectExpr("p_id", "explode(p_meta) as (p_meta_key, p_meta_value)")
  .show(false)
/**
  * +----+----------+------------+
  * |p_id|p_meta_key|p_meta_value|
  * +----+----------+------------+
  * |p1  |a         |1           |
  * |p1  |b         |2           |
  * |p2  |c         |3           |
  * +----+----------+------------+
  */
```
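
If the values should stay numeric rather than become strings, the same idea works with a typed map schema. A minimal sketch of that variant, assuming every value is an integer (the schema string is my own choice, not part of the original answer):

```
// Sketch: parse the values as int instead of string (assumes all values are integers)
df_input
  .withColumn("p_meta", from_json($"p_meta", "map<string, int>", Map.empty[String, String]))
  .selectExpr("p_id", "explode(p_meta) as (p_meta_key, p_meta_value)")
  .show(false)
```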
rqqzpn5f 2#

The code below solves your problem; I have tested it with Spark 3.0.0 / Scala 2.12.10.

```
import org.apache.spark.sql.functions._

val df_input = Seq( ("p1", """{"a": 1, "b": 2}"""), ("p2", """{"c": 3}""") ).toDF("p_id", "p_meta")
df_input.show()
/*
+----+----------------+
|p_id|          p_meta|
+----+----------------+
|  p1|{"a": 1, "b": 2}|
|  p2|        {"c": 3}|
+----+----------------+
*/

// UDF to convert the JSON string to a Map
def convert(str: String): Map[String, String] = {
  "(\\w+): (\\w+)".r.findAllIn(str).matchData.map(i => {
    (i.group(1), i.group(2))
  }).toMap
}
val udfConvert = spark.udf.register("udfConvert", convert _)

// Remove double quotes
val df = df_input.withColumn("p_meta", regexp_replace($"p_meta", "\"", ""))
df.show()
/*
+----+------------+
|p_id|      p_meta|
+----+------------+
|  p1|{a: 1, b: 2}|
|  p2|      {c: 3}|
+----+------------+
*/

val df1 = df.withColumn("new_col", udfConvert($"p_meta"))
df1.show()
/*
+----+------------+----------------+
|p_id|      p_meta|         new_col|
+----+------------+----------------+
|  p1|{a: 1, b: 2}|[a -> 1, b -> 2]|
|  p2|      {c: 3}|        [c -> 3]|
+----+------------+----------------+
*/

df1.select($"p_id", $"p_meta", $"new_col", explode($"new_col"))
  .drop($"p_meta")
  .drop($"new_col")
  .withColumn("p_meta_key", $"key")
  .withColumn("p_meta_value", $"value")
  .drop($"key")
  .drop($"value")
  .show()
/*
+----+----------+------------+
|p_id|p_meta_key|p_meta_value|
+----+----------+------------+
|  p1|         a|           1|
|  p1|         b|           2|
|  p2|         c|           3|
+----+----------+------------+
*/
```
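
Since the UDF is also registered for SQL under the name udfConvert, the quote stripping and the explode can be folded into a single selectExpr. A minimal sketch of that shortcut, assuming the same df_input and the registration above (this compact form is my own rewrite, not part of the original answer):

```
// Sketch: one-step variant that calls the SQL-registered "udfConvert" inside selectExpr
df_input
  .selectExpr(
    "p_id",
    """explode(udfConvert(regexp_replace(p_meta, '"', ''))) as (p_meta_key, p_meta_value)"""
  )
  .show()
```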
