如果两个条件中至少有一个满足,我想加入两个pysparkDataframe。
玩具数据:
df1 = spark.createDataFrame([
(10, 1, 666),
(20, 2, 777),
(30, 1, 888),
(40, 3, 999),
(50, 1, 111),
(60, 2, 222),
(10, 4, 333),
(50, None, 444),
(10, 0, 555),
(50, 0, 666)
],
['var1', 'var2', 'other_var']
)
df2 = spark.createDataFrame([
(10, 1),
(20, 2),
(30, None),
(30, 0)
],
['var1_', 'var2_']
)
我想保持所有这些行的 df1
哪里 var1
存在于 df2.var1_
或 var2
存在于 df2.var2_
(但如果该值为0,则不适用)。
所以,预期的产出是
+----+----+---------+-----+-----+
|var1|var2|other_var|var1_|var2_|
+----+----+---------+-----+-----+
| 10| 1| 666| 10| 1| # join on both var1 and var2
| 20| 2| 777| 20| 2| # join on both var1 and var2
| 30| 1| 888| 10| 1| # join on both var1 and var2
| 50| 1| 111| 10| 1| # join on var2
| 60| 2| 222| 20| 2| # join on var2
| 10| 4| 333| 10| 1| # join on var1
| 10| 0| 555| 10| 1| # join on var1
+----+----+---------+-----+-----+
在其他的尝试中,我尝试了
cond = [(df1.var1 == (df2.select('var1_').distinct()).var1_) | (df1.var2 == (df2.filter(F.col('var2_') != 0).select('var2_').distinct()).var2_)]
df1\
.join(df2, how='inner', on=cond)\
.show()
+----+----+---------+-----+-----+
|var1|var2|other_var|var1_|var2_|
+----+----+---------+-----+-----+
| 10| 1| 666| 10| 1|
| 20| 2| 777| 20| 2|
| 30| 1| 888| 10| 1|
| 50| 1| 111| 10| 1|
| 30| 1| 888| 30| null|
| 30| 1| 888| 30| 0|
| 60| 2| 222| 20| 2|
| 10| 4| 333| 10| 1|
| 10| 0| 555| 10| 1|
| 10| 0| 555| 30| 0|
| 50| 0| 666| 30| 0|
+----+----+---------+-----+-----+
但是我得到了比预期更多的行 var2 == 0
它们也被保存了下来。
我做错什么了?
注意:我没有使用 .isin
因为我的实际 df2
有大约20k行,我在这里读到,这个方法有大量的id,性能可能会很差。
1条答案
按热度按时间ezykj2lf1#
尝试以下条件:
条件应仅包括要联接的两个Dataframe中的列。如果你想移除
var2_ = 0
,可以将它们作为联接条件,而不是筛选器。也不需要具体说明
distinct
,因为它不影响相等条件,而且还添加了不必要的步骤。