使用pyspark解析spark3Dataframe中的多行嵌套json

pxiryf3j  于 2021-07-09  发布在  Spark
关注(0)|答案(1)|浏览(441)

我在pyspark中读取多行json时遇到问题。例子:

{
    "_index": "kl.service-log.2021.04.06",
    "_type": "_doc",
    "_id": "hZ3SpHgBhp2ht1Q8n8ym",
    "_version": 1,
    "_score": null,
    "_source": {
        "publishTime": "2021-04-06T01:36:09.422Z",
        "client_ips": "2601:247:c580:3337:45c0:dd63:35e0:9247",
        "body": {
            "events": "[{\"key\":\"Key  Launched\",\"count\":1,\"timestamp\":1617672914673,\"sum\":0},{\"key\":\"Viewed Screen\",\"count\":1,\"timestamp\":1617672969301,\"sum\":0}]",
            "sdk_name": "java-native-android",
            "tz": "-300"
        }
    }
}

架构定义如下:

root
 |-- _id: string (nullable = true)
 |-- _index: string (nullable = true)
 |-- _score: string (nullable = true)
 |-- _source: struct (nullable = true)
 |    |-- body: struct (nullable = true)
 |    |    |-- events: string (nullable = true)
 |    |    |-- sdk_name: string (nullable = true)
 |    |    |-- tz: string (nullable = true)
 |    |-- client_ips: string (nullable = true)
 |    |-- publishTime: string (nullable = true)
 |-- _type: string (nullable = true)
 |-- _version: long (nullable = true)

低于 _source.body.events ,我看到数据类型是string,但它是一个包含两个不同记录的dictorial。我想有两个特定的列不同的行他们。

8xiog9wr

8xiog9wr1#

您可以使用 from_json ,并重建\u源列:

import pyspark.sql.functions as F

df2 = df.withColumn(
    '_source', 
    F.struct(
        F.struct(
            F.from_json(
                '_source.body.events',
                'array<struct<key:string, count:int, timestamp:long, sum:int>>'
            ).alias('events'), 
            '_source.body.sdk_name', 
            '_source.body.tz'
        ).alias('body'), 
        '_source.client_ips', 
        '_source.publishTime'
    )
)

df2.show(truncate=False)
+--------------------+-------------------------+------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+--------+
|_id                 |_index                   |_score|_source                                                                                                                                                                      |_type|_version|
+--------------------+-------------------------+------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+--------+
|hZ3SpHgBhp2ht1Q8n8ym|kl.service-log.2021.04.06|null  |[[[[Key  Launched, 1, 1617672914673, 0], [Viewed Screen, 1, 1617672969301, 0]], java-native-android, -300], 2601:247:c580:3337:45c0:dd63:35e0:9247, 2021-04-06T01:36:09.422Z]|_doc |1       |
+--------------------+-------------------------+------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+--------+

df2.printSchema()
root
 |-- _id: string (nullable = true)
 |-- _index: string (nullable = true)
 |-- _score: string (nullable = true)
 |-- _source: struct (nullable = false)
 |    |-- body: struct (nullable = false)
 |    |    |-- events: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- key: string (nullable = true)
 |    |    |    |    |-- count: integer (nullable = true)
 |    |    |    |    |-- timestamp: long (nullable = true)
 |    |    |    |    |-- sum: integer (nullable = true)
 |    |    |-- sdk_name: string (nullable = true)
 |    |    |-- tz: string (nullable = true)
 |    |-- client_ips: string (nullable = true)
 |    |-- publishTime: string (nullable = true)
 |-- _type: string (nullable = true)
 |-- _version: long (nullable = true)

如果要将数组分解为单独的行,可以对 df2 以上获得:

df3 = df2.withColumn(
    'idx', 
    F.expr('explode(sequence(0, size(_source.body.events) - 1))')
).withColumn(
    '_source', 
    F.struct(
        F.struct(
            F.expr('_source.body.events[idx]'),
            '_source.body.sdk_name', 
            '_source.body.tz'
        ).alias('body'), 
        '_source.client_ips', 
        '_source.publishTime'
    )
).drop('idx')

df3.show(truncate=False)
+--------------------+-------------------------+------+-------------------------------------------------------------------------------------------------------------------------------------+-----+--------+
|_id                 |_index                   |_score|_source                                                                                                                              |_type|_version|
+--------------------+-------------------------+------+-------------------------------------------------------------------------------------------------------------------------------------+-----+--------+
|hZ3SpHgBhp2ht1Q8n8ym|kl.service-log.2021.04.06|null  |[[[Key  Launched, 1, 1617672914673, 0], java-native-android, -300], 2601:247:c580:3337:45c0:dd63:35e0:9247, 2021-04-06T01:36:09.422Z]|_doc |1       |
|hZ3SpHgBhp2ht1Q8n8ym|kl.service-log.2021.04.06|null  |[[[Viewed Screen, 1, 1617672969301, 0], java-native-android, -300], 2601:247:c580:3337:45c0:dd63:35e0:9247, 2021-04-06T01:36:09.422Z]|_doc |1       |
+--------------------+-------------------------+------+-------------------------------------------------------------------------------------------------------------------------------------+-----+--------+

相关问题