使JSON格式一致- Pyspark

plupiseo  于 2023-05-19  发布在  Spark
关注(0)|答案(1)|浏览(112)

两个不同格式的json,转换为一个一致的格式并读入dataframe。

>>> df.printSchema()
root
 |-- ReplicateRequest: struct (nullable = true)
 |    |-- MappingReplicateRequestMessage: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- MGroup: struct (nullable = true)
 |    |    |    |    |-- Object: array (nullable = true)
 |    |    |    |    |    |    |-- Code: string (nullable = true)

df1.printSchema()
root
 |-- ReplicateRequest: struct (nullable = true)
 |    |-- MappingReplicateRequestMessage: struct (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- MGroup: struct (nullable = true)
 |    |    |    |    |-- Object: array (nullable = true)
 |    |    |    |    |    |    |-- Code: string (nullable = true)

如果我想访问Object.code列值:
1.在第一个dataframe中,我必须在MappingReplicateRequestMessage上使用explode来向下钻取它。
df.select("ReplicateRequest.*").withColumn("expl",explode((col("MappingReplicateRequestMessage")))).select("expl.*").select("MGroup.Object")
1.在第二 Dataframe 中,我可以直接访问而不分解。
df1.select("ReplicateRequest.MappingReplicateRequestMessage.MGroup.*")
我如何使它一致和通用从转换到数组结构或结构数组之前解析

g6ll5ycj

g6ll5ycj1#

不能使用一个spark.read调用将两个具有不同模式的文件读入一个DataFrame
您将不得不在两个不同的DataFrame中读取它们,操作每个DataFrame以使用所需的公共模式创建新的DataFrame,然后将它们合并。

df1 = spark.read.csv/parquet/json()
df1 = df1.withColumn('new_json', <logic to convert>)

df2 = spark.read.csv/parquet/json()
df2 = df2.withColumn('new_json', <logic to convert>)

final_df = df1.union(df2)

也可以将输入读取为字符串

root
 |-- ReplicateRequest: string (nullable = true)

然后应用一个可以处理两种不同格式的UDF,提取Object.code并返回它,这样就得到了一个具有统一模式的新列。需要可复制的例子。
将示例数据添加到您的示例中,并使其成为可重现的示例,如下所示:

jstr1 = u'{"header":{"id":12345,"foo":"bar"},"body":{"id":111000,"name":"foobar","sub_json":{"id":54321,"sub_sub_json":{"col1":20,"col2":"somethong"}}}}'
jstr2 = u'{"header":{"id":12346,"foo":"baz"},"body":{"id":111002,"name":"barfoo","sub_json":{"id":23456,"sub_sub_json":{"col1":30,"col2":"something else"}}}}'
jstr3 = u'{"header":{"id":43256,"foo":"foobaz"},"body":{"id":20192,"name":"bazbar","sub_json":{"id":39283,"sub_sub_json":{"col1":50,"col2":"another thing"}}}}'

df = spark.createDataFrame([(jstr1,),(jstr2,),(jstr3,)], schema=['col1'])
df.show(truncate=False)

图纸:

+----------------------------------------------------------------------------------------------------------------------------------------------------+
|col1                                                                                                                                                |
+----------------------------------------------------------------------------------------------------------------------------------------------------+
|{"header":{"id":12345,"foo":"bar"},"body":{"id":111000,"name":"foobar","sub_json":{"id":54321,"sub_sub_json":{"col1":20,"col2":"somethong"}}}}      |
|{"header":{"id":12346,"foo":"baz"},"body":{"id":111002,"name":"barfoo","sub_json":{"id":23456,"sub_sub_json":{"col1":30,"col2":"something else"}}}} |
|{"header":{"id":43256,"foo":"foobaz"},"body":{"id":20192,"name":"bazbar","sub_json":{"id":39283,"sub_sub_json":{"col1":50,"col2":"another thing"}}}}|
+----------------------------------------------------------------------------------------------------------------------------------------------------+

相关问题