PySpark aggregate to a single JSON

Asked by py49o6xq on 2023-04-19 in Spark

Here is the DataFrame:

df_s
  create_date  city
0           1     1
1           2     2
2           1     1
3           1     4
4           2     1
5           3     2
6           4     3
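For reference, here is a minimal sketch (assuming an active SparkSession named spark, which the original does not show) that builds the same sample data directly as a Spark DataFrame; it also adds the functions import that the snippets below rely on:

from pyspark.sql import SparkSession
from pyspark.sql import functions as f

spark = SparkSession.builder.getOrCreate()
df_s = spark.createDataFrame(
    [(1, 1), (2, 2), (1, 1), (1, 4), (2, 1), (3, 2), (4, 3)],
    ["create_date", "city"],
)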

My goal is to group by create_date and city and count the rows. Then, for each unique create_date, I want a JSON object whose keys are the city values and whose values are the counts from the first step.
My code looks like this. Step one:

df_s = df_s.groupby(df_s.create_date, df_s.city).agg({'city': 'count'})
df_s.show()
+-----------+----+-----------+
|create_date|city|count(city)|
+-----------+----+-----------+
|          1|   4|          1|
|          2|   1|          1|
|          4|   3|          1|
|          2|   2|          1|
|          3|   2|          1|
|          1|   1|          2|
+-----------+----+-----------+
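As an aside, the dictionary form of agg names the result column count(city), which the code below has to reference literally. A possible alternative (not in the original) gives the count an explicit alias; if you use it, later references to count(city) would become cnt:

from pyspark.sql import functions as f

df_s = df_s.groupby("create_date", "city").agg(f.count("city").alias("cnt"))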

Step two:

df_s.groupBy(df_s.create_date).agg(f.to_json(f.collect_list(f.create_map('city', 'count(city)')))).show()
+-----------+---------------------------------------------+
|create_date|to_json(collect_list(map(city, count(city))))|
+-----------+---------------------------------------------+
|          3|                                    [{"2":1}]|
|          1|                            [{"4":1},{"1":2}]|
|          4|                                    [{"3":1}]|
|          2|                            [{"1":1},{"2":1}]|
+-----------+---------------------------------------------+

So my problem is that collect_list gathers each row's map separately, so to_json renders a JSON array of single-key objects. I want one JSON object per create_date with all the keys merged.
My target output is:

+-----------+---------------------------------------------+
|create_date|to_json(collect_list(map(city, count(city))))|
+-----------+---------------------------------------------+
|          3|                                      {"2":1}|
|          1|                                {"4":1,"1":2}|
|          4|                                      {"3":1}|
|          2|                                {"1":1,"2":1}|
+-----------+---------------------------------------------+

Answer 1 (fykwrbwg)

You can also solve this with a UDF, which avoids chaining multiple map functions:

import json

from pyspark.sql import functions as F
from pyspark.sql.types import StringType

# Step one: count rows per (create_date, city)
df = df.groupby(df.create_date, df.city).agg({'city': 'count'})
# Collect the (city, count(city)) pairs for each create_date into an array of structs
zdf = df.groupBy(df.create_date).agg(F.collect_list(F.struct('city', 'count(city)')).alias('old_out'))

# UDF returning the merged pairs as a JSON string
@F.udf(StringType())
def test(entries):
    flat_json = {}
    for entry in entries:
        # First struct field (city) is the key, second (the count) is the value
        flat_json[entry[0]] = entry[1]
    # json.dumps (rather than str) produces valid JSON with double-quoted keys
    return json.dumps(flat_json)

zdf.withColumn('flat_out', test(zdf.old_out)).show(truncate=False)
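Note that the sketch above uses json.dumps where the original returned str(flat_json); str() on a Python dict yields single-quoted output such as {'2': 1}, which is not valid JSON. With json.dumps, flat_out should match the target above, e.g. {"2": 1} for create_date 3.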

Answer 2 (zujrkrfu)

I found a way:

>>> df
  create_date  city
0           1     1
1           2     2
2           1     1
3           1     4
4           2     1
5           3     2
6           4     3

>>> from pyspark.sql.functions import col, collect_list, map_from_entries, struct, to_json
>>> data = spark.createDataFrame(df)
>>> data = data.groupby(data.create_date, data.city).agg({'city': 'count'})
>>> result = data.groupBy("create_date") \
...     .agg(collect_list(struct(col("city"), col("count(city)"))).alias("cities")) \
...     .withColumn("cities_json", to_json(map_from_entries(col("cities")))) \
...     .select("create_date", "cities_json")

>>> result.show()
+-----------+-------------+
|create_date|  cities_json|
+-----------+-------------+
|          3|      {"2":1}|
|          1|{"4":1,"1":2}|
|          4|      {"3":1}|
|          2|{"1":1,"2":1}|
+-----------+-------------+
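The key step is map_from_entries, which converts the collected array of (city, count(city)) structs into a single MapType column, so to_json serializes one JSON object per create_date instead of an array of one-entry objects. This relies on city being unique within each create_date group, which the first groupBy guarantees.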
