How to convert a PySpark DataFrame structure into a nested JSON array with repeated occurrences

ryoqjall posted on 2024-01-06 in Spark

How do I convert a PySpark DataFrame like the one below into a JSON array structure?

    OrderID  field        fieldValue  itemSeqNo
    123      Date         01-01-23    1
    123      Amount       10.00       1
    123      description  Pencil      1
    123      Date         01-02-23    2
    123      Amount       11.00       2
    123      description  Pen         2

Desired JSON array structure:

    {
      "orderDetails": {
        "orderID": "123"
      },
      "itemizationDetails": [
        {
          "Date": "01-01-23",
          "Amount": "10.00",
          "description": "Pencil"
        },
        {
          "Date": "01-02-23 ",
          "Amount": "11.00",
          "description": "Pen"
        }
      ]
    }


Here is my current code; the output is not what I expect.

    import json
    import pandas as pd

    test_dataframe = pd.DataFrame(
        {
            "OrderID": ['123', '123', '123', '123', '123', '123'],
            "field": ['Date', 'Amount', 'description', 'Date', 'Amount', 'description'],
            "fieldValue": ['01-01-23', '10.00', 'Pencil', '01-02-23 ', '11.00', 'Pen '],
            "itemSeqNo": ['1', '1', '1', '2', '2', '2']
        }
    )

    res = json.loads(test_dataframe.to_json(orient='records'))
    print(res)
    # [{'OrderID': '123', 'field': 'Date', 'fieldValue': '01-01-23', 'itemSeqNo': '1'}, {'OrderID': '123', 'field': 'Amount', 'fieldValue': '10.00', 'itemSeqNo': '1'}, {'OrderID': '123', 'field': 'description', 'fieldValue': 'Pencil', 'itemSeqNo': '1'}, {'OrderID': '123', 'field': 'Date', 'fieldValue': '01-02-23 ', 'itemSeqNo': '2'}, {'OrderID': '123', 'field': 'Amount', 'fieldValue': '11.00', 'itemSeqNo': '2'}, {'OrderID': '123', 'field': 'description', 'fieldValue': 'Pen ', 'itemSeqNo': '2'}]

cuxqih21 · answer #1

PySpark solution
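
For reference, a minimal setup sketch that recreates the sample data as a Spark DataFrame named df, assuming a SparkSession is already available as spark (both names are assumptions, not part of the original answer); it also pulls in the F alias the answer's code relies on:

    from pyspark.sql import functions as F

    # Hypothetical setup: rebuild the question's sample data as a Spark
    # DataFrame `df`, assuming an existing SparkSession named `spark`.
    df = spark.createDataFrame(
        [
            ('123', 'Date', '01-01-23', '1'),
            ('123', 'Amount', '10.00', '1'),
            ('123', 'description', 'Pencil', '1'),
            ('123', 'Date', '01-02-23 ', '2'),
            ('123', 'Amount', '11.00', '2'),
            ('123', 'description', 'Pen ', '2'),
        ],
        ['OrderID', 'field', 'fieldValue', 'itemSeqNo'],
    )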

Pivot the dataframe to reshape the long field/fieldValue pairs into columns.

    df1 = df.groupby('OrderID', 'itemSeqNo').pivot('field').agg(F.first('fieldValue'))
    # +-------+---------+------+---------+-----------+
    # |OrderID|itemSeqNo|Amount|     Date|description|
    # +-------+---------+------+---------+-----------+
    # |    123|        1| 10.00| 01-01-23|     Pencil|
    # |    123|        2| 11.00|01-02-23 |       Pen |
    # +-------+---------+------+---------+-----------+
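
If the set of field names is known in advance, a hedged variant is to pass them as pivot's optional second argument, which skips the extra pass Spark otherwise makes to discover the distinct values and makes the resulting column order deterministic:

    # Hedged variant: list the pivot values explicitly.
    df1 = df.groupby('OrderID', 'itemSeqNo') \
            .pivot('field', ['Date', 'Amount', 'description']) \
            .agg(F.first('fieldValue'))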

Pack the desired columns into a struct column.

    df1 = df1.withColumn('itemizationDetails', F.struct('Amount', 'Date', 'description'))
    # +-------+---------+------+---------+-----------+-------------------------+
    # |OrderID|itemSeqNo|Amount|Date     |description|itemizationDetails       |
    # +-------+---------+------+---------+-----------+-------------------------+
    # |123    |1        |10.00 |01-01-23 |Pencil     |{10.00, 01-01-23, Pencil}|
    # |123    |2        |11.00 |01-02-23 |Pen        |{11.00, 01-02-23 , Pen } |
    # +-------+---------+------+---------+-----------+-------------------------+
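
The struct field order becomes the JSON key order, so if the output should match the question's Date/Amount/description ordering exactly, a hedged tweak of the line above is simply to list the fields in that order:

    # Hedged tweak: field order in F.struct controls the JSON key order.
    df1 = df1.withColumn('itemizationDetails', F.struct('Date', 'Amount', 'description'))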


Group the dataframe by OrderID and collect the structs into a list.

    df1 = df1.groupby('OrderID').agg(F.collect_list('itemizationDetails').alias('itemizationDetails'))
    # +-------+-----------------------------------------------------+
    # |OrderID|itemizationDetails                                   |
    # +-------+-----------------------------------------------------+
    # |123    |[{10.00, 01-01-23, Pencil}, {11.00, 01-02-23 , Pen }]|
    # +-------+-----------------------------------------------------+
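
One caveat: collect_list gives no ordering guarantee after a shuffle. If the items must stay in itemSeqNo order, a hedged alternative to the groupby above (requires Spark 3.1+ for F.transform) is to carry itemSeqNo inside the collected struct, sort the array on it, and then strip it off:

    # Hedged alternative preserving itemSeqNo order:
    # collect (itemSeqNo, details) pairs, sort the array on itemSeqNo,
    # then keep only the details struct.
    df1 = (
        df1.groupby('OrderID')
           .agg(F.sort_array(F.collect_list(F.struct('itemSeqNo', 'itemizationDetails'))).alias('ordered'))
           .withColumn('itemizationDetails', F.transform('ordered', lambda x: x['itemizationDetails']))
           .drop('ordered')
    )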


Pack OrderID into a struct field.

    df1 = df1.withColumn('OrderDetails', F.struct('OrderID'))
    # +-------+--------------------+------------+
    # |OrderID|  itemizationDetails|OrderDetails|
    # +-------+--------------------+------------+
    # |    123|[{10.00, 01-01-23...|       {123}|
    # +-------+--------------------+------------+


Export the rows as JSON strings.

    result = df1.select('OrderDetails', 'itemizationDetails').toJSON().collect()
    # ['{"OrderDetails":{"OrderID":"123"},"itemizationDetails":[{"Amount":"10.00","Date":"01-01-23","description":"Pencil"},{"Amount":"11.00","Date":"01-02-23 ","description":"Pen "}]}']
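
Note that toJSON().collect() pulls every row to the driver. If the serialized JSON should stay distributed instead, a hedged alternative is F.to_json over a struct of the two columns, which keeps the string as a DataFrame column that can be written out with the usual writers ('output_path' below is a hypothetical location):

    # Hedged alternative: keep the JSON as a column instead of collecting it.
    df2 = df1.select(
        F.to_json(F.struct('OrderDetails', 'itemizationDetails')).alias('json')
    )
    df2.show(truncate=False)
    # df2.write.text('output_path')  # hypothetical output location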

