如何将groupby和aggregate函数应用于pysparkDataframe中的特定窗口?

mgdq6dx1  于 2021-07-13  发布在  Spark
关注(0)|答案(1)|浏览(426)

我想申请 groupBy 以及随后的 agg 函数,但仅限于特定窗口。这最好用一个例子来说明。假设我有一个名为 df :

df.show()

    +-----+----------+----------+-------+
    |   ID| Timestamp| Condition|  Value|
    +-----+----------+----------+-------+
    |   z1|         1|         0|     50|
|-------------------------------------------|
|   |   z1|         2|         0|     51|   |
|   |   z1|         3|         0|     52|   |
|   |   z1|         4|         0|     51|   |
|   |   z1|         5|         1|     51|   |
|   |   z1|         6|         0|     49|   |
|   |   z1|         7|         0|     44|   |
|   |   z1|         8|         0|     46|   |
|-------------------------------------------|
    |   z1|         9|         0|     48|
    |   z1|        10|         0|     42|
 +-----+----------+----------+-------+

特别是,我想做的是对row where列应用一种+-3行的窗口 Condition == 1 (即,在本例中,第5行)。在这个窗口中,如上面的数据框所示,我想找到列的最小值 Value 以及列的相应值 Timestamp ,从而获得:

+----------+----------+
| Min_value| Timestamp|
+----------+----------+
|        44|         7|
+----------+----------+

有人知道如何解决这个问题吗?
非常感谢
马里奥安扎群岛

yptwkmov

yptwkmov1#

您可以使用一个跨越前3行和后3行的窗口,获取最小值并过滤条件:

from pyspark.sql import functions as F, Window

df2 = df.withColumn(
    'min',
    F.min(
        F.struct('Value', 'Timestamp')
    ).over(Window.partitionBy('ID').orderBy('Timestamp').rowsBetween(-3,3))
).filter('Condition = 1').select('min.*')

df2.show()
+-----+---------+
|Value|Timestamp|
+-----+---------+
|   44|        7|
+-----+---------+

相关问题