我有一个关于列连接日志的数据框架 targetIP
, Time
. 此Dataframe中的每个记录都是一个系统的连接事件。 targetIP
表示这次的目标ip地址,time是连接时间。具有值:
时间目标1192.163.0.12192.163.0.23192.163.0.15192.163.0.16192.163.0.27192.163.0.28192.163.0.2
我想在某些条件下创建一个新列:在过去3个连接中,此时目标ip地址的连接数。因此,结果Dataframe应该是:
时间目标计数1192.163.0.102192.163.0.203192.163.0.115192.163.0.126192.163.0.217192.163.0.218192.163.0.22
例如, Time=8
,的 targetIP
是 192.163.0.2
,在过去的3个连接中连接到系统,分别是
Time=5 Time=6
以及 Time=7
. Time=6
以及 Time=7
的 targetIP
也是 192.163.0.2
. 那么伯爵 Time=8
是2。
我有个主意,加一个新的 ID
此Dataframe的列:
IDTimeTargetIP1192.163.0.122192.163.0.233192.163.0.145192.163.0.156192.163.0.267192.163.0.278192.163.0.2
使用窗口功能:
from pyspark.sql import Window
from pyspark.sql import functions as F
w = Window.partitionBy("targetIP").orderBy(F.col("ID").cast("int")).rangeBetween(-3,-1)
df1= df.withColumn("count", F.count("*").over(w)).orderBy("ID")
但如果我用 monotonically_increasing_id()
,id不连续。所以我想不用身份证就得到你的帮助。
谢谢您。
1条答案
按热度按时间eiee3dmh1#
你可以用
row_number
要分配id:不使用id的另一种方法是
collect_list
并过滤生成的数组: