我在pyspark工作,我需要计算两个事件之间满足条件的月数。
接下来我展示一下我的table是怎样的,这样你才能更好地理解我。这是我的初始Dataframe。
from pyspark.sql import Row, Window
from pyspark.sql.functions import *
from datetime import datetime, date
row = Row("id", "start", "condition")
df = sc.parallelize([
row(1, "2015-01-31", 0),
row(1, "2015-02-28", 0),
row(1, "2015-03-31", 0),
row(1, "2015-04-30", 0),
row(1, "2015-05-31", 1),
row(1, "2015-06-30", 1)
]).toDF().withColumn("start", col("start").cast("date"))
## +---+----------+----------+
## | id| start| condition|
## +---+----------+----------+
## | 1|2015-01-31| 0|
## | 1|2015-02-28| 0|
## | 1|2015-03-31| 0|
## | 1|2015-04-30| 0|
## | 1|2015-05-31| 1|
## | 1|2015-06-30| 1|
## +---+----------+----------+
我想要这个结果:
## +---+----------+----------+------------------+
## | id| start| condition| Months_between|
## +---+----------+----------+---------------+
## | 1|2015-01-31| 0| 4|
## | 1|2015-02-28| 0| 3|
## | 1|2015-03-31| 0| 2|
## | 1|2015-04-30| 0| 1|
## | 1|2015-05-31| 1| 0|
## | 1|2015-06-30| 1| 0|
## +---+----------+----------+---------------+
我想知道一行和另一行之间经过了多少个月,条件从0变为1。如果条件从未变为1,则应为0。样本有一个id,但每个日期有多个id。
我曾想过做一个Windows,但我不知道如何计算月数。我有这样的想法:
max_days = (df.select(max("start")).collect()[0][0] - df.select(min("start")).collect()[0][0]).days
days = lambda i: i * 86400
window = Window.partitionBy("id").orderBy(col("start").cast("long")).rangeBetween(days(max_days), 0)
谢谢!它的工作原理是:
df2 = df.withColumn(
'Months_between',
F.when(
F.col('condition') == 0,
F.months_between(
F.min(
F.when(F.col('condition') == 1, F.col('start'))
).over(Window.partitionBy('id')),
F.col('start')
).cast('int')
).otherwise(0)
)
但是当我有这个例子的时候,我发现了一个问题。当值在0和1之间多次更改时。
| id| start|condition|Months_between|
+---+----------+---------+------------------+
| 1|2015-01-31| 0| 2|
| 1|2015-02-28| 0| 1|
| 1|2015-03-31| 1| 0|
| 1|2015-04-30| 1| 0|
| 1|2015-05-31| 0| -1|
| 1|2015-06-30| 1| 0|
+---+----------+---------+-----------------+
在日期2015-05-31,它应该取值1,但当搜索最小值时,它得到值-1。有什么建议吗?谢谢!
谢谢你的帮助!
1条答案
按热度按时间pjngdqdw1#
您可以为每个id找到condition=1的最早日期,并使用
months_between
: