如何在一定条件下在sparkDataframe中创建新列count

laximzn5  于 2021-07-13  发布在  Spark
关注(0)|答案(2)|浏览(311)

我有一个关于列连接日志的数据框架 Id , targetIP , Time . 此Dataframe中的每个记录都是一个系统的连接事件。id表示这个连接, targetIP 表示这次的目标ip地址,time是连接时间。具有值:
IDTimeTargetIP1192.163.0.122192.163.0.233192.163.0.145192.163.0.156192.163.0.267192.163.0.278192.163.0.2
我想在某些条件下创建一个新列:在过去的2个时间单位中,与此时间的目标ip地址的连接数。因此,结果Dataframe应该是:
idtimetargetipcount11192.163.0.1022192.163.0.2033192.163.0.1145192.163.0.1156192.163.0.2067192.163.0.2178192.163.0.22
例如, ID=7 ,的 targetIP192.163.0.2 在过去2个时间单位内连接到系统 ID=5 以及 ID=6 ,以及他们的 targetIP 也是 192.163.0.2 . 那么伯爵 ID=7 是2。
期待您的帮助。

jq6vz3qz

jq6vz3qz1#

你可以用 count 在范围介于-2和当前行之间的窗口上,以获取最近2个时间单位中的ip计数。
使用spark sql可以执行以下操作:

df.createOrReplaceTempView("connection_logs")

df1 = spark.sql("""
    SELECT  *,
            COUNT(*) OVER(PARTITION BY targetIP ORDER BY Time 
                          RANGE BETWEEN 2 PRECEDING AND CURRENT ROW
                          ) -1 AS count
    FROM    connection_logs
    ORDER BY ID
""")

df1.show()

# +---+----+-----------+-----+

# | ID|Time|   targetIP|count|

# +---+----+-----------+-----+

# |  1|   1|192.163.0.1|    0|

# |  2|   2|192.163.0.2|    0|

# |  3|   3|192.163.0.1|    1|

# |  4|   5|192.163.0.1|    1|

# |  5|   6|192.163.0.2|    0|

# |  6|   7|192.163.0.2|    1|

# |  7|   8|192.163.0.2|    2|

# +---+----+-----------+-----+

或使用Dataframeapi:

from pyspark.sql import Window
from pyspark.sql import functions as F

time_unit = lambda x: x

w = Window.partitionBy("targetIP").orderBy(col("Time").cast("int")).rangeBetween(-time_unit(2), 0)

df1 = df.withColumn("count", F.count("*").over(w) - 1).orderBy("ID")

df1.show()
7rtdyuoh

7rtdyuoh2#

所以,你基本上需要的是一个窗口函数。
让我们从你的初始数据开始

import org.apache.spark.sql.expressions.Window
import spark.implicits._

case class Event(ID: Int, Time: Int, targetIP: String)

val events = Seq(
    Event(1, 1, "192.163.0.1"),
    Event(2, 2, "192.163.0.2"),
    Event(3, 3, "192.163.0.1"),
    Event(4, 5, "192.163.0.1"),
    Event(5, 6, "192.163.0.2"),
    Event(6, 7, "192.163.0.2"),
    Event(7, 8, "192.163.0.2")
).toDS()

现在我们需要定义一个窗口函数本身

val timeWindow = Window.orderBy($"Time").rowsBetween(-2, -1)

现在最有趣的部分是:如何在Windows上数东西?没有简单的方法,因此我们将执行以下操作
将所有目标IP聚合到列表中
筛选列表以仅查找所需的IP
计算列表的大小

val df = events
        .withColumn("tmp", collect_list($"targetIp").over(timeWindow))
        .withColumn("count", size(expr("filter(tst, x -> x == targetIp)")))
        .drop($"tmp")

结果将包含我们需要的新列“count”!
升级版本:
有一个更简短的版本,没有聚合,由@blackbishop编写,

val timeWindow = Window.partitionBy($"targetIP").orderBy($"Time").rangeBetween(-2, Window.currentRow)
val df = events
        .withColumn("count", count("*").over(timeWindow) - lit(1))
        .explain(true)

相关问题