Exploding a JSON string in a PySpark DataFrame

lf5gs5x2  posted on 2023-04-29  in Spark

I have a JSON string column substitutions in my dataframe; it holds an array with multiple elements, and I want to explode it so that each element of the array gets its own row. The df also has other columns.
My dataframe looks like this:

+--------------------+----------+--------------------+--------------------+----------+--------------------+---------+--------+--------------------+
|           requestid|sourcepage|              cartid|                  tm|        dt|          customerId| usItemId|prefType|       substitutions|
+--------------------+----------+--------------------+--------------------+----------+--------------------+---------+--------+--------------------+
|00-efbedfe05b4482...|  CHECKOUT|808b44cc-1a38-4dd...|2023-04-25 00:07:...|2023-04-25|f1a34e16-a6d0-6f5...|862776084| NO_PREF|{"id":{"productId...|
+--------------------+----------+--------------------+--------------------+----------+--------------------+---------+--------+--------------------+

The JSON string in the substitutions column:

[
    {
        "id": {
            "productId": "2N3UYGUTROQK",
            "usItemId": "32667929"
        },
        "usItemId": "32667929",
        "itemRank": 1,
        "customerChoice": false
    },
    {
        "id": {
            "productId": "2N3UYGUTRHQK",
            "usItemId": "32667429"
        },
        "usItemId": "32667429",
        "itemRank": 2,
        "customerChoice": true
    },
    {
        "id": {
            "productId": "2N3UYGUTRYQK",
            "usItemId": "32667529"
        },
        "usItemId": "32667529",
        "itemRank": 3,
        "customerChoice": false
    },
    {
        "id": {
            "productId": "2N3UYGUTIQK",
            "usItemId": "32667329"
        },
        "usItemId": "32667329",
        "itemRank": 4,
        "customerChoice": false
    },
    {"id": {
        "productId": "2N3UYGUTYOQK",
        "usItemId": "32663929"
    },
    "usItemId": "32663929",
    "itemRank": 5,
    "customerChoice": false
    }
]

I have tried the following, but it does not give the expected result:

df.select("*", f.explode(f.from_json("substitutions", MapType(StringType(),StringType()))))

+--------------------+----------+--------------------+--------------------+----------+--------------------+---------+--------+--------------------+-------+
|           requestid|sourcepage|              cartid|                  tm|        dt|          customerId| usItemId|prefType|       substitutions|entries|
+--------------------+----------+--------------------+--------------------+----------+--------------------+---------+--------+--------------------+-------+
|00-efbedfe05b4482...|  CHECKOUT|808b44cc-1a38-4dd...|2023-04-25 00:07:...|2023-04-25|f1a34e16-a6d0-6f5...|862776084| NO_PREF|[{"id":{"productI...|   null|
+--------------------+----------+--------------------+--------------------+----------+--------------------+---------+--------+--------------------+-------+

What am I doing wrong?


qzwqbdag1#

Your problem is that you haven't defined the correct schema for your JSON: the top level of substitutions is an array of objects, so parsing it as MapType(StringType(), StringType()) yields null, and explode then has nothing to work with.
Let's start by creating a simplified version of the df (you don't need this step, since you already have the data):

someField='someVal'
substitutions="""
[
    {
        "id": {
            "productId": "2N3UYGUTROQK",
            "usItemId": "32667929"
        },
        "usItemId": "32667929",
        "itemRank": 1,
        "customerChoice": false
    },
    {
        "id": {
            "productId": "2N3UYGUTRHQK",
            "usItemId": "32667429"
        },
        "usItemId": "32667429",
        "itemRank": 2,
        "customerChoice": true
    },
    {
        "id": {
            "productId": "2N3UYGUTRYQK",
            "usItemId": "32667529"
        },
        "usItemId": "32667529",
        "itemRank": 3,
        "customerChoice": false
    },
    {
        "id": {
            "productId": "2N3UYGUTIQK",
            "usItemId": "32667329"
        },
        "usItemId": "32667329",
        "itemRank": 4,
        "customerChoice": false
    },
    {
        "id": {
            "productId": "2N3UYGUTYOQK",
            "usItemId": "32663929"
        },
        "usItemId": "32663929",
        "itemRank": 5,
        "customerChoice": false
    }
]"""

df = spark.createDataFrame([(someField, substitutions)], ["someField", "substitutions"])

Now, to parse the JSON (for example with the from_json function, as you are doing), you need to define a schema that matches it. Once that is done, you can explode your dataframe:

from pyspark.sql.types import *
from pyspark.sql.functions import from_json, explode, col

# Defining the json schema
schema = ArrayType(StructType([
    StructField("id", StructType([
        StructField("productId", StringType(), True),
        StructField("usItemId", StringType(), True),
    ]), True),
    StructField("usItemId", StringType(), True),
    StructField("itemRank", IntegerType(), True),
    StructField("customerChoice", BooleanType(), True)
]))

output = df.withColumn("jsonStruct", explode(from_json(col("substitutions"), schema))) \
           .select("someField", "jsonStruct.*")

>>> output.show(truncate=False)
+---------+------------------------+--------+--------+--------------+
|someField|id                      |usItemId|itemRank|customerChoice|
+---------+------------------------+--------+--------+--------------+
|someVal  |{2N3UYGUTROQK, 32667929}|32667929|1       |false         |
|someVal  |{2N3UYGUTRHQK, 32667429}|32667429|2       |true          |
|someVal  |{2N3UYGUTRYQK, 32667529}|32667529|3       |false         |
|someVal  |{2N3UYGUTIQK, 32667329} |32667329|4       |false         |
|someVal  |{2N3UYGUTYOQK, 32663929}|32663929|5       |false         |
+---------+------------------------+--------+--------+--------------+
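
As a side note, if you'd rather not write the schema out by hand, Spark can infer it from a sample value with schema_of_json. A minimal sketch, assuming the first row holds a representative, non-null substitutions value:

from pyspark.sql.functions import schema_of_json, from_json, explode, col, lit

# Take one sample JSON string from the column and let Spark infer its DDL schema
sample = df.select("substitutions").first()[0]
inferred = spark.range(1).select(schema_of_json(lit(sample))).first()[0]

# from_json also accepts the inferred DDL string as a schema
output = df.withColumn("jsonStruct", explode(from_json(col("substitutions"), inferred))) \
           .select("someField", "jsonStruct.*")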

93ze6v8z2#

Is the substitutions column a string, or does Spark already recognize it as JSON (i.e., an array of structs)?
If it is already recognized, you can simply drop the from_json:

df.select("*", f.explode("substitutions"))

If it is still a plain string, you have to provide the full schema of the JSON. For the JSON you posted above, this is what works for me:

import pyspark.sql.functions as f
from pyspark.sql.types import (ArrayType, StructType, StructField,
                               StringType, LongType, BooleanType)

schema = ArrayType(StructType([
    StructField('customerChoice', BooleanType()),
    StructField('id', StructType([StructField('productId', StringType()),
                                  StructField('usItemId', StringType())])),
    StructField('itemRank', LongType()),
    StructField('usItemId', StringType())
]))
df.select("*", f.explode(f.from_json('substitutions', schema)))
