如何将使用内部连接的MS Access UPDATE查询转换为PySpark?

jogvjijk  于 2022-11-01  发布在  Spark
关注(0)|答案(1)|浏览(119)

我有两个MS Access SQL查询,我想把它们转换成PySpark。查询看起来像这样(我们有两个表Employee和Department):
第一个

pvcm50d1

pvcm50d11#

测试 Dataframe :

from pyspark.sql import functions as F

df_emp = spark.createDataFrame([(1, 'a'), (2, 'bb')], ['EMPLOYEE', 'STATEPROVINCE'])
df_emp.show()

# +--------+-------------+

# |EMPLOYEE|STATEPROVINCE|

# +--------+-------------+

# |       1|            a|

# |       2|           bb|

# +--------+-------------+

df_dept = spark.createDataFrame([('bb', 'b')], ['STATE_LEVEL', 'STATE_ABBREVIATION'])
df_dept.show()

# +-----------+------------------+

# |STATE_LEVEL|STATE_ABBREVIATION|

# +-----------+------------------+

# |         bb|                 b|

# +-----------+------------------+

在Microsoft Access中执行SQL查询会执行下列动作:

在PySpark中,你可以这样得到它:

df = (df_emp.alias('a')
    .join(df_dept.alias('b'), df_emp.STATEPROVINCE == df_dept.STATE_LEVEL, 'left')
    .select(
        *[c for c in df_emp.columns if c != 'STATEPROVINCE'],
        F.coalesce('b.STATE_ABBREVIATION', 'a.STATEPROVINCE').alias('STATEPROVINCE')
    )
)
df.show()

# +--------+-------------+

# |EMPLOYEE|STATEPROVINCE|

# +--------+-------------+

# |       1|            a|

# |       2|            b|

# +--------+-------------+

首先你做一个左join然后select
select有2个部分。

  • 首先,从df_emp中选择除“STATEPROVINCE”之外的所有内容。
  • 然后,对于新的“STATEPROVINCE”,您从df_dept中选择“STATE_ABBREVIATION”,但如果它为空(即,在df_dept中不存在),则从df_emp中选择“STATEPROVINCE”。

对于第二个查询,只需更改select语句中的值:

df = (df_emp.alias('a')
    .join(df_dept.alias('b'), df_emp.STATEPROVINCE == df_dept.STATE_LEVEL, 'left')
    .select(
        *[c for c in df_emp.columns if c != 'MARKET'],
        F.coalesce('b.MARKET', 'a.MARKET').alias('MARKET')
    )
)

相关问题