json: How to apply a group by on a PySpark DataFrame and transform the resulting object

ezykj2lf asked on 2021-05-29 in Spark

I have a Spark DataFrame:

    +-------+-------------+---------------+
    |item_id|attribute_key|attribute_value|
    +-------+-------------+---------------+
    |   id_1|        brand|        Samsung|
    |   id_1|          ram|            6GB|
    |   id_2|        brand|          Apple|
    |   id_2|          ram|            4GB|
    +-------+-------------+---------------+

I want to group this DataFrame by item_id and write it out as a file in which every line is a JSON object:

    {id_1: "properties":[{"brand":['Samsung']},{"ram":['6GB']} ]}
    {id_2: "properties":[{"brand":['Apple']},{"ram":['4GB']} ]}

This is a large, distributed DataFrame, so converting it to Pandas is not an option. Is this transformation possible in PySpark?

23c0lvtd #1

This is in Scala, but the Python version will be very similar (via sql.functions):

    import org.apache.spark.sql.functions._
    import spark.implicits._

    val df = Seq(
      (1,"brand","Samsung"), (1,"ram","6GB"), (1,"ram","8GB"),
      (2,"brand","Apple"), (2,"ram","6GB")
    ).toDF("item_id","attribute_key","attribute_value")

    df.show()
    // +-------+-------------+---------------+
    // |item_id|attribute_key|attribute_value|
    // +-------+-------------+---------------+
    // |      1|        brand|        Samsung|
    // |      1|          ram|            6GB|
    // |      1|          ram|            8GB|
    // |      2|        brand|          Apple|
    // |      2|          ram|            6GB|
    // +-------+-------------+---------------+

    df.groupBy('item_id, 'attribute_key)
      .agg(collect_list('attribute_value).as("list2"))  // gather all values per (item, key)
      .groupBy('item_id)
      .agg(map(lit("properties"), collect_list(map('attribute_key, 'list2))).as("prop"))
      .select(to_json(map('item_id, 'prop)).as("json")) // one JSON string per item
      .show(false)

Output:

    +------------------------------------------------------------------+
    |json                                                              |
    +------------------------------------------------------------------+
    |{"1":{"properties":[{"ram":["6GB","8GB"]},{"brand":["Samsung"]}]}}|
    |{"2":{"properties":[{"brand":["Apple"]},{"ram":["6GB"]}]}}        |
    +------------------------------------------------------------------+
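
Since the question asks for PySpark specifically, here is a minimal sketch of the same two-step aggregation translated to pyspark.sql.functions (untested; the names result and output_path are mine, and it assumes the answer's toy data):

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import col, collect_list, create_map, lit, to_json

    spark = SparkSession.builder.getOrCreate()

    df = spark.createDataFrame(
        [(1, "brand", "Samsung"), (1, "ram", "6GB"), (1, "ram", "8GB"),
         (2, "brand", "Apple"), (2, "ram", "6GB")],
        ["item_id", "attribute_key", "attribute_value"],
    )

    result = (
        df.groupBy("item_id", "attribute_key")
          # one row per (item, key), with all values gathered into an array
          .agg(collect_list("attribute_value").alias("list2"))
          .groupBy("item_id")
          # wrap the per-key maps in a single "properties" entry per item
          .agg(create_map(
                   lit("properties"),
                   collect_list(create_map(col("attribute_key"), col("list2")))
               ).alias("prop"))
          # render each row as one JSON string, keyed by item_id
          .select(to_json(create_map(col("item_id"), col("prop"))).alias("json"))
    )
    result.show(truncate=False)

Since to_json leaves a single string column, result.write.text(output_path) (output_path being a hypothetical destination) should then produce the file with one JSON object per line that the question asks for.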
