自定义聚合到pyspark中的json

lh80um4z  于 2021-07-09  发布在  Spark
关注(0)|答案(1)|浏览(316)

这是我当前的表:
用户idproductamount 1amount 2amount 31a1002003001b2003004002a500600700
这就是我想要的结果:
用户idamount 1amount 2amount 31{“a”:100,“b”:200}{“a”:200,“b”:300}{“a”:300,“b”:400}2{“a”:500}{“a”:600}{“a”:700}
我知道我应该使用一个用户定义的聚合函数,但无法理解如何通过pyspark实现它们。
任何帮助都将不胜感激。

dffbzjpn

dffbzjpn1#

你可以用 to_json 按分组后 User Id 创建Map列:

from pyspark.sql import functions as F

df1 = df.groupBy("User Id").agg(*[
    F.to_json(
        F.map_from_entries(F.collect_list(F.struct(F.col("Product"), F.col(c))))
    ).alias(c)
    for c in df.columns[2:]
])

df1.show()

# +-------+-----------------+-----------------+-----------------+

# |User Id|         Amount 1|         Amount 2|         Amount 3|

# +-------+-----------------+-----------------+-----------------+

# |      1|{"A":100,"B":200}|{"A":200,"B":300}|{"A":300,"B":400}|

# |      2|        {"A":500}|        {"A":600}|        {"A":700}|

# +-------+-----------------+-----------------+-----------------+

相关问题