Converting a Pandas merge to a join in PySpark

Posted by ffvjumwh on 2022-11-01 in Spark

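I have two DataFrames with the same layout, each with 3 columns. For each (col_1, col_2) pair, I want to check whether the col_to_check value in the benchmark equals the value in the second DataFrame, and output the rows that differ: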

input_benchmark =

col_1    col_2   col_to_check
girl     12      Primary
boy      14      Secondary
baby     1       Nursery
girl_1   10      Secondary
girl_2   10      Secondary

input_df =

col_1    col_2   col_to_check
girl     12      Primary
boy      14      Secondary
baby     1       Secondary
toddler  3       Kindergarten
girl_1   10      null
girl_2   10      null

My code, which works:

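def check_func(input_benchmark, input_df):
    # Inner-join on the key columns (benchmark columns get the '_actual' suffix),
    # then keep only the rows whose col_to_check values differ
    df_new = input_df.merge(input_benchmark, on=['col_1', 'col_2'], suffixes=(None, '_actual')).query('col_to_check != col_to_check_actual')
    return df_new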

col_1    col_2   col_to_check   col_to_check_actual
baby     1       Secondary      Nursery
girl_1   10      null           Secondary
girl_2   10      null           Secondary
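For reference, here is a self-contained version of the pandas approach using the question's sample data. The null rows are reported because pandas treats a missing value as unequal to everything under `!=`; in Spark SQL, by contrast, a comparison with NULL evaluates to NULL and `filter` drops that row. This sketch writes the filter as a boolean mask, which is what the `query` string evaluates to.

```python
import pandas as pd

# Sample data from the question
input_benchmark = pd.DataFrame({
    'col_1': ['girl', 'boy', 'baby', 'girl_1', 'girl_2'],
    'col_2': [12, 14, 1, 10, 10],
    'col_to_check': ['Primary', 'Secondary', 'Nursery', 'Secondary', 'Secondary'],
})
input_df = pd.DataFrame({
    'col_1': ['girl', 'boy', 'baby', 'toddler', 'girl_1', 'girl_2'],
    'col_2': [12, 14, 1, 3, 10, 10],
    'col_to_check': ['Primary', 'Secondary', 'Secondary', 'Kindergarten', None, None],
})

# Inner join on the keys; the left (input_df) column keeps its name,
# the benchmark column gets the '_actual' suffix
merged = input_df.merge(input_benchmark, on=['col_1', 'col_2'], suffixes=(None, '_actual'))

# Boolean-mask form of query('col_to_check != col_to_check_actual');
# a missing value compared with != yields True, so null rows are kept
result = merged[merged['col_to_check'] != merged['col_to_check_actual']]
print(result)
```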

Is it possible to do this in Spark? I haven't found anything similar to the query function I used.

Answer #1 by wfveoks0:

In Spark, the equivalent of your query is a filter:

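from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.getOrCreate()

# Benchmark data (input_benchmark)
df1 = (
    spark
    .createDataFrame([
        ('girl', 12, 'Primary'),
        ('boy', 14, 'Secondary'),
        ('baby', 1, 'Nursery'),
        ('girl_1', 10, 'Secondary'),
        ('girl_2', 10, 'Secondary')],
        ['col_1', 'col_2', 'col_to_check']
    )
)

# Data to check (input_df)
df2 = (
    spark
    .createDataFrame([
        ('girl', 12, 'Primary'),
        ('boy', 14, 'Secondary'),
        ('baby', 1, 'Secondary'),
        ('toddler', 3, 'Kindergarten'),
        ('girl_1', 10, None),
        ('girl_2', 10, None)],
        ['col_1', 'col_2', 'col_to_check'])
    .select(
        'col_1',
        'col_2',
        col('col_to_check').alias('to_check')  # alias this column so it isn't ambiguous
    )
)

df3 = (
    df1
    .join(df2, on=['col_1', 'col_2'])    # inner join on the key columns
    .filter('col_to_check != to_check')  # SQL-expression filter, like pandas query()
)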

df3.show()

+-----+-----+------------+---------+
|col_1|col_2|col_to_check| to_check|
+-----+-----+------------+---------+
| baby|    1|     Nursery|Secondary|
+-----+-----+------------+---------+

Edit

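In Spark SQL, comparing any value with NULL yields NULL rather than true, and filter drops rows whose condition is NULL, so rows where either side is NULL are silently excluded. To handle NULL values, wrap both sides in coalesce with some default value: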

df3 = (
    df1
    .join(df2, on=['col_1', 'col_2'])
    .filter('coalesce(col_to_check, "default") != coalesce(to_check, "default")')
)

df3.show()

+------+-----+------------+---------+
| col_1|col_2|col_to_check| to_check|
+------+-----+------------+---------+
|  baby|    1|     Nursery|Secondary|
|girl_1|   10|   Secondary|     null|
|girl_2|   10|   Secondary|     null|
+------+-----+------------+---------+
