Python: prioritized join of PySpark DataFrames

w1e3prcc · posted 2021-07-13 in Spark

Suppose I have two PySpark DataFrames.

df1
| A     | B              |
| ----- | -------------- |
| foo   | B1             |
| bar   | B2             |
| baz   | B3             |
| lol   | B9             |

df2
| X      | Y  | Z       |
| ------ | -- | ------- |
| bar    | B1 | Cool    |
| foo    | B2 | Awesome |
| val    | B3 | Superb  |
| bar    | B4 | Nice    |
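
For reference, here is a minimal sketch that reproduces these two inputs (assuming an existing SparkSession named `spark`; the builder call below is just a hypothetical local setup):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()  # hypothetical local session

# Build df1 and df2 exactly as shown in the tables above
df1 = spark.createDataFrame(
    [('foo', 'B1'), ('bar', 'B2'), ('baz', 'B3'), ('lol', 'B9')],
    ['A', 'B'],
)
df2 = spark.createDataFrame(
    [('bar', 'B1', 'Cool'), ('foo', 'B2', 'Awesome'),
     ('val', 'B3', 'Superb'), ('bar', 'B4', 'Nice')],
    ['X', 'Y', 'Z'],
)
```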

How can I join these DataFrames into df3 so that I preferentially join on df1["A"] == df2["X"] and take df2["Z"], and then, wherever df3["Z"] is null, fill those null values with the result of joining on df1["B"] == df2["Y"] and again taking df2["Z"]?
I want to end up with df4 rather than df3 (note the null values in df3):

df3
| A   | B  | Z       |
| --- | -- | ------- |
| foo | B1 | Awesome |
| bar | B2 | Cool    |
| bar | B4 | Nice    |
| baz | B3 | null    |
| lol | B9 | null    |

df4
| A   | B  | Z       |
| --- | -- | ------- |
| foo | B1 | Awesome |
| bar | B2 | Cool    |
| bar | B4 | Nice    |
| baz | B3 | Superb  |
| lol | B9 | null    |

My non-simplified real-world case has many duplicates, many columns, and so on, so I can't tell whether a simple when/otherwise expression is enough (or maybe I'm completely lost...). Any suggestions?

Answer #1, from ve7v8dk2:

You can try doing two joins:

```python
import pyspark.sql.functions as F

df4 = df1.join(
    df2,
    df1['A'] == df2['X'],  # primary join: A matches X
    'left'
).select(
    'A', 'B', 'Z'
).alias('df3').join(
    df2.alias('df2'),
    # fallback join: only for rows the primary join left unresolved
    F.expr('df3.B = df2.Y and df3.Z is null'),
    'left'
).select(
    # keep the primary Z where present, otherwise take the fallback Z
    'A', 'B', F.coalesce('df3.z', 'df2.z').alias('z')
)

df4.show()
```
```
+---+---+-------+
|  A|  B|      z|
+---+---+-------+
|foo| B1|Awesome|
|bar| B2|   Nice|
|bar| B2|   Cool|
|baz| B3| Superb|
|lol| B9|   null|
+---+---+-------+
```
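
As a quick sanity check that the second join filled the gaps, you can list the rows whose Z is still unresolved (a usage sketch; per the output above, only lol/B9 should remain):

```python
# Rows neither join could resolve (expected: only lol / B9)
df4.filter('z is null').show()
```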

Or, if you only want a single join: join on either condition, then use a window flag that marks whether each (A, B) group contains a primary A = X match, and keep only the primary matches in groups that have one.

```python
df4 = df1.join(
    df2,
    (df1['A'] == df2['X']) | (df1['B'] == df2['Y']),  # join on either condition
    'left'
).selectExpr(
    '*',
    # flag: true if this (A, B) group has at least one primary A = X match
    'max(A = X) over (partition by A, B) as flag'
).filter(
    # keep primary matches where the group has one; otherwise keep fallback rows
    '(flag and A = X) or not flag or flag is null'
).select(
    'A', 'B', 'Z'
)

df4.show()
```
```
+---+---+-------+
|  A|  B|      Z|
+---+---+-------+
|bar| B2|   Cool|
|bar| B2|   Nice|
|foo| B1|Awesome|
|lol| B9|   null|
|baz| B3| Superb|
+---+---+-------+
```
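
To see how the flag drives the filter, it can help to inspect the intermediate result before the filter step (a diagnostic sketch reusing the same join and window expression as above):

```python
# Show every joined row together with its group-level flag
df1.join(
    df2,
    (df1['A'] == df2['X']) | (df1['B'] == df2['Y']),
    'left'
).selectExpr(
    '*',
    'max(A = X) over (partition by A, B) as flag'
).show()
```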
