How to set a date interval for counting ids in PySpark?

tpxzln5u, asked 2021-07-12 in Spark

I have a PySpark dataframe with the columns parsed_date (data type: date) and id (data type: bigint), as shown below:

+-------+-----------+
|     id|parsed_date|
+-------+-----------+
|1471783| 2017-12-18|
|1471885| 2017-12-18|
|1472928| 2017-12-19|
|1476917| 2017-12-19|
|1477469| 2017-12-21|
|1478190| 2017-12-21|
|1478570| 2017-12-19|
|1481415| 2017-12-21|
|1472592| 2017-12-20|
|1474023| 2017-12-22|
|1474029| 2017-12-22|
+-------+-----------+
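
For reference, a minimal sketch for recreating this sample dataframe (it assumes an existing SparkSession bound to the variable spark; rows and column names are copied from the table above):

from pyspark.sql import functions as F

# sample rows copied from the table above
data = [
    (1471783, '2017-12-18'), (1471885, '2017-12-18'),
    (1472928, '2017-12-19'), (1476917, '2017-12-19'),
    (1477469, '2017-12-21'), (1478190, '2017-12-21'),
    (1478570, '2017-12-19'), (1481415, '2017-12-21'),
    (1472592, '2017-12-20'), (1474023, '2017-12-22'),
    (1474029, '2017-12-22'),
]

# cast the string dates to DateType so interval arithmetic works in the filters below
df = (spark.createDataFrame(data, ['id', 'parsed_date'])
          .withColumn('parsed_date', F.to_date('parsed_date')))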

I have the function shown below. The idea is to pass a date (day) and t (a number of days). In df1 the ids should be counted over the range (day - t, day), and in df2 over the range (day, day + t).

from pyspark.sql import functions as F, Window

def hypo_1(df, day, t):

    df1 = (df.filter(f"parsed_date between '{day}' - interval {t} days and '{day}'")
             .withColumn('count_before', F.count('id').over(Window.partitionBy('parsed_date')))
             .orderBy('parsed_date')
          )
    df2 = (df.filter(f"parsed_date between '{day}' and '{day}' + interval {t} days")
             .withColumn('count_after', F.count('id').over(Window.partitionBy('parsed_date')))
             .orderBy('parsed_date')
          )
    return [df1, df2]

With this code the function returns two dataframes:
Example: hypo_1(df, '2017-12-20', 2)
df1

+-----------+-------+------------+
|parsed_date|     id|count_before|
+-----------+-------+------------+
| 2017-12-20|1471783|           1|
+-----------+-------+------------+

df2

+-----------+-------+-----------+
|parsed_date|     id|count_after| 
+-----------+-------+-----------+
| 2017-12-20|1472592|          1|
| 2017-12-21|1477469|          3|
| 2017-12-22|1474029|          2|
+-----------+-------+-----------+

Problem:
The date interval for df1 does not look right.
The ids on the date I pass in (2017-12-20) should not be counted at all, but this happens in both df1 and df2 ->

+-----------+-------+-----------+
|parsed_date|     id|count_after|
+-----------+-------+-----------+
| 2017-12-20|1472592|          1|

Expected output:
Example: hypo_1(df, '2017-12-20', 2)
df1:

+-------+-----------+------------+
|     id|parsed_date|count_before|
+-------+-----------+------------+
|1471783| 2017-12-18|           2|
|1478570| 2017-12-19|           3|
+-------+-----------+------------+

df2:

+-------+-----------+------------+
|     id|parsed_date| count_after|
+-------+-----------+------------+
|1477469| 2017-12-21|           3|
|1474023| 2017-12-22|           2|
+-------+-----------+------------+

Please help.


Answer 1 (y53ybaqx):

Just change the filter conditions slightly (add - interval 1 day or + interval 1 day):

from pyspark.sql import functions as F, Window

def hypo_1(df, day, t):
    df1 = (df.filter(f"parsed_date between '{day}' - interval {t} days and '{day}' - interval 1 day")
             .withColumn('count_before', F.count('id').over(Window.partitionBy('parsed_date')))
             .orderBy('parsed_date')
          )
    df2 = (df.filter(f"parsed_date between '{day}' + interval 1 day and '{day}' + interval {t} days")
             .withColumn('count_after', F.count('id').over(Window.partitionBy('parsed_date')))
             .orderBy('parsed_date')
          )
    return [df1, df2]

df1, df2 = hypo_1(df, '2017-12-20', 2)
df1.show()
+-------+-----------+------------+
|     id|parsed_date|count_before|
+-------+-----------+------------+
|1471783| 2017-12-18|           2|
|1471885| 2017-12-18|           2|
|1472928| 2017-12-19|           3|
|1476917| 2017-12-19|           3|
|1478570| 2017-12-19|           3|
+-------+-----------+------------+

df2.show()
+-------+-----------+-----------+
|     id|parsed_date|count_after|
+-------+-----------+-----------+
|1481415| 2017-12-21|          3|
|1478190| 2017-12-21|          3|
|1477469| 2017-12-21|          3|
|1474023| 2017-12-22|          2|
|1474029| 2017-12-22|          2|
+-------+-----------+-----------+

If you want to get exactly your expected output, you can drop the duplicate rows per date, e.g.:

df1 = df1.dropDuplicates(['parsed_date', 'count_before'])
df2 = df2.dropDuplicates(['parsed_date', 'count_after'])
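
A possible alternative, if one row per date is all you need, is to aggregate with groupBy instead of combining a window count with dropDuplicates. A minimal sketch of that idea (hypo_1_agg is just an illustrative name; note that, unlike the expected output above, the id column is dropped entirely):

from pyspark.sql import functions as F

def hypo_1_agg(df, day, t):
    # one row per date with the id count, so no window + dropDuplicates needed
    df1 = (df.filter(f"parsed_date between '{day}' - interval {t} days and '{day}' - interval 1 day")
             .groupBy('parsed_date')
             .agg(F.count('id').alias('count_before'))
             .orderBy('parsed_date'))
    df2 = (df.filter(f"parsed_date between '{day}' + interval 1 day and '{day}' + interval {t} days")
             .groupBy('parsed_date')
             .agg(F.count('id').alias('count_after'))
             .orderBy('parsed_date'))
    return [df1, df2]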
