pyspark 对日期差为1天的连续行进行分组

w3nuxt5m  于 2022-11-01  发布在  Spark
关注(0)|答案(1)|浏览(201)

我已经设法获得了包含以下列的 Dataframe :

+----------+----------+--------+
|      date| next_date|datediff|
+----------+----------+--------+
|2020-09-25|2020-09-30|       5|
|2020-09-30|2020-10-01|       1|
|2020-10-01|2020-10-02|       1|
|2020-10-02|2020-10-03|       1|
|2020-10-03|2020-10-04|       1|
|2020-10-09|2020-11-23|      45|
|2020-11-23|2020-11-24|       1|
|2020-11-24|2020-11-25|       1|
|2020-11-25|2020-11-26|       1|
+----------+----------+--------+

我通过执行以下命令得到了“group”列:

w1 = Window.orderBy("date")
df_dates.withColumn(
        "dateChange",
        (F.col("datediff") != F.lit(1)).cast("int")
    )\
    .fillna(
        0,
        subset=["dateChange"]
    )\
    .withColumn(
        "indicator",
        (~((F.col("dateChange")==0))).cast("int")
    )\
    .withColumn(
        "group",
        F.sum(F.col("indicator")).over(w1.rangeBetween(Window.unboundedPreceding, 0))
    )

最后得到这些分组:

+----------+----------+--------+----------+---------+-----+
|      date| next_date|datediff|dateChange|indicator|group|
+----------+----------+--------+----------+---------+-----+
|2020-09-25|2020-09-30|       5|         1|        1|    1|
|2020-09-30|2020-10-01|       1|         0|        0|    1|
|2020-10-01|2020-10-02|       1|         0|        0|    1|
|2020-10-02|2020-10-03|       1|         0|        0|    1|
|2020-10-03|2020-10-04|       1|         0|        0|    1|
|2020-10-09|2020-11-23|      45|         1|        1|    2|
|2020-11-23|2020-11-24|       1|         0|        0|    2|
|2020-11-24|2020-11-25|       1|         0|        0|    2|
|2020-11-25|2020-11-26|       1|         0|        0|    2|
+----------+----------+--------+----------+---------+-----+

但是,第一行应该有自己的组。第二行应该是组2(都以1递增)。
然后进行聚合:
第一个
但我错过了第一组,也就是2020年9月25日。
这样做的目的是获取连续日期的范围,以帮助我将具有连续日期的HDFS文件夹合并到同一个分区中。

6l7fqoea

6l7fqoea1#

将示例数据重写为python脚本:

from pyspark.sql import functions as F, Window as W
df = spark.createDataFrame(
    [('2020-09-25', '2020-09-30',  5),
     ('2020-09-30', '2020-10-01',  1),
     ('2020-10-01', '2020-10-02',  1),
     ('2020-10-02', '2020-10-03',  1),
     ('2020-10-03', '2020-10-04',  1),
     ('2020-10-09', '2020-11-23', 45),
     ('2020-11-23', '2020-11-24',  1),
     ('2020-11-24', '2020-11-25',  1),
     ('2020-11-25', '2020-11-26',  1)],
    ["date", "next_date", "datediff"])

以下代码使用窗口函数lagsum创建组:

w = W.orderBy("date")

# _flg is the rule when subgroup inside partition must be created

df = df.withColumn("_flg", F.coalesce(F.when(F.col("datediff") != F.lag("datediff").over(w), 1), F.lit(0)))
df = df.withColumn("_grp", F.sum("_flg").over(w))

df.show()

# +----------+----------+--------+----+----+

# |      date| next_date|datediff|_flg|_grp|

# +----------+----------+--------+----+----+

# |2020-09-25|2020-09-30|       5|   0|   0|

# |2020-09-30|2020-10-01|       1|   1|   1|

# |2020-10-01|2020-10-02|       1|   0|   1|

# |2020-10-02|2020-10-03|       1|   0|   1|

# |2020-10-03|2020-10-04|       1|   0|   1|

# |2020-10-09|2020-11-23|      45|   1|   2|

# |2020-11-23|2020-11-24|       1|   1|   3|

# |2020-11-24|2020-11-25|       1|   0|   3|

# |2020-11-25|2020-11-26|       1|   0|   3|

# +----------+----------+--------+----+----+

最后,使用创建的“_grp”列和其他列(如果适用)进行分组:

df = (df
    .groupBy("_grp")
    .agg(
        F.min("date").alias("start_time"),
        F.max("next_date").alias("end_time")
    ).drop("_grp")
)
df.show()

# +----------+----------+

# |start_time|  end_time|

# +----------+----------+

# |2020-09-25|2020-09-30|

# |2020-09-30|2020-10-04|

# |2020-10-09|2020-11-23|

# |2020-11-23|2020-11-26|

# +----------+----------+

相关问题