如何将JSON字符串类型的数据框列展开为行和列

iecba09b  于 2022-10-07  发布在  Spark
关注(0)|答案(2)|浏览(213)

我的表中的一列包含一个数组JSON对象(JSON字符串):一个JSON表示一个时间戳。数据是以表的形式表示格式的记录,相关数据(“param_value”)是一个数组JSON。“param_value”包含每个时间戳的参数值。我想把它转换成用‘板材’、‘设备’和‘点’。我已经引用了this post。但我不能使用‘*’来选择要展开的所有架构。我不能确定架构,因为这项工作是ETL作业。它表明我需要使用structType来构建架构。

这张table看起来像:
Sheet|Equip值|param_value
-|-|
A1|e1|[{‘point’:‘1’,‘Status’:‘no’,‘log’:‘no’},{‘point’:‘2’,‘Status’:‘OK’,‘log’:‘no’},{‘point’:‘3’,‘Status’:‘OK’,‘log’:‘OK’}]
A2|e1|[{‘point’:‘1’,‘Status’:‘no’,‘log’:‘no’},{‘point’:‘2’,‘Status’:‘OK’,‘log’:‘no’},{‘point’:‘3’,‘Status’:‘OK’,‘log’:‘OK’}]
A3|E1|[{‘point’:‘1’,‘Status’:‘no’,‘log’:‘no’},{‘point’:‘2’,‘Status’:‘OK’,‘log’:‘no’},{‘point’:‘3’,‘Status’:‘OK’,‘log’:‘OK’}]

预期结果:

工作表|设备|点|状态|日志
-|
A1|E1|1|否|否
A1|e1|2|ok|no
A1|e1|3|ok|ok
A2|e1|1|否|否
A2|e1|2|OK|否
A2|e1|3|ok|ok
A3|E1|1|否|否
A3|e1|2|OK|否
A3|E1|3|ok|ok

最新情况:

按照ZygD的建议,我们可以将模式定义为DDL格式的字符串。因此,我成功地为多个列创建了模式,而不需要知道每个模式的名称。但是,我需要一个包含该架构的列表。有没有办法去extract the schema from the array json?(每个json中的模式都是一样的)

schema_list = ['point', 'status','log'] # -> need to extract from the array json
schema = 'array<struct<'
for c in schema_list:
    string_to_add = ',' + c +':string'
    schema = schema + string_to_add
schema = schema.replace(",", "", 1)+'>>'
s = "'" + schema + "'"
print(s)  # 'array<struct<point:string,status:string,log:string>>'
f = df.selectExpr("sheet", "equip", f"inline(from_json(param_value, {s}))")

f.show()
+-----+-----+-----+------+---+
|sheet|equip|point|status|log|
+-----+-----+-----+------+---+
|   a1|   E1|    1|    no| no|
|   a1|   E1|    2|    ok| no|
|   a1|   E1|    3|    ok| ok|
|   a2|   E1|    1|    no| no|
|   a2|   E1|    2|    ok| no|
|   a2|   E1|    3|    ok| ok|
|   a3|   E1|    1|    no| no|
|   a3|   E1|    2|    ok| no|
|   a3|   E1|    3|    ok| ok|
+-----+-----+-----+------+---+
jucafojl

jucafojl1#

要从JSON字符串中提取数据,您需要使用from_json,您需要为其提供模式。如果将模式定义为DDL格式的字符串,则可以在inline中使用它,inline可以快速将结构数组提取到列中。

输入 Dataframe :

df = spark.createDataFrame(
    [('a1', 'E1', "[{'point':'1','status':'no','log':'no'},{'point':'2','status':'ok','log':'no'},{'point':'3','status':'ok','log':'ok'}]"),
     ('a2', 'E1', "[{'point':'1','status':'no','log':'no'},{'point':'2','status':'ok','log':'no'},{'point':'3','status':'ok','log':'ok'}]"),
     ('a3', 'E1', "[{'point':'1','status':'no','log':'no'},{'point':'2','status':'ok','log':'no'},{'point':'3','status':'ok','log':'ok'}]")],
    ['sheet', 'equip', 'param_value'])

脚本:

schema = "array<struct<point:string,status:string,log:string>>"
df = df.selectExpr("sheet", "equip", f"inline(from_json(param_value, '{schema}'))")

df.show()

# +-----+-----+-----+------+---+

# |sheet|equip|point|status|log|

# +-----+-----+-----+------+---+

# |   a1|   E1|    1|    no| no|

# |   a1|   E1|    2|    ok| no|

# |   a1|   E1|    3|    ok| ok|

# |   a2|   E1|    1|    no| no|

# |   a2|   E1|    2|    ok| no|

# |   a2|   E1|    3|    ok| ok|

# |   a3|   E1|    1|    no| no|

# |   a3|   E1|    2|    ok| no|

# |   a3|   E1|    3|    ok| ok|

# +-----+-----+-----+------+---+

如果您不喜欢提供模式,您可以推断它,但效率较低:

schema = f"array<{spark.read.json(df.rdd.map(lambda r: r.param_value)).schema.simpleString()}>"
df = df.selectExpr("sheet", "equip", f"inline(from_json(param_value, '{schema}'))")

df.show()

# +-----+-----+---+-----+------+

# |sheet|equip|log|point|status|

# +-----+-----+---+-----+------+

# |   a1|   E1| no|    1|    no|

# |   a1|   E1| no|    2|    ok|

# |   a1|   E1| ok|    3|    ok|

# |   a2|   E1| no|    1|    no|

# |   a2|   E1| no|    2|    ok|

# |   a2|   E1| ok|    3|    ok|

# |   a3|   E1| no|    1|    no|

# |   a3|   E1| no|    2|    ok|

# |   a3|   E1| ok|    3|    ok|

# +-----+-----+---+-----+------+
k4ymrczo

k4ymrczo2#

如果param_value为STRING,则需要将该字符串解析为JSON,然后将其分解为行,并将键展开为列:

import pyspark.sql.functions as F
from pyspark.sql.types import *

data = [('a1', 'E1',"[{'point':'1','status':'no','log':'no'},{'point':'2','status':'ok','log':'no'},{'point':'3','status':'ok','log':'ok'}]")
       ,('a2', 'E1',"[{'point':'1','status':'no','log':'no'},{'point':'2','status':'ok','log':'no'},{'point':'3','status':'ok','log':'ok'}]")
       ,('a3', 'E1',"[{'point':'1','status':'no','log':'no'},{'point':'2','status':'ok','log':'no'},{'point':'3','status':'ok','log':'ok'}]")]

schema = ArrayType(StructType([StructField('point', StringType()),
                               StructField('status', StringType()),
                               StructField('log', StringType())]))

df = spark.createDataFrame(data, ['id', 'equip', 'param_value'])
    .withColumn('json_col', F.from_json(F.col('param_value'),schema))
    .select("id", "equip", F.explode("json_col").alias("json_col"))
    .select("id", "equip", F.col('json_col.*'))

df.show()

# +---+-----+-----+------+---+

# | id|equip|point|status|log|

# +---+-----+-----+------+---+

# | a1|   E1|    1|    no| no|

# | a1|   E1|    2|    ok| no|

# | a1|   E1|    3|    ok| ok|

# | a2|   E1|    1|    no| no|

# | a2|   E1|    2|    ok| no|

# | a2|   E1|    3|    ok| ok|

# | a3|   E1|    1|    no| no|

# | a3|   E1|    2|    ok| no|

# | a3|   E1|    3|    ok| ok|

# +---+-----+-----+------+---+

相关问题