pyspark中的过滤器最大数量

rt4zxlrg  于 2021-07-09  发布在  Spark
关注(0)|答案(1)|浏览(263)

有没有办法在pyspark中过滤并获得最大结果?我在下面尝试了这个,但是它只得到rownum小于2的行。

df2 = df.withColumn("rownum",row_number().over(Window.partitionBy("SID", "Start Date", "End Date").orderBy("SID"))).filter(col("rownum")<2).orderBy("SID").drop("rownum")

我在下面有一个数据框。

+--------+----------+----------+--------+----------+
|SID     |StartDate |EndDate   |CID     |Date Added|
+--------+----------+----------+--------+----------+
|1001    |2021-04-05|2021-04-05|1002    |2021-03-22|
|1001    |2021-03-31|2021-03-31|1002    |2021-03-22|
|1004    |2021-04-05|2021-04-05|1003    |2021-03-22|
|1005    |2021-04-06|2021-04-06|1006    |2021-03-22|
|1001    |2021-04-05|2021-04-05|1002    |2021-03-30|
|1001    |2021-03-31|2021-03-31|1006    |2021-03-30|
+--------+----------+----------+--------+----------+

这是我预期的结果。

+--------+----------+----------+--------+----------+
|SID     |StartDate |EndDate   |CID     |Date Added|
+--------+----------+----------+--------+----------+
|1004    |2021-04-05|2021-04-05|1003    |2021-03-22|
|1005    |2021-04-06|2021-04-06|1006    |2021-03-22|
|1001    |2021-04-05|2021-04-05|1002    |2021-03-30|
|1001    |2021-03-31|2021-03-31|1006    |2021-03-30|
+--------+----------+----------+--------+----------+
edqdpe6u

edqdpe6u1#

代码中的某些列名不存在,但我猜您可能希望按 CID 以及 Date Added 如果要获取最新值,请按降序排列:

from pyspark.sql import functions as F, Window

df2 = df.withColumn(
    "rownum", 
    F.row_number().over(
        Window.partitionBy("SID", "StartDate", "EndDate")
              .orderBy(F.desc("CID"), F.desc("DateAdded"))
    )
).filter(F.col("rownum")<2).orderBy("DateAdded").drop("rownum")

df2.show()
+----+----------+----------+----+----------+
| SID| StartDate|   EndDate| CID| DateAdded|
+----+----------+----------+----+----------+
|1005|2021-04-06|2021-04-06|1006|2021-03-22|
|1004|2021-04-05|2021-04-05|1003|2021-03-22|
|1001|2021-04-05|2021-04-05|1002|2021-03-30|
|1001|2021-03-31|2021-03-31|1006|2021-03-30|
+----+----------+----------+----+----------+

相关问题