Spark:每组过滤器

uinbv5nw  于 2021-07-09  发布在  Spark
关注(0)|答案(1)|浏览(335)

我有一个Dataframe

+------+-------------------+------+
|group |               time| label|
+------+-------------------+------+
|     a|2020-01-01 10:49:00|first |
|     a|2020-01-01 10:51:00|second|
|     a|2020-01-01 12:49:00|first |
|     b|2020-01-01 12:44:00|second|
|     b|2020-01-01 12:46:00|first |
|     c|2020-01-01 12:46:00|third |
+------+-------------------+------+

我想删除所有行,其中,对于每个组,标签 first 比标签更新 second 或者 third . 例如在组中 a 与…划清界限 first 以及 2020-01-01 12:49:00 应该删除,因为有一个旧的行 second 标签。
所需输出为:

+------+-------------------+------+
|group |               time| label|
+------+-------------------+------+
|     a|2020-01-01 10:49:00|first |
|     a|2020-01-01 10:51:00|second|
|     b|2020-01-01 12:44:00|second|
|     c|2020-01-01 12:46:00|third |
+------+-------------------+------+

按分区的窗函数 group 会在每个组内进行过滤,但如何实现标签上的过滤?

toiithl6

toiithl61#

您可以使用非“first”的标签获取上一次,并使用该列进行筛选:

import org.apache.spark.sql.expressions.Window

val df2 = df.withColumn(
    "non_first_time", 
    last(
        when(col("label") =!= "first", col("time")), 
        true
    ).over(
        Window.partitionBy("group").orderBy("time")
    )
).filter("""
    label != 'first' or 
    (label = 'first' and (non_first_time > time or non_first_time is null))
""").drop("non_first_time")

df2.show
+-----+-------------------+------+
|group|               time| label|
+-----+-------------------+------+
|    c|2020-01-01 12:46:00| third|
|    b|2020-01-01 12:44:00|second|
|    a|2020-01-01 10:49:00| first|
|    a|2020-01-01 10:51:00|second|
+-----+-------------------+------+

相关问题