如何根据条件从PySpark DataFrame中删除重复记录?

bihw5rsg  于 2023-01-13  发布在  Apache
关注(0)|答案(4)|浏览(159)

假设我有一个如下所示的PySpark DataFrame:

# Prepare Data
data = [('Italy', 'ITA'), \
    ('China', 'CHN'), \
    ('China', None), \
    ('France', 'FRA'), \
    ('Spain', None), \
    ('Taiwan', 'TWN'), \
    ('Taiwan', None)
  ]

# Create DataFrame
columns = ['Name', 'Code']
df = spark.createDataFrame(data = data, schema = columns)
df.show(truncate=False)

如您所见,有几个国家/地区重复了两次(上例中为中国和台湾)。我希望删除满足以下条件的记录:
1.列"Name"重复多次
以及
1."代码"列为Null。
请注意,对于不重复的国家(如西班牙),"代码"列可以为NULL。我希望保留这些记录。
预期输出如下所示:
| 姓名|代码|
| - ------|- ------|
| "意大利"|"意大利语"|
| "中国"|'中国'|
| "法国"|"法国"|
| "西班牙"|无效|
| 台湾|"第三世界"|
事实上,我希望每个国家都有一个记录。你知道怎么做吗?

xmd2e60i

xmd2e60i1#

您可以使用window.PartitionBy来实现所需的结果:

from pyspark.sql import Window
import pyspark.sql.functions as f

df1 = df.select('Name', f.max('Code').over(Window.partitionBy('Name')).alias('Code')).distinct()
df1.show()

输出:

+------+----+
|  Name|Code|
+------+----+
| China| CHN|
| Spain|null|
|France| FRA|
|Taiwan| TWN|
| Italy| ITA|
+------+----+
i2byvkas

i2byvkas2#

为了首先获得非空的行,使用row_number窗口函数按Name列分组,并对Code列排序,由于在Spark order by中null被认为是最小的,因此使用desc模式,然后取每组的第一行。

df = df.withColumn('rn', F.expr('row_number() over (partition by Name order by Code desc)')).filter('rn = 1').drop('rn')
zlwx9yxi

zlwx9yxi3#

这里有一种方法:

from pyspark.sql.functions import col
df = df.dropDuplicates(subset=["Name"],keep='first')
rxztt3cl

rxztt3cl4#

几乎肯定会有更聪明的方法来做到这一点,但为了吸取教训,如果你:
1.仅使用“名称”创建新 Dataframe
1.删除了重复项
1.从初始表中删除代码=“null”的记录
1.为“Code”在新表和旧表之间执行左联接
我添加了没有国家代码的澳大利亚,这样您就可以看到它也适用于这种情况

import pandas as pd

data = [('Italy', 'ITA'), \
    ('China', 'CHN'), \
    ('China', None), \
    ('France', 'FRA'), \
    ('Spain', None), \
    ('Taiwan', 'TWN'), \
    ('Taiwan', None), \
    ('Australia', None)
  ]

# Create DataFrame
columns = ['Name', 'Code']
df = pd.DataFrame(data = data, columns = columns)
print(df)

# get unique country names
uq_countries = df['Name'].drop_duplicates().to_frame()
print(uq_countries)

# remove None
non_na_codes = df.dropna()
print(non_na_codes)

# combine
final = pd.merge(left=uq_countries, right=non_na_codes, on='Name', how='left')
print(final)

相关问题