Converting JSON documents from within a PySpark DataFrame

qv7cva1a  posted on 2024-01-06  in  Spark

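I have started learning PySpark, but I am now stuck on converting JSON documents from within a DataFrame. Unlike my sample data, the real DataFrame has more than 2 rows.
My initial DataFrame is: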

from pyspark.sql.types import StringType

df = spark.createDataFrame(["A", "B"], StringType()).toDF("id")
display(df)

The function I call from within the DataFrame looks like this:

import json
import requests
from pyspark.sql.functions import udf

def getObjectInformation(id):
    normalized_data = dict()
    theJSONresponse = requests.get("https://someone.somewhere/" + id).json()['value']
    theJSONresponse_dumps = json.dumps(theJSONresponse)
    normalized_data["_data"] = theJSONresponse_dumps
    return normalized_data["_data"]

# udf defaults to a string return type, so the column holds the raw JSON text
udf_getObjectInformation = udf(lambda x: getObjectInformation(x))


I call the function from within the DataFrame:

df_oid = df.select('id').withColumn('oid', udf_getObjectInformation(df.id))


These are the JSON documents for id=A and id=B:

#normalized_data["_data"] for id = A
[{"oid": "1", "id": "A", "type": "this", "oDetails": "{\"c\":\"blue\",\"p\":\"fruit\"}"}, 
{"oid": "2", "id": "A", "type": "this", "oDetails": "{\"c\":\"red\",\"p\":\"book\"}"}, 
{"oid": "3", "id": "A", "type": "that", "oDetails": "{\"c\":\"green\",\"p\":\"book\"}"}]

#normalized_data["_data"] for id=B
[{"oid": "57", "id": "B", "type": "this", "oDetails": "{\"c\":\"blue\",\"p\":\"fruit\"}"},  
{"oid": "59", "id": "B", "type": "that", "oDetails": "{\"c\":\"blue\",\"p\":\"shirt\"}"}]
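
Note that in both documents `oDetails` is itself a JSON-encoded string rather than a nested object, so it has to be decoded a second time. A minimal plain-Python sketch of that double decoding, using only the standard library and a copy of the id=B document (the names `raw`, `flatten`, and `rows` are illustrative and not part of the original code):

```python
import json

# Copy of the id=B document; oDetails holds JSON text, not an object.
raw = json.dumps([
    {"oid": "57", "id": "B", "type": "this",
     "oDetails": json.dumps({"c": "blue", "p": "fruit"})},
    {"oid": "59", "id": "B", "type": "that",
     "oDetails": json.dumps({"c": "blue", "p": "shirt"})},
])


def flatten(document):
    """Decode the outer array, then decode each oDetails string."""
    rows = []
    for obj in json.loads(document):            # first decode: list of dicts
        details = json.loads(obj["oDetails"])   # second decode: inner dict
        rows.append((obj["id"], obj["oid"], obj["type"],
                     details["c"], details["p"]))
    return rows


rows = flatten(raw)
print(rows)
```

Each tuple has the column order wanted in the final DataFrame: id, oid, type, c, p.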


Now my struggle begins...
I would like my final DataFrame to look like this:

data = [
    ("A","1","this","blue","fruit"),
    ("A","2","this","red","book"),
    ("A","3","this","green" ,"book"),
    ("B","57","this","blue","fruit"),
    ("B","59","something","blue", "shirt")
  ]

schema = StructType([ \
    StructField("id",StringType(),True), \
    StructField("oid",StringType(),True), \
    StructField("type",StringType(),True), \
    StructField("c", StringType(), True), \
    StructField("p", StringType(), True) \
  ])
 
df_final = spark.createDataFrame(data=data,schema=schema)


Any hints, guidance, or solutions are much appreciated.

Answer #1 (kr98yfug)

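Your input is a JSON string that contains another JSON string. You can parse the outer JSON, then the inner one, and apply the necessary transformations. For example, with pyspark>=3.4: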

from pyspark.sql import functions as F
from pyspark.sql import types as T

raw_json = '[{"oid": "1", "id": "A", "type": "this", "oDetails": "{\\"c\\":\\"blue\\",\\"p\\":\\"fruit\\"}"}, {"oid": "2", "id": "A", "type": "this", "oDetails": "{\\"c\\":\\"red\\",\\"p\\":\\"book\\"}"}, {"oid": "3", "id": "A", "type": "that", "oDetails": "{\\"c\\":\\"green\\",\\"p\\":\\"book\\"}"}]'

df = spark.createDataFrame([(raw_json, ),], ['json_col'])
df.show(1)

# +--------------------+
# |            json_col|
# +--------------------+
# |[{"oid": "1", "id...|
# +--------------------+

inner_struct_schema = T.StructType([
    T.StructField('id', T.StringType(), True),
    T.StructField('oDetails', T.StringType(), True),
    T.StructField('oid', T.StringType(), True),
    T.StructField('type', T.StringType(), True)
])
json_schema = T.ArrayType(inner_struct_schema)
parsed_struct = F.from_json('json_col', json_schema)
df2 = df.select(F.inline(parsed_struct))
df2.show(10, False)

# +---+------------------------+---+----+
# |id |oDetails                |oid|type|
# +---+------------------------+---+----+
# |A  |{"c":"blue","p":"fruit"}|1  |this|
# |A  |{"c":"red","p":"book"}  |2  |this|
# |A  |{"c":"green","p":"book"}|3  |that|
# +---+------------------------+---+----+

odetails_schema = T.StructType([
    T.StructField('c', T.StringType(), True),
    T.StructField('p', T.StringType(), True),
])
parsed_detail = F.from_json('oDetails', odetails_schema)

df3 = df2.select(
    F.col('id'),
    F.col('oid'),
    F.col('type'),
    parsed_detail.getField('c').alias('c'),
    parsed_detail.getField('p').alias('p'),
)
df3.show(10, False)

# +---+---+----+-----+-----+
# |id |oid|type|c    |p    |
# +---+---+----+-----+-----+
# |A  |1  |this|blue |fruit|
# |A  |2  |this|red  |book |
# |A  |3  |that|green|book |
# +---+---+----+-----+-----+
