Reading optional elements of a PySpark struct (JSON) column

des4xlb0 posted on 2021-05-27 in Spark

I have a Parquet file in which one column is a struct field that stores JSON. Its structure is shown below:

     |-- originator: struct (nullable = true)
     |    |-- originatorDetail: struct (nullable = true)
     |    |    |-- applicationDeployedId: string (nullable = true)
     |    |    |-- applicationDeployedNameVersion: string (nullable = true)
     |    |    |-- applicationNameVersion: string (nullable = true)
     |    |    |-- cloudHost: string (nullable = true)
     |    |    |-- cloudRegion: string (nullable = true)
     |    |    |-- cloudStack: string (nullable = true)
     |    |    |-- version: string (nullable = true)
     |    |-- Orversion: string (nullable = true)

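Only the `version` field in the JSON is required; all other fields are optional. So some records may contain only two elements and still be valid.
Suppose I want to read the `cloudHost` field. I can read it as `originator.originatorDetail.cloudHost`, but for records where this optional field is absent, this fails because the element is not there. Is there a way to read these optional fields as null for such records without using a UDF?
Some examples: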

    {
        "originator": {
            "originatorDetail": {
                "applicationDeployedId": "PSLV",
                "cloudRegion": "Mangal",
                "cloudHost": "Petrol",
                "applicationNameVersion": "CRDI",
                "applicationDeployedNameVersion": "Tuna",
                "cloudStack": "DEV",
                "version": "1.1.0"
            },
            "Orversion": "version.1"
        }
    }
    -------------
    {
        "originator": {
            "originatorDetail": {
                "version": "1.1.0"
            },
            "Orversion": "version.1"
        }
    }

Desired output:

| applicationDeployedId | applicationDeployedNameVersion | applicationNameVersion | cloudHost | cloudRegion | cloudStack | version | Orversion |
|---|---|---|---|---|---|---|---|
| PSLV | Tuna | CRDI | Petrol | Mangal | DEV | 1.1.0 | version.1 |
|  |  |  |  |  |  | 1.1.0 | version.1 |
Answer #1 by bzzcjhmw

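Use the `from_json` function (Spark 2.4+).
Read the Parquet data, then apply `from_json` to the JSON column, passing a schema that matches it.
Spark parses the fields that are present and adds the missing fields with `null` values. Example:

```python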
df.show(10, False)
```

```
+---+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|id |json_data|
+---+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|1 |{"originator": {"originatorDetail": {"applicationDeployedId": "PSLV","cloudRegion": "Mangal","cloudHost": "Petrol","applicationNameVersion": "CRDI","applicationDeployedNameVersion": "Tuna","cloudStack": "DEV","version": "1.1.0"},"Orversion": "version.1"}}|
|2 |{"originator": { "originatorDetail": { "version": "1.1.0" }, "Orversion": "version.1"}} |
+---+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

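```

Import the functions and types first, then define a schema that matches the JSON column:

```python
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StringType, StructField, StructType

# Declare every field, including the optional ones; from_json returns null
# for any declared field that a record does not contain.
schema = StructType([
    StructField("originator", StructType([
        StructField("Orversion", StringType(), True),
        StructField("originatorDetail", StructType([
            StructField("applicationDeployedId", StringType(), True),
            StructField("applicationDeployedNameVersion", StringType(), True),
            StructField("applicationNameVersion", StringType(), True),
            StructField("cloudHost", StringType(), True),
            StructField("cloudRegion", StringType(), True),
            StructField("cloudStack", StringType(), True),
            StructField("version", StringType(), True),
        ]), True),
    ]), True),
])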

```

Then read the `json_data` column using the `from_json` function:

```python
# Parse the JSON strings with the explicit schema
df.withColumn("json_converted", from_json(col("json_data"), schema)).select("id", "json_converted").show(10, False)

```

```
+---+--------------------------------------------------------+
|id |json_converted |
+---+--------------------------------------------------------+
|1 |version.1, [PSLV, Tuna,, Petrol, Mangal, DEV, 1.1.0]|
|2 |version.1, [,,,,,, 1.1.0] |
+---+--------------------------------------------------------+

```

Schema of the parsed column:

```python
df.withColumn("json_converted", from_json(col("json_data"), schema)).select("id", "json_converted").printSchema()
```

```
root
 |-- id: long (nullable = true)
 |-- json_converted: struct (nullable = true)
 |    |-- originator: struct (nullable = true)
 |    |    |-- Orversion: string (nullable = true)
 |    |    |-- originatorDetail: struct (nullable = true)
 |    |    |    |-- applicationDeployedId: string (nullable = true)
 |    |    |    |-- applicationDeployedNameVersion: string (nullable = true)
 |    |    |    |-- applicationNameVersi: string (nullable = true)
 |    |    |    |-- cloudHost: string (nullable = true)
 |    |    |    |-- cloudRegion: string (nullable = true)
 |    |    |    |-- cloudStack: string (nullable = true)
 |    |    |    |-- version: string (nullable = true)

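```

Even though the record with `id=2` does not contain all the fields, they are still added (with `null` values), so an optional field can be selected for every record:

```python
(df.withColumn("json_converted", from_json(col("json_data"), schema))
   .select("json_converted.originator.originatorDetail.applicationDeployedId")
   .show(10, False))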

```

```
+---------------------+
|applicationDeployedId|
+---------------------+
|PSLV                 |
|null                 |
+---------------------+
```

