在spark中,如何将一列聚合到一个频率Map中,其中包含该列中的唯一值及其频率

wxclj1h5  于 2021-07-13  发布在  Spark
关注(0)|答案(1)|浏览(260)

我在rdd中有包含ip和端口列的记录。


# Expected input

Record 1
--------------
ip     |  1.1.1.1.1
port   |  80

Record 2
--------------
ip     |  1.1.1.1.1
port   |  43

Record 3
--------------
ip     |  1.1.1.1.1
port   |  43

我的目标是运行聚合并构建一个Map,其中键是唯一的端口,Map中的值是它们在记录/列中的频率


# Expected Output

Record 1
-------------
ip     |  1.1.1.1
ports  |  [80 -> 1, 43 -> 2]

我希望代码能在一个动作中工作(下面的代码不只是一个示例):

raw_df.groupBy('ip').agg(
  f.map_from_entries(f.collect_list(f.col("port"), count)).alias('ports')
)

我只是没能一步一步找到频率图。救命啊?

zkure5ic

zkure5ic1#

在聚合到Map之前,您可以再进行一次分组:

import pyspark.sql.functions as F

result = df.groupBy(
    'ip', 'port'
).agg(
    F.count('*').alias('count_port')
).groupBy(
    'ip'
).agg(
    F.map_from_entries(F.collect_list(F.struct('port', 'count_port'))).alias('ports')
)

result.show()
+---------+------------------+
|       ip|             ports|
+---------+------------------+
|1.1.1.1.1|[80 -> 1, 43 -> 2]|
+---------+------------------+

相关问题