Pyspark:将列中的json分解为多列

anhgbhbe  于 2023-08-02  发布在  Spark
关注(0)|答案(6)|浏览(137)

数据是这样的-

+-----------+-----------+-----------------------------+
|         id|      point|                         data|
+-----------------------------------------------------+
|        abc|          6|{"key1":"124", "key2": "345"}|
|        dfl|          7|{"key1":"777", "key2": "888"}|
|        4bd|          6|{"key1":"111", "key2": "788"}|

字符串
我试着把它转换成下面的格式。

+-----------+-----------+-----------+-----------+
|         id|      point|       key1|       key2|
+------------------------------------------------
|        abc|          6|        124|        345|
|        dfl|          7|        777|        888|
|        4bd|          6|        111|        788|


explode函数将 Dataframe 分解为多行。但这不是理想的解决方案。
注意:这个解决方案没有回答我的问题。PySpark "explode" dict in column

wj8zmpe1

wj8zmpe11#

只要您使用的是Spark 2.1或更高版本,pyspark.sql.functions.from_json就应该能得到您想要的结果,但您需要首先定义所需的schema

from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType

schema = StructType(
    [
        StructField('key1', StringType(), True),
        StructField('key2', StringType(), True)
    ]
)

df.withColumn("data", from_json("data", schema))\
    .select(col('id'), col('point'), col('data.*'))\
    .show()

字符串
这样你就能

+---+-----+----+----+
| id|point|key1|key2|
+---+-----+----+----+
|abc|    6| 124| 345|
|df1|    7| 777| 888|
|4bd|    6| 111| 788|
+---+-----+----+----+

3df52oht

3df52oht2#

正如@pault所建议的,数据字段是string字段。由于密钥是相同的(即,'key 1','key 2'),也可以使用json_tuple()(根据文档,此函数是1.6版本的新增功能)

from pyspark.sql import functions as F

df.select('id', 'point', F.json_tuple('data', 'key1', 'key2').alias('key1', 'key2')).show()

字符串
下面是我的原始帖子:如果原始表来自df.show(truncate=False),那么data字段不是python数据结构,那么这很可能是错误的
因为你已经将数据分解成行,所以我假设列data是一个Python数据结构而不是字符串:

from pyspark.sql import functions as F

df.select('id', 'point', F.col('data').getItem('key1').alias('key1'), F.col('data')['key2'].alias('key2')).show()

fruv7luv

fruv7luv3#

正如@jxc所提到的,如果您无法预先定义模式,并且您只需要处理单一级别的json字符串,那么json_tuple应该可以正常工作。我认为它更直接,更容易使用。奇怪的是,我没有发现其他人提到这个功能之前。
在我的用例中,原始数据框模式:StructType(List(StructField(a,StringType,true))),json字符串列显示为:

+---------------------------------------+
|a                                      |
+---------------------------------------+
|{"k1": "v1", "k2": "2", "k3": {"m": 1}}|
|{"k1": "v11", "k3": "v33"}             |
|{"k1": "v13", "k2": "23"}              |
+---------------------------------------+

字符串
使用json_tuple将json字段扩展为新列:

from pyspark.sql import functions as F

df = df.select(F.col('a'), 
    F.json_tuple(F.col('a'), 'k1', 'k2', 'k3') \
    .alias('k1', 'k2', 'k3'))

df.schema
df.show(truncate=False)


文档中没有详细说明,但至少在我的用例中,json_tuple提取的新列是StringType,并且它只提取单一深度的JSON字符串。

StructType(List(StructField(k1,StringType,true),StructField(k2,StringType,true),StructField(k3,StringType,true)))

+---------------------------------------+---+----+-------+
|a                                      |k1 |k2  |k3     |
+---------------------------------------+---+----+-------+
|{"k1": "v1", "k2": "2", "k3": {"m": 1}}|v1 |2   |{"m":1}|
|{"k1": "v11", "k3": "v33"}             |v11|null|v33    |
|{"k1": "v13", "k2": "23"}              |v13|23  |null   |
+---------------------------------------+---+----+-------+

31moq8wy

31moq8wy4#

这适用于我的用例

data1 = spark.read.parquet(path)
json_schema = spark.read.json(data1.rdd.map(lambda row: row.json_col)).schema
data2 = data1.withColumn("data", from_json("json_col", json_schema))
col1 = data2.columns
col1.remove("data")
col2 = data2.select("data.*").columns
append_str ="data."
col3 = [append_str + val for val in col2]
col_list = col1 + col3
data3 = data2.select(*col_list).drop("json_col")

字符串

20jt8wwn

20jt8wwn5#

作者:Shrikant Prabhu
您可以简单地使用SQL

SELECT id, point, data.*
FROM original_table

字符串
像这样,如果数据发生变化,新表的模式将自适应,并且您不必在管道中做任何事情。

8gsdolmq

8gsdolmq6#

在这种方法中,您只需要设置包含Json内容的列的名称。无需设置模式。一切都是自动的。

json_col_name = 'data'
keys = df.select(f"{json_col_name}.*").columns
jsonFields= [f"{json_col_name}.{key} {key}" for key in keys]

main_fields = [key for key in df.columns if key != json_col_name]
df_new = df.selectExpr(main_fields + jsonFields)

字符串

相关问题