pyspark 为 嵌套 Json 创建 Spark 结构 化 流 模式

xlpyo6sf  于 2022-11-01  发布在  Spark
关注(0)|答案(1)|浏览(180)

我 想 为 我 的 结构 化 流 作业 定义 模式 ( 在 python 中 ) , 但 我 无法 以 我 想要 的 方式 获得 数据 帧 模式 。
对于 本 json

{
    "messages": [{
        "IdentityNumber": 1,
        "body": {
            "Alert": "This is the payload"
        },
        "regionNumber": 11000002
    }]
}

中 的 每 一 个
我 使用 下面 的 代码 作为 模式

schema1 = StructType([StructField("messages", ArrayType(   
    StructType( 
        [
            StructField("body", StructType( [StructField("Alert", StringType())]) )
        ]
    )
    ,True))])

格式
但 我 得到 的 架构 是
df - 〉 消息 - 〉 正文 - 〉 警报
当 我 想要 这样 的 东西 时
df - 〉 警报
例如 , 一 个 数据 帧 , 它 有 一 个 名 为 alert 的 列 , 其中 包含 所有 以 alert 形式 出现 的 字符 串 消息 。 我 应该 在 我 定义 的 模式 中 做 什么 更改 ?

w6mmgewl

w6mmgewl1#

如果您正在读取与此架构相关的数据,则此架构是正确的。
如果你需要在读取上述schema中的json后提取嵌套字段,只需要使用点标记即可,例如:

df.select(col("messages[0].body.alert"))

如果你需要操作和分解所有的数组元素,请查看这篇文章,它解释了你必须做的不同选择:https://docs.databricks.com/_static/notebooks/transform-complex-data-types-scala.html
上面的答案和本文一样是在scala中,但是大多数spark sql API都可以很容易地移植到pySpark。

相关问题