在pyspark中动态生成作为列表的连接条件时,如何在元素之间应用“or”而不是“and”?

h9vpoimq  于 2021-05-27  发布在  Spark
关注(0)|答案(2)|浏览(346)

我正在连接两个Dataframesite_bs和site_wrk_int1,并使用动态连接条件创建site_wrk。
我的代码如下:

join_cond=[ col(v_col) == col('wrk_'+v_col) for v_col in primaryKeyCols]  #result would be 
site_wrk=site_bs.join(site_wrk_int1,join_cond,'inner').select(*site_bs.columns)

join\u cond将是动态的,其值类似于[col(id)==col(wrk\u id),col(id)==col(wrk\u parentid)]
在上述连接条件中,连接将同时满足上述两个条件。i、 例如,连接条件将是

id = wrk_id  and id = wrk_parentId

但我希望或条件适用如下

id = wrk_id  or id = wrk_parentId

如何在Pypark中实现这一点?

jk9hmnmh

jk9hmnmh1#

因为上的逻辑操作 pyspark 列返回列对象,可以在join语句中链接这些条件,例如:

from pyspark.sql import SparkSession
import pyspark.sql.functions as f

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame([
    (1, "A", "A"),
    (2, "C", "C"), 
    (3, "E", "D"), 
], ['id', 'col1', 'col2'] 
)
df.show()
+---+----+----+
| id|col1|col2|
+---+----+----+
|  1|   A|   A|
|  2|   C|   C|
|  3|   E|   D|
+---+----+----+

df.alias("t1").join(
    df.alias("t2"),
    (f.col("t1.col1") == f.col("t2.col2")) | (f.col("t1.col1") == f.lit("E")),
    "left_outer"
).show(truncate=False)
+---+----+----+---+----+----+
|id |col1|col2|id |col1|col2|
+---+----+----+---+----+----+
|1  |A   |A   |1  |A   |A   |
|2  |C   |C   |2  |C   |C   |
|3  |E   |D   |1  |A   |A   |
|3  |E   |D   |2  |C   |C   |
|3  |E   |D   |3  |E   |D   |
+---+----+----+---+----+----+

如你所见,我得到了 True ID为1和2的左侧行的值 col1 == col2 OR col1 == E 哪个是 True 对于我的Dataframe的三行。在语法方面,python操作符( | & ... )如上例所示,用闭括号分隔,否则可能会混淆 py4j 错误。
或者,如果你想保持类似的符号,你在你的问题中说,为什么不使用 functools.reduce 以及 operator.or_ 将此逻辑应用于列表,例如:
在这个例子中,我有一个 AND 我的列条件和get之间的条件 NULL 只是,如预期的那样:

df.alias("t1").join(
    df.alias("t2"),
    [f.col("t1.col1") == f.col("t2.col2"),  f.col("t1.col1") == f.lit("E")],
    "left_outer"
).show(truncate=False)
+---+----+----+----+----+----+
|id |col1|col2|id  |col1|col2|
+---+----+----+----+----+----+
|3  |E   |D   |null|null|null|
|1  |A   |A   |null|null|null|
|2  |C   |C   |null|null|null|
+---+----+----+----+----+----+

在这个例子中,我利用 functools 以及 operator 要得到与上述相同的结果:

df.alias("t1").join(
    df.alias("t2"),
    functools.reduce(
      operator.or_, 
      [f.col("t1.col1") == f.col("t2.col2"),  f.col("t1.col1") == f.lit("E")]),
    "left_outer"
).show(truncate=False)
+---+----+----+---+----+----+
|id |col1|col2|id |col1|col2|
+---+----+----+---+----+----+
|1  |A   |A   |1  |A   |A   |
|2  |C   |C   |2  |C   |C   |
|3  |E   |D   |1  |A   |A   |
|3  |E   |D   |2  |C   |C   |
|3  |E   |D   |3  |E   |D   |
+---+----+----+---+----+----+
ikfrs5lh

ikfrs5lh2#

我对sparksql很陌生。如果这能解决问题,请通知我。

site_wrk = site_bs.join(site_work_int1, [(site_bs.id == site_work_int1.wrk_id) | (site_bs.id == site_work_int1.wrk_parentId)], how = "inner")

相关问题