pyspark Spark删除重复项,但选择空列

ukqbszuj  于 2022-12-22  发布在  Spark
关注(0)|答案(3)|浏览(122)

我有一个表,看起来像这样:

+---------+-------------+--------------+-----------+--------+--------------+--------------+
| cust_num|valid_from_dt|valid_until_dt|cust_row_id| cust_id|insert_load_dt|update_load_dt|
+---------+-------------+--------------+-----------+--------+--------------+--------------+
|950379405|   2018-08-24|    2018-08-24|   06885247|06885247|    2018-08-24|    2018-08-25|
|950379405|   2018-08-25|    2018-08-28|   06885247|06885247|    2018-08-25|    2018-08-29|
|950379405|   2018-08-29|    2019-12-16|   27344328|06885247|    2018-08-29|    2019-12-17|<- pair 1
|950379405|   2018-08-29|    2019-12-16|   27344328|06885247|    2018-08-29|              |<- pair 1
|950379405|   2019-12-17|    2019-12-24|   91778710|06885247|    2019-12-17|              |<- pair 2
|950379405|   2019-12-17|    2019-12-24|   91778710|06885247|    2019-12-17|    2019-12-25|<- pair 2
|950379405|   2019-12-25|    2019-12-25|   08396180|06885247|    2019-12-25|    2019-12-26|<- pair 3 
|950379405|   2019-12-25|    2019-12-25|   08396180|06885247|    2019-12-25|              |<- pair 3

正如您所看到的,我的表中有一些重复的行,它们的不同之处仅在于update_load_dt为空或带有日期。
我希望以如下方式删除 Dataframe 中的重复项:

cable_dv_customer_fixed.dropDuplicates(['cust_num',
'valid_from_dt',
'valid_until_dt',
'cust_row_id',
'cust_id'])

但我想保留更多信息。
我的意思是我想保留行where update_load_dt <> ''
是否有可能修改dropduplicates()函数,以便我可以从重复项中选择要选择的行?或者是否有其他(更好的)方法可以做到这一点?

nle07wnf

nle07wnf1#

这就是我要做的,F.max()会做你想做的,并保留值最大的行。(如果有多个,则on date col max()保留最新的日期条目)。

from pyspark.sql.window import Window
key_cols = ['cust_num','valid_from_dt','valid_until_dt','cust_row_id','cust_id']
w = Window.partitionBy(key_cols)

df.withColumn('update_load_dt', F.max('update_load_dt').over(w)).dropDuplicates(key_cols)

我处理110亿+行,这并不慢。如果有帮助,请告诉我!

icnyk63a

icnyk63a2#

你可以使用窗口函数来实现这一点。2不过,使用大数据可能会很慢。

import pyspark.sql.function as F
from pyspark.sql.window import Window

df.withColumn("row_number", F.row_number().over(Window.partitionBy(<cols>).orderBy(F.asc_null_last("update_load_dt"))))
.filter("row_number = 1")
.drop("row_number") # optional
ih99xse1

ih99xse13#

使用@Topde的答案,如果创建一个bolean列来检查列中存在的值是否是最大值,则只需添加一个过滤器,该过滤器只会消除“update_load_dt”列为空的重复条目

from pyspark.sql.window import Window
import pyspark.sql.function as F
key_cols = ['cust_num','valid_from_dt','valid_until_dt','cust_row_id','cust_id']
w = Window.partitionBy(key_cols)

(df.withColumn('update_load_dt', F.when(F.max('update_load_dt').over(w), True).otherwise(False))
.filter('update_load_dt' == True)
.distinct()
)

相关问题