dataframe—在sparkscala中,按分组后,在df中跨用户计数元素

lyr7nygr  于 2021-07-09  发布在  Spark
关注(0)|答案(1)|浏览(412)

我有这个数据框:

  1. |User |country|
  2. | Ron | italy|
  3. | Tom | japan|
  4. | Lin | spain|
  5. | Tom | china|
  6. | Tom | china|
  7. | Lin | japan|
  8. | Tom | china|
  9. | Lin | japan|

我想计算每个用户的国家总数。例如,对于上面的df,我将得到:

  1. [Ron -> [italy ->1], Tom -> [Japan -> 1, china -> 3], Lin -> [Spain -> 1, Japan ->2]]

我从

  1. val groupedbyDf = df.groupBy("User")

但我不知道如何继续。。agg()?

vshtjzan

vshtjzan1#

您需要在分组后使用相关Map功能创建Map:

  1. val df2 = df.groupBy("User", "country")
  2. .count()
  3. .groupBy("User")
  4. .agg(map(
  5. col("User"),
  6. map_from_entries(collect_list(struct(col("country"), col("count"))))
  7. ).as("result")
  8. )
  9. .select("result")
  10. df2.show(false)
  11. +---------------------------------+
  12. |result |
  13. +---------------------------------+
  14. |[Tom -> [china -> 3, japan -> 1]]|
  15. |[Lin -> [spain -> 1, japan -> 2]]|
  16. |[Ron -> [italy -> 1]] |
  17. +---------------------------------+

如果要将它们全部放在一行中,可以再进行一次聚合:

  1. val df2 = df.groupBy("User", "country")
  2. .count()
  3. .groupBy("user")
  4. .agg(map_from_entries(collect_list(struct(col("country"), col("count")))).as("result"))
  5. .agg(map_from_entries(collect_list(struct(col("user"), col("result")))).as("result_all"))
  6. df2.show(false)
  7. +---------------------------------------------------------------------------------------+
  8. |result_all |
  9. +---------------------------------------------------------------------------------------+
  10. |[Tom -> [china -> 3, japan -> 1], Lin -> [spain -> 1, japan -> 2], Ron -> [italy -> 1]]|
  11. +---------------------------------------------------------------------------------------+
展开查看全部

相关问题