Transforming JSON documents from a PySpark dataframe

qv7cva1a  asked on 2024-01-06  in Spark

I am starting to learn PySpark, and I am stuck transforming a JSON document obtained from a dataframe. Unlike my sample data below, the real dataframe has more than 2 rows.
My initial dataframe is:

    from pyspark.sql.types import StringType

    df = spark.createDataFrame(["A", "B"], StringType()).toDF("id")
    display(df)

The function I call on the dataframe looks like this:

    import json
    import requests
    from pyspark.sql.functions import udf

    def getObjectInformation(id):
        normalized_data = dict()
        theJSONresponse = requests.get("https://someone.somewhere/" + id).json()['value']
        theJSONresponse_dumps = json.dumps(theJSONresponse)
        normalized_data["_data"] = theJSONresponse_dumps
        return normalized_data["_data"]

    udf_getObjectInformation = udf(lambda x: getObjectInformation(x))


I call the function on the dataframe:

    df_oid = df.select('id').withColumn('oid', udf_getObjectInformation(df.id))
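For clarity, what `getObjectInformation` hands to the UDF is the `value` field of the response, re-serialized with `json.dumps` — i.e. one JSON string per id. A sketch with the HTTP call stubbed out (the endpoint in the question is a placeholder, so the response here is simulated):

```python
import json

def getObjectInformation(id, fetch):
    # fetch stands in for requests.get(...).json(); the real code calls
    # the question's placeholder endpoint https://someone.somewhere/<id>.
    theJSONresponse = fetch(id)['value']
    return json.dumps(theJSONresponse)

# Simulated response for id="A" (shape taken from the question's sample).
fake = lambda id: {"value": [{"oid": "1", "id": id, "type": "this",
                              "oDetails": "{\"c\":\"blue\",\"p\":\"fruit\"}"}]}

oid_col = getObjectInformation("A", fake)
print(type(oid_col).__name__)  # -> str: the 'oid' column holds JSON strings
```

So `df_oid.oid` is a string column of serialized JSON arrays, which is what the parsing in the answer below starts from.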


These are the JSON documents for id=A and id=B:

    # normalized_data["_data"] for id=A
    [{"oid": "1", "id": "A", "type": "this", "oDetails": "{\"c\":\"blue\",\"p\":\"fruit\"}"},
     {"oid": "2", "id": "A", "type": "this", "oDetails": "{\"c\":\"red\",\"p\":\"book\"}"},
     {"oid": "3", "id": "A", "type": "that", "oDetails": "{\"c\":\"green\",\"p\":\"book\"}"}]

    # normalized_data["_data"] for id=B
    [{"oid": "57", "id": "B", "type": "this", "oDetails": "{\"c\":\"blue\",\"p\":\"fruit\"}"},
     {"oid": "59", "id": "B", "type": "that", "oDetails": "{\"c\":\"blue\",\"p\":\"shirt\"}"}]


Now my struggle begins...
I want my final dataframe to look like this:

    from pyspark.sql.types import StructType, StructField, StringType

    data = [
        ("A", "1", "this", "blue", "fruit"),
        ("A", "2", "this", "red", "book"),
        ("A", "3", "this", "green", "book"),
        ("B", "57", "this", "blue", "fruit"),
        ("B", "59", "something", "blue", "shirt")
    ]
    schema = StructType([
        StructField("id", StringType(), True),
        StructField("oid", StringType(), True),
        StructField("type", StringType(), True),
        StructField("c", StringType(), True),
        StructField("p", StringType(), True)
    ])
    df_final = spark.createDataFrame(data=data, schema=schema)


Any hints, guidance, or solutions are much appreciated.


kr98yfug1#

Your input is a JSON string that contains another JSON string inside it. You can parse the outer JSON and apply the necessary transformations; for example, with pyspark>=3.4:

    from pyspark.sql import functions as F
    from pyspark.sql import types as T

    raw_json = '[{"oid": "1", "id": "A", "type": "this", "oDetails": "{\\"c\\":\\"blue\\",\\"p\\":\\"fruit\\"}"}, {"oid": "2", "id": "A", "type": "this", "oDetails": "{\\"c\\":\\"red\\",\\"p\\":\\"book\\"}"}, {"oid": "3", "id": "A", "type": "that", "oDetails": "{\\"c\\":\\"green\\",\\"p\\":\\"book\\"}"}]'
    df = spark.createDataFrame([(raw_json, ),], ['json_col'])
    df.show(1)
    # +--------------------+
    # |            json_col|
    # +--------------------+
    # |[{"oid": "1", "id...|
    # +--------------------+

    inner_struct_schema = T.StructType([
        T.StructField('id', T.StringType(), True),
        T.StructField('oDetails', T.StringType(), True),
        T.StructField('oid', T.StringType(), True),
        T.StructField('type', T.StringType(), True)
    ])
    json_schema = T.ArrayType(inner_struct_schema)

    # First pass: parse the outer array and explode it into rows.
    parsed_struct = F.from_json('json_col', json_schema)
    df2 = df.select(F.inline(parsed_struct))
    df2.show(10, False)
    # +---+------------------------+---+----+
    # |id |oDetails                |oid|type|
    # +---+------------------------+---+----+
    # |A  |{"c":"blue","p":"fruit"}|1  |this|
    # |A  |{"c":"red","p":"book"}  |2  |this|
    # |A  |{"c":"green","p":"book"}|3  |that|
    # +---+------------------------+---+----+

    # Second pass: parse the inner oDetails JSON string.
    odetails_schema = T.StructType([
        T.StructField('c', T.StringType(), True),
        T.StructField('p', T.StringType(), True),
    ])
    parsed_detail = F.from_json('oDetails', odetails_schema)
    df3 = df2.select(
        F.col('id'),
        F.col('oid'),
        F.col('type'),
        parsed_detail.getField('c').alias('c'),
        parsed_detail.getField('p').alias('p'),
    )
    df3.show(10, False)
    # +---+---+----+-----+-----+
    # |id |oid|type|c    |p    |
    # +---+---+----+-----+-----+
    # |A  |1  |this|blue |fruit|
    # |A  |2  |this|red  |book |
    # |A  |3  |that|green|book |
    # +---+---+----+-----+-----+
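For reference, the same two-stage decode can be traced in plain Python over both sample payloads from the question, producing the final rows without Spark (a sketch of the logic only, not a replacement for the distributed version; the `type` values follow the raw payloads):

```python
import json

# Both sample payloads from the question (id=A and id=B).
payloads = [
    '[{"oid": "1", "id": "A", "type": "this", "oDetails": "{\\"c\\":\\"blue\\",\\"p\\":\\"fruit\\"}"}, '
    '{"oid": "2", "id": "A", "type": "this", "oDetails": "{\\"c\\":\\"red\\",\\"p\\":\\"book\\"}"}, '
    '{"oid": "3", "id": "A", "type": "that", "oDetails": "{\\"c\\":\\"green\\",\\"p\\":\\"book\\"}"}]',
    '[{"oid": "57", "id": "B", "type": "this", "oDetails": "{\\"c\\":\\"blue\\",\\"p\\":\\"fruit\\"}"}, '
    '{"oid": "59", "id": "B", "type": "that", "oDetails": "{\\"c\\":\\"blue\\",\\"p\\":\\"shirt\\"}"}]',
]

rows = []
for raw in payloads:
    for rec in json.loads(raw):            # first decode: outer array
        det = json.loads(rec["oDetails"])  # second decode: oDetails string
        rows.append((rec["id"], rec["oid"], rec["type"], det["c"], det["p"]))

for row in rows:
    print(row)
# ('A', '1', 'this', 'blue', 'fruit')
# ('A', '2', 'this', 'red', 'book')
# ('A', '3', 'that', 'green', 'book')
# ('B', '57', 'this', 'blue', 'fruit')
# ('B', '59', 'that', 'blue', 'shirt')
```

These tuples are exactly the shape expected by the `df_final` schema in the question, so `spark.createDataFrame(rows, schema)` would reproduce the desired dataframe.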
