Converting a PySpark DataFrame to nested JSON objects

r7xajy2e  posted on 2021-07-12  in  Spark

I have a Spark DataFrame like this:

+---------+--------------+------------------+
| item_id | popular_tags | popularity_score |
+---------+--------------+------------------+
| id_1    | Samsung      | 0.4              |
| id_1    | long battery | 0.8              |
| id_2    | Apple        | 0.9              |
| id_2    | UI           | 0.9              |
+---------+--------------+------------------+

I want to group this DataFrame by item_id and write it out to a file, with one JSON object per line:

{"id_1": {"Samsung": {"popularity_score": 0.4}, "long_battery": {"popularity_score": 0.8}}}
{"id_2": {"Apple": {"popularity_score": 0.9}, "UI": {"popularity_score": 0.9}}}

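I tried using to_json together with collect_list, but what I get is a list rather than a nested JSON object. This is a large distributed DataFrame, so converting it to Pandas or collecting it onto a single machine is not an option.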

ibps3vxo  #1

You need to build some map-typed columns for the JSON:

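import pyspark.sql.functions as F

df2 = df.groupBy('item_id').agg(
    # Collect (tag, {popularity_score}) pairs per item and turn them into one map
    F.map_from_entries(
        F.collect_list(
            F.struct('popular_tags', F.struct('popularity_score'))
        )
    ).alias('m')
).select(
    # Wrap the per-item map in an outer map keyed by item_id, then serialize it
    F.to_json(
        F.create_map('item_id', 'm')
    ).alias('col')
)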

df2.show(truncate=False)
+-------------------------------------------------------------------------------------+
|col                                                                                  |
+-------------------------------------------------------------------------------------+
|{"id_2":{"Apple":{"popularity_score":0.9},"UI":{"popularity_score":0.9}}}            |
|{"id_1":{"Samsung":{"popularity_score":0.4},"long battery":{"popularity_score":0.8}}}|
+-------------------------------------------------------------------------------------+

Without map_from_entries (available since Spark 2.4), you may have to rely on some dirty hacks:

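df2 = df.groupBy('item_id').agg(
    # Each row becomes a single-entry map; collect_list yields an array of maps
    F.collect_list(
        F.create_map('popular_tags', F.struct('popularity_score'))
    ).alias('m')
).select(
    F.regexp_replace(
        F.regexp_replace(
            F.to_json(F.create_map('item_id', 'm')),
            '(\\[|\\])',  # strip the array brackets
            ''
        ),
        '\\},\\{',  # merge adjacent single-entry maps into one object
        ','
    ).alias('col')
)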

df2.show(truncate=False)
+-------------------------------------------------------------------------------------+
|col                                                                                  |
+-------------------------------------------------------------------------------------+
|{"id_2":{"Apple":{"popularity_score":0.9},"UI":{"popularity_score":0.9}}}            |
|{"id_1":{"Samsung":{"popularity_score":0.4},"long battery":{"popularity_score":0.8}}}|
+-------------------------------------------------------------------------------------+
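
To see why the two replacements work, and where they can go wrong, here is a small pure-Python imitation of the string rewriting on a single row. It is only a sketch: `flatten_map_list` is a hypothetical helper, Python's `re` stands in for Spark's regexp_replace, and the input string is built by hand to look like the to_json output for an array of single-entry maps.

```python
import json
import re


def flatten_map_list(json_text: str) -> str:
    """Imitate the two nested regexp_replace calls on one row of JSON text."""
    # Step 1: drop every '[' and ']' (this also hits brackets inside values)
    no_brackets = re.sub(r"(\[|\])", "", json_text)
    # Step 2: join neighbouring objects '},{' into one object
    return re.sub(r"\},\{", ",", no_brackets)


# Hand-built stand-in for to_json(create_map(item_id, <array of maps>))
raw = json.dumps(
    {"id_1": [{"Samsung": {"popularity_score": 0.4}},
              {"long battery": {"popularity_score": 0.8}}]},
    separators=(",", ":"),
)
fixed = flatten_map_list(raw)
parsed = json.loads(fixed)

# Caveat: a tag that itself contains brackets is silently altered
risky = json.dumps(
    {"id_3": [{"5G [new]": {"popularity_score": 0.7}}]},
    separators=(",", ":"),
)
risky_fixed = flatten_map_list(risky)
```

Because the replacement is purely textual, any tag containing `[`, `]`, or the sequence `},{` would be changed as well, which is one reason to prefer map_from_entries when it is available.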
