Spark conditional join where one column value lies between two column values

mepcadol · posted 2021-07-12 in Spark

Suppose we have a messy df:

val df = Seq(
    ("id1", "2020-08-02 16:42:00", "2020-08-02 16:45:00", "item1", 1),
    ("id1", "2020-08-02 16:43:00", "2020-08-02 16:44:00", "item2", 0),
    ("id1", "2020-08-02 16:44:00", "2020-08-02 16:45:00", "item1", 0),
    ("id1", "2020-08-02 16:45:00", "2020-08-02 16:47:00", "item3", 0),
    ("id1", "2020-08-02 16:47:00", "2020-08-02 16:51:00", "item4", 0),
    ("id1", "2020-08-02 16:51:00", "2020-08-02 16:52:00", "item3", 0))
.toDF("id", "start_time", "end_time", "item_id", "flag")

df.show()

+---+-------------------+-------------------+-------+----+
| id|         start_time|           end_time|item_id|flag|
+---+-------------------+-------------------+-------+----+
|id1|2020-08-02 16:42:00|2020-08-02 16:45:00|  item1|   1|
|id1|2020-08-02 16:43:00|2020-08-02 16:44:00|  item2|   0|
|id1|2020-08-02 16:44:00|2020-08-02 16:45:00|  item1|   0|
|id1|2020-08-02 16:45:00|2020-08-02 16:47:00|  item3|   0|
|id1|2020-08-02 16:47:00|2020-08-02 16:51:00|  item4|   0|
|id1|2020-08-02 16:51:00|2020-08-02 16:52:00|  item3|   0|
+---+-------------------+-------------------+-------+----+

Note that the first row has start_time = 16:42:00 and end_time = 16:45:00, and the next two rows each have a start_time that falls between the start_time and end_time of that first row. I already have the flag column in place to detect when this situation occurs. In this case I want to keep the first row and drop the next two rows. I'm only showing a small sample, but this situation can occur many times.
So the result I want is:

+---+-------------------+-------------------+-------+
| id|         start_time|           end_time|item_id|
+---+-------------------+-------------------+-------+
|id1|2020-08-02 16:42:00|2020-08-02 16:45:00|  item1|
|id1|2020-08-02 16:45:00|2020-08-02 16:47:00|  item3|
|id1|2020-08-02 16:47:00|2020-08-02 16:51:00|  item4|
|id1|2020-08-02 16:51:00|2020-08-02 16:52:00|  item3|
+---+-------------------+-------------------+-------+

I tried creating a separate df with only the rows where flag = 1 and doing a conditional join:

spark.conf.set("spark.sql.crossJoin.enabled", "true")

val dfFiltered = df.filter("flag == 1")

df.join(dfFiltered, 
  (df("id") == dfFiltered("id")) && 
  (df("start_time") > dfFiltered("start_time")) && 
  (df("start_time") < dfFiltered("end_time")))
.show()

But it returns the wrong result.


ergxz8rk1#

Another way to solve this without a join is to compute the maximum end_time of the preceding flagged rows and filter out the rows where start_time < max(end_time).

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{max, when}
import spark.implicits._   // for the $"..." column syntax

// last_end = the latest end_time among the earlier rows (per id) that have flag = 1
val df2 = df.withColumn(
    "last_end",
    max(
        when($"flag" === 1, $"end_time")
    ).over(Window.partitionBy("id").orderBy("start_time").rowsBetween(Window.unboundedPreceding, -1))
).filter("last_end is null or start_time >= last_end").drop("last_end")

df2.show
+---+-------------------+-------------------+-------+----+
| id|         start_time|           end_time|item_id|flag|
+---+-------------------+-------------------+-------+----+
|id1|2020-08-02 16:42:00|2020-08-02 16:45:00|  item1|   1|
|id1|2020-08-02 16:45:00|2020-08-02 16:47:00|  item3|   0|
|id1|2020-08-02 16:47:00|2020-08-02 16:51:00|  item4|   0|
|id1|2020-08-02 16:51:00|2020-08-02 16:52:00|  item3|   0|
+---+-------------------+-------------------+-------+----+
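
If this cleanup is needed in more than one place, here is a minimal sketch of the same window logic wrapped in a helper; the dropOverlaps name and the extra drop("flag") at the end are my additions, assuming the column layout from the question:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, max, when}

// Hypothetical helper: drop every row that starts before the latest end_time
// of an earlier flagged (flag = 1) row with the same id.
def dropOverlaps(input: DataFrame): DataFrame = {
  val earlierRows = Window
    .partitionBy("id")
    .orderBy("start_time")
    .rowsBetween(Window.unboundedPreceding, -1)

  input
    .withColumn("last_end", max(when(col("flag") === 1, col("end_time"))).over(earlierRows))
    .filter(col("last_end").isNull || col("start_time") >= col("last_end"))
    .drop("last_end", "flag") // also drop flag so the output matches the expected result
}

// dropOverlaps(df).show()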

iaqfqrcu2#

You want a left_anti join, which keeps only the rows of df that have no match in dfFiltered. Note the strict > : the flagged row itself survives, because its start_time is not greater than its own start_time:

val result = df.as("df").drop("flag")
  .join(
    dfFiltered.as("filter"),
    ($"df.id" === $"filter.id") &&
      ($"df.start_time" > $"filter.start_time") &&
      ($"df.start_time" < $"filter.end_time"),
    "left_anti"
  )

result.show
//+---+-------------------+-------------------+-------+
//| id|         start_time|           end_time|item_id|
//+---+-------------------+-------------------+-------+
//|id1|2020-08-02 16:42:00|2020-08-02 16:45:00|  item1|
//|id1|2020-08-02 16:45:00|2020-08-02 16:47:00|  item3|
//|id1|2020-08-02 16:47:00|2020-08-02 16:51:00|  item4|
//|id1|2020-08-02 16:51:00|2020-08-02 16:52:00|  item3|
//+---+-------------------+-------------------+-------+

Or with a NOT EXISTS subquery in the WHERE clause (after registering df as a temp view):

df.createOrReplaceTempView("df")

val result = spark.sql("""
  select *
  from   df t1
  where  not exists(
          select 1 from df t2
          where t1.id = t2.id
          and t2.flag = 1
          and t1.start_time > t2.start_time
          and t1.start_time < t2.end_time
        )
""")
