Flatten a JSON file into separate rows in PySpark

kknvjkwl  posted on 2023-05-28 in Spark
Follows (0) | Answers (2) | Views (170)

I receive a JSON file as input from an API. Here is a sample of the JSON.

json_data = 
{
  "field1": "value1",
  "field2": "value2",
  "message_records": [
    {
      "field3": "value3",
      "field4": "value4"
    },
    {
      "field5": "value5",
      "field6": "value6"
    }
  ],
  "messages": [
    {
      "field7": "value3",
      "field8": "value4"
    },
    {
      "field9": "value5",
      "field10": "value6"
    },
    {
      "field11": "value5",
      "field12": "value6"
    }
  ]
}

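How can I use Python to flatten this JSON data into separate rows and load it into a DataFrame? The nested arrays messages and message_records need to be loaded as separate records, so the JSON file has to be converted into a PySpark DataFrame.
Here field1 and field2 are fields common to both message_records and messages. I need to write the message_records data to one file and the messages data to another.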

mm9b1k5b  #1

You can use the code below to build the rows and write the data for message_records and messages to separate files.

record = {}
record["field1"] = json_data["field1"]
record["field2"] = json_data["field2"]
message_records_df =spark.createDataFrame([record])
messages_df = spark.createDataFrame([record])

This creates a DataFrame from field1 and field2, because both fields are common to message_records and messages.

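from pyspark.sql.types import LongType, StructField, StructType

def zipindexdf(df):
    # Build a new schema instead of calling df.schema.add(), which mutates the schema object in place.
    schema_new = StructType(df.schema.fields + [StructField("index", LongType(), False)])
    # Pair each row with its position and append that position as the last column.
    return df.rdd.zipWithIndex().map(lambda l: list(l[0]) + [l[1]]).toDF(schema_new)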
    
message_records_df_index = zipindexdf(message_records_df)
message_records_df_index.show()
messages_df_index = zipindexdf(messages_df)
messages_df_index.show()

Here I use zipWithIndex to add an index column so that the DataFrames can be joined on that index.
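As a rough illustration of what zipindexdf produces, the plain-Python sketch below mimics zipWithIndex on a list of row tuples: each row is paired with its 0-based position, and that position is appended as a trailing index value. The helper name `zip_with_index` and the sample rows are made up for this illustration, and no Spark is involved.

```python
def zip_with_index(rows):
    """Mimic RDD.zipWithIndex followed by the lambda in zipindexdf:
    append each row's 0-based position as a trailing value."""
    return [list(row) + [idx] for idx, row in enumerate(rows)]


# Hypothetical sample rows, one tuple per DataFrame row.
common_rows = [("value1", "value2")]
indexed = zip_with_index(common_rows)
print(indexed)  # [['value1', 'value2', 0]]
```

Since a one-row DataFrame always gets index 0, every DataFrame built this way from a single record shares the same join key.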

Next, create a DataFrame for each item in message_records and join it into the final DataFrame in a loop, as shown below.

for i in json_data['message_records']:
    df = zipindexdf(spark.createDataFrame([i]))
    message_records_df_index = message_records_df_index.join(df, "index", "inner")
    message_records_df_index.show()

Do the same for messages:

for i in json_data['messages']:
    df = zipindexdf(spark.createDataFrame([i]))
    messages_df_index = messages_df_index.join(df, "index", "inner")
    messages_df_index.show()
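As far as I can tell, every DataFrame in these loops has exactly one row with index 0, so each inner join widens that single row instead of adding rows. The plain-Python sketch below mirrors this with dictionaries. `join_on_index` is a hypothetical helper written for this illustration, not a Spark API, and only the message_records part of the sample is used.

```python
def join_on_index(left_rows, right_rows):
    """Inner-join two lists of dict rows on their 'index' key."""
    joined = []
    for left in left_rows:
        for right in right_rows:
            if left["index"] == right["index"]:
                joined.append({**left, **right})
    return joined


json_data = {
    "field1": "value1",
    "field2": "value2",
    "message_records": [
        {"field3": "value3", "field4": "value4"},
        {"field5": "value5", "field6": "value6"},
    ],
}

# Start from the common fields; the only row gets index 0.
result = [{"field1": json_data["field1"], "field2": json_data["field2"], "index": 0}]
for item in json_data["message_records"]:
    # Each item becomes a one-row table whose index is also 0.
    result = join_on_index(result, [{**item, "index": 0}])

print(len(result))        # number of rows after all joins
print(sorted(result[0]))  # column names of the single wide row
```

If one output row per array element is needed instead, the items have to be stacked as rows rather than joined on a shared index.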

Finally, write the data to CSV files.

message_records_df_index.write.option("header","true").csv('/message_records_df/')
messages_df_index.write.option("header","true").csv('/messages_df/')

spark.read.option("header","true").csv('/message_records_df/').show()
spark.read.option("header","true").csv('/messages_df/').show()

js4nwp54  #2

You can find the answer in this SO answer.
You only need to change the parameters you pass when calling it. I used

df_flat1 = flatten_test(multiline_df.select(multiline_df.field1,multiline_df.field2,multiline_df.message_records))
df_flat2 = flatten_test(multiline_df.select(multiline_df.field1,multiline_df.field2, multiline_df.messages))
df_flat1.printSchema()
df_flat1.show(5)

df_flat2.printSchema()
df_flat2.show(5)

and got:

root
 |-- field1: string (nullable = true)
 |-- field2: string (nullable = true)
 |-- message_records_field3: string (nullable = true)
 |-- message_records_field4: string (nullable = true)
 |-- message_records_field5: string (nullable = true)
 |-- message_records_field6: string (nullable = true)

+------+------+----------------------+----------------------+----------------------+----------------------+
|field1|field2|message_records_field3|message_records_field4|message_records_field5|message_records_field6|
+------+------+----------------------+----------------------+----------------------+----------------------+
|value1|value2|                value3|                value4|                  null|                  null|
|value1|value2|                  null|                  null|                value5|                value6|
+------+------+----------------------+----------------------+----------------------+----------------------+

root
 |-- field1: string (nullable = true)
 |-- field2: string (nullable = true)
 |-- messages_field10: string (nullable = true)
 |-- messages_field11: string (nullable = true)
 |-- messages_field12: string (nullable = true)
 |-- messages_field7: string (nullable = true)
 |-- messages_field8: string (nullable = true)
 |-- messages_field9: string (nullable = true)

+------+------+----------------+----------------+----------------+---------------+---------------+---------------+
|field1|field2|messages_field10|messages_field11|messages_field12|messages_field7|messages_field8|messages_field9|
+------+------+----------------+----------------+----------------+---------------+---------------+---------------+
|value1|value2|            null|            null|            null|         value3|         value4|           null|
|value1|value2|          value6|            null|            null|           null|           null|         value5|
|value1|value2|            null|          value5|          value6|           null|           null|           null|
+------+------+----------------+----------------+----------------+---------------+---------------+---------------+
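
For comparison, a similar row layout (one output row per array element, with the common fields repeated and missing fields left empty) can be sketched in plain Python before any Spark call. `flatten_array` and its parameters are hypothetical names for this illustration. The keys are left unprefixed, and the column order follows first appearance rather than the alphabetical order in the output above.

```python
def flatten_array(json_data, array_key, common_keys=("field1", "field2")):
    """Return one dict per element of json_data[array_key].

    Every row repeats the common fields and carries every key seen in the
    array; keys absent from an element are filled with None.
    """
    # Union of element keys, in order of first appearance.
    element_keys = []
    for item in json_data[array_key]:
        for key in item:
            if key not in element_keys:
                element_keys.append(key)

    rows = []
    for item in json_data[array_key]:
        row = {key: json_data[key] for key in common_keys}
        for key in element_keys:
            row[key] = item.get(key)  # None when the element lacks the key
        rows.append(row)
    return rows


json_data = {
    "field1": "value1",
    "field2": "value2",
    "message_records": [
        {"field3": "value3", "field4": "value4"},
        {"field5": "value5", "field6": "value6"},
    ],
    "messages": [
        {"field7": "value3", "field8": "value4"},
        {"field9": "value5", "field10": "value6"},
        {"field11": "value5", "field12": "value6"},
    ],
}

message_record_rows = flatten_array(json_data, "message_records")
message_rows = flatten_array(json_data, "messages")
for row in message_record_rows:
    print(row)
```

Each list could then be handed to spark.createDataFrame, which the first answer already calls with a list of dicts, and written out with write.option("header","true").csv(...). That step is not exercised here.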
