PySpark: count of occurrences per date

ncgqoxb0 asked on 2021-05-16 in Spark

I have a PySpark DataFrame that looks like this:

+------+-------------------+
|port  |  timestamp        |
+------+-------------------+
|9200  |2020-06-19 02:12:41|
|9200  |2020-06-19 03:54:23|
|51    |2020-06-19 05:32:11|
|22    |2020-06-20 06:07:43|
|22    |2020-06-20 01:11:12|
|51    |2020-06-20 07:38:49|
+------+-------------------+

I'm trying to find how many times each distinct port is used per day.
For example, the resulting DataFrame should look like this:

+------------+----------------+
|window      |  ports         |
+------------+----------------+
|2020-06-19  |{9200: 2, 51: 1}|
|2020-06-20  |{22: 2, 51:1 }  |
+------------+----------------+

It definitely doesn't need to be stored in a dictionary; I'm just not sure how to capture all of the ports for each day.
I've currently tried the following:

from pyspark.sql.functions import window, count

df.groupBy(window(df['timestamp'], "1 day")).agg(count('port'))

which results in:

+------------+----------------+
|window      |  count(port)   |
+------------+----------------+
|2020-06-19  |3               |
|2020-06-20  |3               |
+------------+----------------+

This isn't what I'm looking for, since it only counts the total number of port records per day without breaking the count down by distinct port.


mu0hgdu01#

A Spark SQL solution (the setup below is Scala, but the SQL itself works the same from PySpark):

val df = spark.sql("""
with t1 as (
select 9200 x, '2020-06-19 02:12:41' y union all
select 9200 x, '2020-06-19 03:54:23' y union all
select 51 x, '2020-06-19 05:32:11' y union all
select 22 x, '2020-06-20 06:07:43' y union all
select 22 x, '2020-06-20 01:11:12' y union all
select 51 x, '2020-06-20 07:38:49' y
) select x as port, y as timestamp from t1
""")
df.show(false)

+----+-------------------+
|port|timestamp          |
+----+-------------------+
|9200|2020-06-19 02:12:41|
|9200|2020-06-19 03:54:23|
|51  |2020-06-19 05:32:11|
|22  |2020-06-20 06:07:43|
|22  |2020-06-20 01:11:12|
|51  |2020-06-20 07:38:49|
+----+-------------------+

df.createOrReplaceTempView("logtable")

spark.sql("""
select window, collect_set(struct(port, t)) ports
from (
  select cast(timestamp as date) window,
         port,
         count(port) over (partition by cast(timestamp as date), port) t
  from logtable
) temp
group by 1
""").show(false)

+----------+--------------------+
|window    |ports               |
+----------+--------------------+
|2020-06-20|[[22, 2], [51, 1]]  |
|2020-06-19|[[9200, 2], [51, 1]]|
+----------+--------------------+
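
For reference, here is a rough PySpark equivalent of the window-function approach above (a sketch, assuming `df` is the original DataFrame with `port` and `timestamp` columns):

import pyspark.sql.functions as F
from pyspark.sql import Window

# Count each port's occurrences within its calendar day, then collect
# the distinct (port, count) pairs per day, mirroring the SQL above.
w = Window.partitionBy(F.to_date('timestamp'), 'port')
(df.withColumn('t', F.count('port').over(w))
   .groupBy(F.to_date('timestamp').alias('window'))
   .agg(F.collect_set(F.struct('port', 't')).alias('ports'))
   .show(truncate=False))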

w9apscun2#

If your goal is just to get the number of times each distinct port is used per day, then it's nothing more than counting records grouped by the date column and the port.

import pyspark.sql.functions as F

df.groupBy(F.to_date('timestamp').alias('date'), 'port') \
  .count() \
  .orderBy('date', 'port') \
  .show()
+----------+----+-----+
|      date|port|count|
+----------+----+-----+
|2020-06-19|  51|    1|
|2020-06-19|9200|    2|
|2020-06-20|  22|    2|
|2020-06-20|  51|    1|
+----------+----+-----+
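
If a wide, one-row-per-day layout is easier to consume, the same aggregation can also be pivoted so each distinct port becomes its own column (a sketch; nulls mean the port did not appear on that day):

import pyspark.sql.functions as F

df.groupBy(F.to_date('timestamp').alias('date')) \
  .pivot('port') \
  .count() \
  .orderBy('date') \
  .show()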

rta7y2nd3#

Group by window and port, aggregate each port together with its count into an array, then group by window and collect those per-port arrays into a single array.

import pyspark.sql.functions as F

df.groupBy(
    F.window(df['timestamp'], "1 day"), 'port'
).agg(
    F.array(
        F.col('port'),
        F.count('port')
    ).alias('ports')
).groupBy(
    'window'
).agg(
    F.collect_list('ports').alias('ports')
).withColumn(
    'window',
    F.col('window')['start'].cast('date')
).show(truncate=False)

+----------+--------------------+
|    window|               ports|
+----------+--------------------+
|2020-06-19|[[9200, 2], [51, 1]]|
|2020-06-20|  [[51, 1], [22, 2]]|
+----------+--------------------+
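
If you do want the dictionary-like output shown in the question, the (port, count) pairs can be packed into a real MapType column with map_from_entries (a sketch; map_from_entries requires Spark 2.4+):

import pyspark.sql.functions as F

(df.groupBy(F.to_date('timestamp').alias('window'), 'port')
   .count()
   .groupBy('window')
   .agg(F.map_from_entries(
       F.collect_list(F.struct('port', 'count'))).alias('ports'))
   .show(truncate=False))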
