Apache spark scala如何检测日期交集和关联组

dpiehjr4  于 2022-12-18  发布在  Scala
关注(0)|答案(1)|浏览(123)

我有一个由2列开始和结束组成的 Dataframe ,日期类型如下:
| 启动|结束|
| - ------|- ------|
| 2019年7月1日10:01:19.000| 2019年7月1日10时11分00秒|
| 2019年7月1日10时10分05秒|2019年7月1日10时40分00秒|
| 2019年7月1日10时35分00秒|2019年7月1日12时30分00秒|
| 2019年7月1日15时20分00秒|2019年7月1日15时50分00秒|
| 2019年7月1日16时10分00秒|2019年7月1日16时35分00秒|
| 2019年7月1日16时30分00秒|2019年7月1日17时00分00秒|
我想添加一个名为组的新列,这样如果两个日期相交,他们应该在同一组。
因此结果应为:
| 启动|结束|群|
| - ------|- ------|- ------|
| 2019年7月1日10:01:19.000| 2019年7月1日10时11分00秒|1个|
| 2019年7月1日10时10分05秒|2019年7月1日10时40分00秒|1个|
| 2019年7月1日10时35分00秒|2019年7月1日12时30分00秒|1个|
| 2019年7月1日15时20分00秒|2019年7月1日15时50分00秒|第二章|
| 2019年7月1日16时10分00秒|2019年7月1日16时35分00秒|三个|
| 2019年7月1日16时30分00秒|2019年7月1日17时00分00秒|三个|
我无法确定两个日期是否相交,日期也是随机定位的。
任何帮助或提示

zd287kbt

zd287kbt1#

你可以使用spark中的窗口函数来实现这个功能,它会帮助你对数据进行排序,并获得前一行的值,这将起到作用,我认为最好在代码中添加注解来解释它:

// create the dataframe from your example:
import spark.implicits._
val df = Seq(
  ("2019-07-01 10:01:19.000", "2019-07-01 10:11:00.000"),
  ("2019-07-01 10:10:05.000", "2019-07-01 10:40:00.000"),
  ("2019-07-01 10:35:00.000", "2019-07-01 12:30:00.000"),
  ("2019-07-01 15:20:00.000", "2019-07-01 15:50:00.000"),
  ("2019-07-01 16:10:00.000", "2019-07-01 16:35:00.000"),
  ("2019-07-01 16:30:00.000", "2019-07-01 17:00:00.000"),
).toDF("start", "end")

// A window to sort date by start ascending then end ascending, to get the end of the previous row to check if there's an intersection
val w = Window.orderBy("start", "end")

// transform column from string type to timestamp type
df.select(to_timestamp(col("start")).as("start"), to_timestamp(col("end")).as("end"))
  // prev_end column contains the value of the end column of the previous row
  .withColumn("prev_end", lag("end", 1, null).over(w))
  // create column intersection with value 0 if there's intersection and 1 otherwhise
  .withColumn("intersection", when(col("prev_end").isNull.or(col("prev_end").geq(col("start")).and(col("prev_end").leq(col("end")))), 0).otherwise(1))
  // The key element to this solution: prefix sum over the window to make sure we have the right values of each group
  .withColumn("group", functions.sum("intersection").over(w.rowsBetween(Window.unboundedPreceding, Window.currentRow)))
  .drop("prev_end", "intersection")
  .show(false)

+-------------------+-------------------+-----+
|start              |end                |group|
+-------------------+-------------------+-----+
|2019-07-01 10:01:19|2019-07-01 10:11:00|0    |
|2019-07-01 10:10:05|2019-07-01 10:40:00|0    |
|2019-07-01 10:35:00|2019-07-01 12:30:00|0    |
|2019-07-01 15:20:00|2019-07-01 15:50:00|1    |
|2019-07-01 16:10:00|2019-07-01 16:35:00|2    |
|2019-07-01 16:30:00|2019-07-01 17:00:00|2    |
+-------------------+-------------------+-----+

希望这个能帮上忙。

相关问题