我有一个SQL查询,我想把它转换成PySpark。在SQL查询中,我们连接了 * 三 * 个表,并更新了一个匹配的列。SQL查询如下所示:
UPDATE [DEPARTMENT_DATA]
INNER JOIN ([COLLEGE_DATA]
INNER JOIN [STUDENT_TABLE]
ON COLLEGE_DATA.UNIQUEID = STUDENT_TABLE.PROFESSIONALID)
ON DEPARTMENT_DATA.PUBLICID = COLLEGE_DATA.COLLEGEID
SET STUDENT_TABLE.PRIVACY = "PRIVATE"
这个逻辑我已经试过了:
df_STUDENT_TABLE = (
df_STUDENT_TABLE.alias('a')
.join(
df_COLLEGE_DATA('b'),
on=F.col('a.PROFESSIONALID') == F.col('b.UNIQUEID'),
how='left',
)
.join(
df_DEPARTMENT_DATA.alias('c'),
on=F.col('b.COLLEGEID') == F.col('c.PUBLICID'),
how='left',
)
.select(
*[F.col(f'a.{c}') for c in df_STUDENT_TABLE.columns],
F.when(
F.col('b.UNIQUEID').isNotNull() & F.col('c.PUBLICID').isNotNull()
F.lit('PRIVATE')
).alias('PRIVACY')
)
)
这段代码添加了一个新的列“PRIVACY”,但是在运行后给出了空值。
2条答案
按热度按时间vxf3dgd41#
PRIVATE
)privacy
列为空。req_value
是具有所需值的列,并且这些值需要在privacy
中反映出来,因此可以直接使用下面的代码。更新日期:
您还可以使用下面的代码,其中我使用
withColumn
而不是select更新了列。z9gpfhce2#
在连接之后,您可以使用**
nvl2
**。它可以检查与最后一个 Dataframe (df_dept
)的连接是否成功,如果成功,则您可以返回“PRIVATE”,否则返回df_stud.PRIVACY
中的值。输入:
脚本: