Computing the time difference between pairs of consecutive rows per group in PySpark

ih99xse1 · asked 2022-11-01 · Spark

I want to calculate the time each user spends on each SeqID. I have a DataFrame like the one below. However, the time is split across two actions per user, Action_A and Action_B; the total time a user spends on a SeqID is the sum over all such pairs.
For the first user it is 5 + 3 [(2019-12-10 10:05:00 - 2019-12-10 10:00:00) + (2019-12-10 10:23:00 - 2019-12-10 10:20:00)].
So, ideally, the first user spent 8 minutes on SeqID 15 (and not 23 minutes).
Similarly, user 2 spent 1 + 5 = 6 minutes.
How do I compute this with PySpark?

data = [(("ID1", 15, "2019-12-10 10:00:00", "Action_A")), 
        (("ID1", 15, "2019-12-10 10:05:00", "Action_B")),
        (("ID1", 15, "2019-12-10 10:20:00", "Action_A")),
        (("ID1", 15, "2019-12-10 10:23:00", "Action_B")),
        (("ID2", 23, "2019-12-10 11:10:00", "Action_A")),
        (("ID2", 23, "2019-12-10 11:11:00", "Action_B")),
        (("ID2", 23, "2019-12-10 11:30:00", "Action_A")),
        (("ID2", 23, "2019-12-10 11:35:00", "Action_B"))]
df = spark.createDataFrame(data, ["ID", "SeqID", "Timestamp", "Action"])
df.show()

+---+-----+-------------------+--------+
| ID|SeqID|          Timestamp|  Action|
+---+-----+-------------------+--------+
|ID1|   15|2019-12-10 10:00:00|Action_A|
|ID1|   15|2019-12-10 10:05:00|Action_B|
|ID1|   15|2019-12-10 10:20:00|Action_A|
|ID1|   15|2019-12-10 10:23:00|Action_B|
|ID2|   23|2019-12-10 11:10:00|Action_A|
|ID2|   23|2019-12-10 11:11:00|Action_B|
|ID2|   23|2019-12-10 11:30:00|Action_A|
|ID2|   23|2019-12-10 11:35:00|Action_B|
+---+-----+-------------------+--------+

Once I have the data for each pair, I can sum over the entire group (ID, SeqID).
Expected output (it could also be in seconds):

+---+-----+--------+
| ID|SeqID|Dur_Mins|
+---+-----+--------+
|ID1|   15|       8|
|ID2|   23|       6|
+---+-----+--------+

ezykj2lf 1#

Here is a possible solution using higher-order functions (Spark >= 2.4):

from pyspark.sql.functions import array_sort, col, collect_list, expr

# For each pair (ts_array[i], ts_array[i+1]), compute the diff in minutes and
# keep it only for even i (Action_A -> Action_B pairs)
transform_expr = "transform(ts_array, (x, i) -> (unix_timestamp(ts_array[i+1]) - unix_timestamp(x)) / 60 * ((i+1) % 2))"

df.groupBy("ID", "SeqID").agg(array_sort(collect_list(col("Timestamp"))).alias("ts_array")) \
    .withColumn("transformed_ts_array", expr(transform_expr)) \
    .withColumn("Dur_Mins", expr("aggregate(transformed_ts_array, 0D, (acc, x) -> acc + coalesce(x, 0D))")) \
    .drop("transformed_ts_array", "ts_array") \
    .show(truncate=False)

Steps:

1. Collect all timestamps into an array per (ID, SeqID) group and sort it in ascending order.
2. Transform the array with a lambda function (x, i) => Double, where x is the element and i is its index. For each timestamp we compute the diff to the next timestamp, then multiply by (i+1) % 2 so that only every other diff is kept (first with second, third with fourth, ...), since the actions always come in pairs (see the sketch after this list).
3. Finally, aggregate the transformed array to sum all of its elements.
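
To make the (i+1) % 2 trick concrete, here is a minimal plain-Python sketch (not Spark code, just an illustration using ID1's four timestamps from the example data) of what the lambda produces per index:

from datetime import datetime

# ID1's sorted timestamps from the example data
ts = [datetime(2019, 12, 10, 10, 0), datetime(2019, 12, 10, 10, 5),
      datetime(2019, 12, 10, 10, 20), datetime(2019, 12, 10, 10, 23)]

diffs = []
for i, x in enumerate(ts):
    nxt = ts[i + 1] if i + 1 < len(ts) else None              # ts_array[i+1] is null for the last element
    diff = (nxt - x).total_seconds() / 60 if nxt else None    # minutes to the next timestamp
    diffs.append((diff or 0) * ((i + 1) % 2))                 # keep only the Action_A -> Action_B diffs
print(diffs, sum(diffs))  # [5.0, 0.0, 3.0, 0] 8.0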
Output:

+---+-----+--------+
|ID |SeqID|Dur_Mins|
+---+-----+--------+
|ID1|15   |8.0     |
|ID2|23   |6.0     |
+---+-----+--------+

eagi6jfj 2#

A possible (and perhaps convoluted) way to do this with flatMapValues and the RDD API.
Using the data variable:

import pyspark.sql.functions as func

df = spark.createDataFrame(data, ["id", "seq_id", "ts", "action"]). \
    withColumn('ts', func.col('ts').cast('timestamp'))

# func to calculate the duration | applied on each row

def getDur(groupedrows):
    """
    Given the rows of one (id, seq_id) group sorted by timestamp, append to each
    row the duration in seconds between an Action_A row and the Action_B row
    that follows it (0 for the Action_A rows themselves).
    """

    res = []

    for row in groupedrows:
        if row.action == 'Action_A':
            frst_ts = row.ts
            dur = 0
        elif row.action == 'Action_B':
            dur = (row.ts - frst_ts).total_seconds()

        res.append([val for val in row] + [float(dur)])

    return res

# run the rules on the base df | row by row

# grouped on ID, SeqID - sorted on timestamp

dur_rdd = df.rdd. \
    groupBy(lambda k: (k.id, k.seq_id)). \
    flatMapValues(lambda r: getDur(sorted(r, key=lambda ok: ok.ts))). \
    values()

# specify final schema

dur_schema = df.schema. \
    add('dur', 'float')

# convert to DataFrame

dur_sdf = spark.createDataFrame(dur_rdd, dur_schema)

dur_sdf.orderBy('id', 'seq_id', 'ts').show()

+---+------+-------------------+--------+-----+
| id|seq_id|                 ts|  action|  dur|
+---+------+-------------------+--------+-----+
|ID1|    15|2019-12-10 10:00:00|Action_A|  0.0|
|ID1|    15|2019-12-10 10:05:00|Action_B|300.0|
|ID1|    15|2019-12-10 10:20:00|Action_A|  0.0|
|ID1|    15|2019-12-10 10:23:00|Action_B|180.0|
|ID2|    23|2019-12-10 11:10:00|Action_A|  0.0|
|ID2|    23|2019-12-10 11:11:00|Action_B| 60.0|
|ID2|    23|2019-12-10 11:30:00|Action_A|  0.0|
|ID2|    23|2019-12-10 11:35:00|Action_B|300.0|
+---+------+-------------------+--------+-----+

# Your required data

dur_sdf.groupBy('id', 'seq_id'). \
    agg((func.sum('dur') / func.lit(60)).alias('dur_mins')). \
    show()

+---+------+--------+
| id|seq_id|dur_mins|
+---+------+--------+
|ID1|    15|     8.0|
|ID2|    23|     6.0|
+---+------+--------+

This works for the data you've described, but please check whether it covers all your cases.


hiz5n14c 3#

Another possible solution, using window functions:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.master("local[3]").appName("TestApp").enableHiveSupport().getOrCreate()

data = [(("ID1", 15, "2019-12-10 10:00:00", "Action_A")),
        (("ID1", 15, "2019-12-10 10:05:00", "Action_B")),
        (("ID1", 15, "2019-12-10 10:20:00", "Action_A")),
        (("ID1", 15, "2019-12-10 10:23:00", "Action_B")),
        (("ID2", 23, "2019-12-10 11:10:00", "Action_A")),
        (("ID2", 23, "2019-12-10 11:11:00", "Action_B")),
        (("ID2", 23, "2019-12-10 11:30:00", "Action_A")),
        (("ID2", 23, "2019-12-10 11:35:00", "Action_B"))]

df = spark.createDataFrame(data, ["ID", "SeqID", "Timestamp", "Action"])

df.createOrReplaceTempView("tmpTbl")

df = spark.sql("select *, lead(Timestamp,1) over (partition by ID,SeqID order by Timestamp) Next_Timestamp from tmpTbl")

updated_df = df.filter("Action != 'Action_B'")

final_df = updated_df.withColumn("diff", (F.unix_timestamp('Next_Timestamp') - F.unix_timestamp('Timestamp'))/F.lit(60))

final_df.groupBy("ID","SeqID").agg(F.sum(F.col("diff")).alias("Duration")).show()

Output: same as the expected result above (8.0 minutes for ID1 / SeqID 15 and 6.0 for ID2 / SeqID 23).
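
For reference, the same lead() logic can also be written with the DataFrame API instead of a temp view and SQL; a minimal sketch (not part of the original answer), run against the df created from data above:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Next timestamp within each (ID, SeqID) group, ordered by time
w = Window.partitionBy("ID", "SeqID").orderBy("Timestamp")

(df.withColumn("Next_Timestamp", F.lead("Timestamp", 1).over(w))
   .filter(F.col("Action") != "Action_B")  # keep only the Action_A rows
   .withColumn("diff", (F.unix_timestamp("Next_Timestamp") - F.unix_timestamp("Timestamp")) / 60)
   .groupBy("ID", "SeqID")
   .agg(F.sum("diff").alias("Duration"))
   .show())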
