json—如何在pysparkDataframe上应用group by,并在结果对象上应用转换

ezykj2lf  于 2021-05-29  发布在  Spark
关注(0)|答案(1)|浏览(374)

我有一个spark数据框

| item_id | attribute_key| attribute_value
____________________________________________________________________________
| id_1        brand          Samsung
| id_1        ram            6GB
| id_2        brand          Apple
| id_2        ram            4GB
_____________________________________________________________________________

我想把这个Dataframe按 item_id 并输出为文件,每行为 json 对象

{id_1: "properties":[{"brand":['Samsung']},{"ram":['6GB']} ]}
{id_2: "properties":[{"brand":['Apple']},{"ram":['4GB']} ]}

这是一个大的分布式数据框架,所以转换成Pandas不是一个选择。这种转变在pyspark中可能吗

23c0lvtd

23c0lvtd1#

在scala中,但python版本将非常相似(sql.functions):

val df = Seq((1,"brand","Samsung"),(1,"ram","6GB"),(1,"ram","8GB"),(2,"brand","Apple"),(2,"ram","6GB")).toDF("item_id","attribute_key","attribute_value")

+-------+-------------+---------------+
|item_id|attribute_key|attribute_value|
+-------+-------------+---------------+
|      1|        brand|        Samsung|
|      1|          ram|            6GB|
|      1|          ram|            8GB|
|      2|        brand|          Apple|
|      2|          ram|            6GB|
+-------+-------------+---------------+

df.groupBy('item_id,'attribute_key)
.agg(collect_list('attribute_value).as("list2"))
.groupBy('item_id)
.agg(map(lit("properties"),collect_list(map('attribute_key,'list2))).as("prop"))
.select(to_json(map('item_id,'prop)).as("json"))
.show(false)

输出:

+------------------------------------------------------------------+
|json                                                              |
+------------------------------------------------------------------+
|{"1":{"properties":[{"ram":["6GB","8GB"]},{"brand":["Samsung"]}]}}|
|{"2":{"properties":[{"brand":["Apple"]},{"ram":["6GB"]}]}}        |
+------------------------------------------------------------------+

相关问题