Merging date intervals in PySpark?

bmp9r5qi  asked on 2021-07-09  in  Spark

I have data like the following:

What I want to do is replace the Stopdate of the first record with the Stopdate of the last record, so that I can roll up all the records that have a 1 in both gap columns. I know this is an f.when statement, but nothing I've come up with gives me the result I want. How do I make sure it only applies to records with the same ID?
Can anyone help? Thanks!
Sample data as text:

ID  Startdate   Stopdate    gap_from_previous_in_days   gap_to_next_in_days
1   1/1/2021    1/2/2021        
1   1/3/2021    1/4/2021    1   1
1   1/5/2021    1/6/2021    1   1
1   1/7/2021    1/8/2021    1   1
1   1/9/2021    1/10/2021   1   1
1   1/11/2021   1/12/2021   1   1
1   1/13/2021   1/14/2021   1   1
1   1/15/2021   1/16/2021   1   1
1   1/17/2021   1/18/2021   1   1
1   1/19/2021   1/20/2021   1   2

The result I want:

ID  Startdate   Stopdate    gap_from_previous_in_days   gap_to_next_in_days
1   1/1/2021    1/20/2021

So basically, instead of a table like this:

ID  Startdate   Stopdate    gap_from_previous_in_days   gap_to_next_in_days
1   1/1/2021    1/2/2021        
1   1/3/2021    1/4/2021    1   1
1   1/5/2021    1/6/2021    1   1
1   1/7/2021    1/8/2021    1   1
1   1/9/2021    1/10/2021   1   1
1   1/11/2021   1/12/2021   1   1
1   1/13/2021   1/14/2021   1   1
1   1/15/2021   1/16/2021   1   1
1   1/17/2021   1/18/2021   1   1
1   1/19/2021   1/20/2021   1   3
1   1/23/2021   1/25/2021   3

I want it to look like this:

ID  Startdate   Stopdate    gap_from_previous_in_days   gap_to_next_in_days
1   1/1/2021    1/20/2021                               3
1   1/23/2021   1/25/2021   3

Hopefully this illustrates what I'm after. I'm basically trying to combine records that are only one day apart.

yvfmudvl1#

The idea is to create a grouping column from the rows where gap_from_previous_in_days != 1, group by that column and the ID, and take the earliest start date and the latest stop date, together with their associated gap values:

from pyspark.sql import functions as F, Window

result = df.withColumn(
    'Startdate2', F.to_date('Startdate', 'M/d/yyyy')
).withColumn(
    'Stopdate2', F.to_date('Stopdate', 'M/d/yyyy')
).withColumn(
    # Running count of rows that open a new group (gap != 1). count()
    # skips nulls, so the null gap on each ID's first row does not
    # increment the counter.
    'grp',
    F.count(
        F.when(F.col('gap_from_previous_in_days') != 1, 1)
    ).over(
        Window.partitionBy('ID').orderBy('Startdate2')
    )
).groupBy('ID', 'grp').agg(
    # min/max of a struct compare field by field, so ordering on the
    # parsed date carries the original date string and gap value along.
    F.min(F.struct('Startdate2', 'Startdate', 'gap_from_previous_in_days')).alias('start'),
    F.max(F.struct('Stopdate2', 'Stopdate', 'gap_to_next_in_days')).alias('end')
).select(
    'ID',
    'start.Startdate', 'end.Stopdate',
    'start.gap_from_previous_in_days', 'end.gap_to_next_in_days'
)

result.show()
+---+---------+---------+-------------------------+-------------------+
| ID|Startdate| Stopdate|gap_from_previous_in_days|gap_to_next_in_days|
+---+---------+---------+-------------------------+-------------------+
|  1| 1/1/2021|1/20/2021|                     null|                  3|
|  1|1/23/2021|1/25/2021|                        3|               null|
+---+---------+---------+-------------------------+-------------------+
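To see why the conditional count produces a usable group id, here is a pure-Python sketch of the same logic (illustrative only, not part of the answer above): every row whose gap is neither null nor 1 increments a counter, and that counter is the group id shared by all touching intervals.

```python
# Pure-Python sketch of the 'grp' window column: a running count of rows
# whose gap_from_previous_in_days is neither null nor 1. Consecutive
# intervals (gap == 1) keep the same group id.
gaps = [None, 1, 1, 3, 1, 2]  # gap_from_previous_in_days, in Startdate order

grp = []
counter = 0
for g in gaps:
    if g is not None and g != 1:  # mirrors F.when(col != 1, 1): nulls are skipped
        counter += 1
    grp.append(counter)

print(grp)  # [0, 0, 0, 1, 1, 2] -> three groups to aggregate over
```

Grouping by this id (plus ID) is what lets min/max collapse each run of contiguous intervals into one row.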
34gzjxbg2#

Another approach is to construct all the dates covered by the given intervals. From this list of dates, the new intervals can be recomputed. The gap columns are not used or needed for this approach; they are recalculated at the end, but that step can be omitted if they are not needed.


# Create test dataframe
import pyspark.sql.functions as F

df = (spark.createDataFrame([[1, '2021-01-01', '2021-01-10'],
                             [1, '2021-01-11', '2021-01-12'],
                             [1, '2021-01-14', '2021-01-16'],
                             [1, '2021-01-17', '2021-01-20'],
                             [2, '2021-01-01', '2021-01-10'],
                             [2, '2021-01-12', '2021-01-14'],
                             [2, '2021-01-14', '2021-01-15'],
                             [2, '2021-01-19', '2021-01-20'],
                             ], schema="ID int, From string, To string")
      .selectExpr('ID',
                  'to_date(From, "yyyy-MM-dd") as StartDate',
                  'to_date(To, "yyyy-MM-dd") as StopDate')
     )

# Do actual calculation
df_result = (df
             # Get all included dates
             .selectExpr("ID", "explode(sequence(StartDate, StopDate)) as dates")
             # Get previous and next date (LAG over a DESC ordering acts as LEAD)
             .withColumn("Previous", F.expr('LAG(dates) OVER (PARTITION BY ID ORDER BY dates ASC)'))
             .withColumn("Next", F.expr('LAG(dates) OVER (PARTITION BY ID ORDER BY dates DESC)'))
             # Flag beginnings and endings of intervals
             .withColumn("Begin", F.expr("datediff(dates, Previous) > 1 OR Previous IS NULL"))
             .withColumn("End", F.expr("datediff(Next, dates) > 1 OR Next IS NULL"))
             # Only keep beginnings and endings
             .filter("Begin OR End")
             # Pull each ending next to its beginning, then only keep beginnings
             .withColumn("IntervalEnd", F.expr('LAG(dates) OVER (PARTITION BY ID ORDER BY dates DESC)'))
             .filter("Begin")
             # Rename columns + calculate gaps
             .selectExpr(
                 "ID",
                 "dates as StartDate",
                 "IntervalEnd as StopDate",
                 "datediff(dates, LAG(IntervalEnd) OVER (PARTITION BY ID ORDER BY dates ASC)) as gap_from_previous_in_days",
                 "datediff(LAG(dates) OVER (PARTITION BY ID ORDER BY dates DESC), IntervalEnd) as gap_to_next_in_days"
             )
            )
df_result.show()
+---+----------+----------+-------------------------+-------------------+
| ID| StartDate|  StopDate|gap_from_previous_in_days|gap_to_next_in_days|
+---+----------+----------+-------------------------+-------------------+
|  1|2021-01-01|2021-01-12|                     null|                  2|
|  1|2021-01-14|2021-01-20|                        2|               null|
|  2|2021-01-01|2021-01-10|                     null|                  2|
|  2|2021-01-12|2021-01-15|                        2|                  4|
|  2|2021-01-19|2021-01-20|                        4|               null|
+---+----------+----------+-------------------------+-------------------+
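The explode-and-regroup mechanics can also be sketched without Spark. The merge_intervals helper below is hypothetical, just to illustrate the idea: expand every interval into its individual days, sort the distinct dates, and cut a new interval wherever two consecutive dates are more than one day apart.

```python
from datetime import date, timedelta

def merge_intervals(intervals):
    """Merge (start, stop) date intervals that touch or overlap.

    Pure-Python illustration of the explode-dates approach: materialize
    every covered day, then split the sorted day list at gaps > 1 day.
    """
    days = sorted({start + timedelta(days=n)
                   for start, stop in intervals
                   for n in range((stop - start).days + 1)})
    merged = []
    run_start = prev = days[0]
    for d in days[1:]:
        if (d - prev).days > 1:          # gap found: close the current interval
            merged.append((run_start, prev))
            run_start = d
        prev = d
    merged.append((run_start, prev))     # close the final interval
    return merged

# Same intervals as ID 1 in the test dataframe above:
print(merge_intervals([
    (date(2021, 1, 1), date(2021, 1, 10)),
    (date(2021, 1, 11), date(2021, 1, 12)),
    (date(2021, 1, 14), date(2021, 1, 16)),
    (date(2021, 1, 17), date(2021, 1, 20)),
]))  # -> [(2021-01-01, 2021-01-12), (2021-01-14, 2021-01-20)]
```

This matches the ID 1 rows in the output above; the Spark version does the same thing with explode(sequence(...)) and window functions so it scales across partitions.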
