How to explode a struct with PySpark explode()

hgc7kmma · posted 2021-07-13 in Spark

How can I convert the JSON below into the relational rows shown after it? The point I'm stuck on is that the explode() function raises an exception due to a type mismatch. I haven't found a way to coerce the data into a suitable format so that I can create a row from each object under the source key in the sample_json object.
JSON input

    sample_json = """
    {
      "dc_id": "dc-101",
      "source": {
        "sensor-igauge": {
          "id": 10,
          "ip": "68.28.91.22",
          "description": "Sensor attached to the container ceilings",
          "temp": 35,
          "c02_level": 1475,
          "geo": {"lat": 38.00, "long": 97.00}
        },
        "sensor-ipad": {
          "id": 13,
          "ip": "67.185.72.1",
          "description": "Sensor ipad attached to carbon cylinders",
          "temp": 34,
          "c02_level": 1370,
          "geo": {"lat": 47.41, "long": -122.00}
        },
        "sensor-inest": {
          "id": 8,
          "ip": "208.109.163.218",
          "description": "Sensor attached to the factory ceilings",
          "temp": 40,
          "c02_level": 1346,
          "geo": {"lat": 33.61, "long": -111.89}
        },
        "sensor-istick": {
          "id": 5,
          "ip": "204.116.105.67",
          "description": "Sensor embedded in exhaust pipes in the ceilings",
          "temp": 40,
          "c02_level": 1574,
          "geo": {"lat": 35.93, "long": -85.46}
        }
      }
    }"""

Desired output

    dc_id   source_name    id  description
    ------------------------------------------------------------------------
    dc-101  sensor-igauge  10  Sensor attached to the container ceilings
    dc-101  sensor-ipad    13  Sensor ipad attached to carbon cylinders
    dc-101  sensor-inest   8   Sensor attached to the factory ceilings
    dc-101  sensor-istick  5   Sensor embedded in exhaust pipes in the ceilings

PySpark code

    from pyspark.sql.functions import *

    df_sample_data = spark.read.json(sc.parallelize([sample_json]))
    df_expanded = df_sample_data.withColumn("one_source", explode_outer(col("source")))
    display(df_expanded)

Error
AnalysisException: cannot resolve 'explode(`source`)' due to data type mismatch: input to function explode should be array or map type, not struct...
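
Checking the inferred schema makes the mismatch concrete: Spark reads source as a struct with one field per sensor, and explode only accepts array or map columns. A quick check against the DataFrame built above (output abbreviated):

    df_sample_data.printSchema()
    # root
    #  |-- dc_id: string (nullable = true)
    #  |-- source: struct (nullable = true)
    #  |    |-- sensor-igauge: struct (nullable = true)
    #  |    |    |-- c02_level: long (nullable = true)
    #  |    |    |-- description: string (nullable = true)
    #  |    ...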
I put together this Databricks notebook to demonstrate the challenge further and to show the error clearly. I'll be able to use that notebook to test any suggestions provided here.


vwhgwdsa #1

You can't use explode on a struct, but you can get the names of the columns inside the source struct (with df.select("source.*").columns). With a list comprehension you can then build an array containing one struct per nested field, and explode that array to get the desired result:

    from pyspark.sql import functions as F

    # df is the DataFrame parsed from sample_json (df_sample_data in the question)
    df1 = df.select(
        "dc_id",
        F.explode(
            F.array(*[
                F.struct(
                    F.lit(s).alias("source_name"),
                    F.col(f"source.{s}.id").alias("id"),
                    F.col(f"source.{s}.description").alias("description")
                )
                for s in df.select("source.*").columns
            ])
        ).alias("sources")
    ).select("dc_id", "sources.*")

    df1.show(truncate=False)
    # +------+-------------+---+------------------------------------------------+
    # |dc_id |source_name  |id |description                                     |
    # +------+-------------+---+------------------------------------------------+
    # |dc-101|sensor-igauge|10 |Sensor attached to the container ceilings       |
    # |dc-101|sensor-inest |8  |Sensor attached to the factory ceilings         |
    # |dc-101|sensor-ipad  |13 |Sensor ipad attached to carbon cylinders        |
    # |dc-101|sensor-istick|5  |Sensor embedded in exhaust pipes in the ceilings|
    # +------+-------------+---+------------------------------------------------+
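
As a side note (this is not from the original answer): if you control how the JSON is parsed, you can avoid the struct entirely by supplying an explicit schema that types source as a map, after which explode works directly. A minimal sketch, assuming the same sample_json, spark session, and sc from the question; only the fields needed for the desired output are declared in the value struct:

    from pyspark.sql import functions as F
    from pyspark.sql.types import (IntegerType, MapType, StringType,
                                   StructField, StructType)

    # Declare "source" as map<string, struct> instead of letting Spark infer a struct.
    sensor_schema = StructType([
        StructField("id", IntegerType()),
        StructField("description", StringType()),
    ])
    schema = StructType([
        StructField("dc_id", StringType()),
        StructField("source", MapType(StringType(), sensor_schema)),
    ])

    df_map = spark.read.json(sc.parallelize([sample_json]), schema=schema)

    # explode on a map column emits two columns (key, value), one row per entry.
    df_map.select(
        "dc_id",
        F.explode("source").alias("source_name", "sensor"),
    ).select("dc_id", "source_name", "sensor.id", "sensor.description") \
     .show(truncate=False)

Fields present in the JSON but absent from the schema (ip, temp, c02_level, geo) are simply dropped at parse time.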