python—计算pyspark中发生条件时两个事件之间的月数

dgiusagp  于 2021-07-09  发布在  Spark
关注(0)|答案(1)|浏览(345)

我在pyspark工作,我需要计算两个事件之间满足条件的月数。
接下来我展示一下我的table是怎样的,这样你才能更好地理解我。这是我的初始Dataframe。

from pyspark.sql import Row, Window
from pyspark.sql.functions import *
from datetime import datetime, date

row = Row("id", "start", "condition")
df = sc.parallelize([
    row(1, "2015-01-31", 0),
    row(1, "2015-02-28", 0),
    row(1, "2015-03-31", 0),
    row(1, "2015-04-30", 0),
    row(1, "2015-05-31", 1),
    row(1, "2015-06-30", 1)
]).toDF().withColumn("start", col("start").cast("date"))

## +---+----------+----------+

## | id|     start| condition|

## +---+----------+----------+

## |  1|2015-01-31|         0|

## |  1|2015-02-28|         0|

## |  1|2015-03-31|         0|

## |  1|2015-04-30|         0|

## |  1|2015-05-31|         1|

## |  1|2015-06-30|         1|

## +---+----------+----------+

我想要这个结果:


## +---+----------+----------+------------------+

## | id|     start| condition| Months_between|

## +---+----------+----------+---------------+

## |  1|2015-01-31|         0|              4|

## |  1|2015-02-28|         0|              3|

## |  1|2015-03-31|         0|              2|

## |  1|2015-04-30|         0|              1|

## |  1|2015-05-31|         1|              0|

## |  1|2015-06-30|         1|              0|

## +---+----------+----------+---------------+

我想知道一行和另一行之间经过了多少个月,条件从0变为1。如果条件从未变为1,则应为0。样本有一个id,但每个日期有多个id。
我曾想过做一个Windows,但我不知道如何计算月数。我有这样的想法:

max_days = (df.select(max("start")).collect()[0][0] - df.select(min("start")).collect()[0][0]).days
days = lambda i: i * 86400
window = Window.partitionBy("id").orderBy(col("start").cast("long")).rangeBetween(days(max_days), 0)

谢谢!它的工作原理是:

df2 = df.withColumn(
    'Months_between', 
    F.when(
        F.col('condition') == 0, 
        F.months_between(
            F.min(
                F.when(F.col('condition') == 1, F.col('start'))
            ).over(Window.partitionBy('id')), 
            F.col('start')
        ).cast('int')
    ).otherwise(0)
)

但是当我有这个例子的时候,我发现了一个问题。当值在0和1之间多次更改时。

| id|     start|condition|Months_between|
+---+----------+---------+------------------+
|  1|2015-01-31|        0|              2|
|  1|2015-02-28|        0|              1|
|  1|2015-03-31|        1|              0|
|  1|2015-04-30|        1|              0|
|  1|2015-05-31|        0|             -1|
|  1|2015-06-30|        1|              0|
+---+----------+---------+-----------------+

在日期2015-05-31,它应该取值1,但当搜索最小值时,它得到值-1。有什么建议吗?谢谢!
谢谢你的帮助!

pjngdqdw

pjngdqdw1#

您可以为每个id找到condition=1的最早日期,并使用 months_between :

from pyspark.sql import functions as F, Window

df2 = df.withColumn(
    'next_start',
    F.first(
        F.when(F.col('condition') == 1, F.col('start')),
        ignorenulls=True
    ).over(
        Window.partitionBy('id')
              .orderBy('start')
              .rowsBetween(0, Window.unboundedFollowing)
    )
).withColumn(
    'Months_between', 
    F.when(
        F.col('condition') == 0, 
        F.months_between(
            F.col('next_start'), 
            F.col('start')
        ).cast('int')
    ).otherwise(0)
).drop('next_start')

df2.show() 
+---+----------+---------+--------------+
| id|     start|condition|Months_between|
+---+----------+---------+--------------+
|  1|2015-01-31|        0|             2|
|  1|2015-02-28|        0|             1|
|  1|2015-03-31|        1|             0|
|  1|2015-04-30|        1|             0|
|  1|2015-05-31|        0|             1|
|  1|2015-06-30|        1|             0|
+---+----------+---------+--------------+

相关问题