如何在pysparkDataframe中基于秩和值过滤按字段分组的记录

yqyhoc1h  于 2021-07-12  发布在  Spark
关注(0)|答案(1)|浏览(170)

我有一个pyspark dafaframe(spark 2.2/python 2.7),它为每个客户在一段时间内的多天收到的多条记录。下面是数据的简化版本。这些是按日期(yyyy-mm-dd)的顺序排列的,每个组收到它们时。数据保证每个cust\u id有多个示例。

CUST_ID Date_received   rank
1       2015-01-01      1
1       2021-01-12      2
1       2021-01-20      3
2       2015-01-01      1
2       2017-12-31      2
2       2021-02-15      3
3       2018-01-01      1
3       2019-07-31      2
4       2015-01-01      1
4       2021-01-01      2
4       2021-01-15      3

我想把这些数据分成两个独立的Dataframe。第一个Dataframe应该只有满足以下条件的记录-
客户id第一次收到(排名1)是在2015-01-01,下一次收到(排名2)是在2021-01-01或之后。从上面的数据示例来看,第一个Dataframe应该只有这些行。这应该发生在每一组客户id上

CUST_ID Date_received   rank
1       2015-01-01      1
1       2021-01-12      2
4       2015-01-01      1
4       2021-01-01      2

第二个Dataframe应该有休息-

CUST_ID Date_received   rank
1       2021-01-20      3
2       2015-01-01      1
2       2017-12-31      2
2       2021-02-15      3
3       2018-01-01      1
3       2019-07-31      2
4       2021-01-15      3
mkh04yzy

mkh04yzy1#

您可以计算条件并广播每个客户id的条件:

from pyspark.sql import functions as F, Window

df0 = df.withColumn(
    'flag1',
    (F.col('rank') == 1) & (F.col('Date_received') == '2015-01-01')
).withColumn(
    'flag2',
    (F.col('rank') == 2) & (F.col('Date_received') >= '2021-01-01')
).withColumn(
    'grp',
    F.max('flag1').over(Window.partitionBy('CUST_ID')) & 
    F.max('flag2').over(Window.partitionBy('CUST_ID'))
)

df0.show()
+-------+-------------+----+-----+-----+-----+
|CUST_ID|Date_received|rank|flag1|flag2|  grp|
+-------+-------------+----+-----+-----+-----+
|      3|   2018-01-01|   1|false|false|false|
|      3|   2019-07-31|   2|false|false|false|
|      1|   2015-01-01|   1| true|false| true|
|      1|   2021-01-12|   2|false| true| true|
|      1|   2021-01-20|   3|false|false| true|
|      4|   2015-01-01|   1| true|false| true|
|      4|   2021-01-01|   2|false| true| true|
|      4|   2021-01-15|   3|false|false| true|
|      2|   2015-01-01|   1| true|false|false|
|      2|   2017-12-31|   2|false|false|false|
|      2|   2021-02-15|   3|false|false|false|
+-------+-------------+----+-----+-----+-----+

然后可以使用 grp 列:

df1 = df0.filter('grp and rank <= 2').select(df.columns)
df2 = df0.filter('not (grp and rank <= 2)').select(df.columns)

df1.show()
+-------+-------------+----+
|CUST_ID|Date_received|rank|
+-------+-------------+----+
|      1|   2015-01-01|   1|
|      1|   2021-01-12|   2|
|      4|   2015-01-01|   1|
|      4|   2021-01-01|   2|
+-------+-------------+----+

df2.show()
+-------+-------------+----+
|CUST_ID|Date_received|rank|
+-------+-------------+----+
|      3|   2018-01-01|   1|
|      3|   2019-07-31|   2|
|      1|   2021-01-20|   3|
|      4|   2021-01-15|   3|
|      2|   2015-01-01|   1|
|      2|   2017-12-31|   2|
|      2|   2021-02-15|   3|
+-------+-------------+----+

相关问题