这些是我的table: destination
new_data
在oracle sql中,我可以做到:
MERGE INTO destination d
USING new_data n
ON (d.c1 = n.c1 AND d.c2 = n.c2)
WHEN MATCHED THEN
UPDATE SET d.d1 = n.d1
WHERE d.d1 IS NULL
WHEN NOT MATCHED THEN
INSERT (c1, c2, d1)
VALUES (n.c1, n.c2, n.d1);
那么 destination
表变为:
如果 c1
, c2
存在于 destination
以及 d1
为空, d1
得到更新。
如果 c1
, c2
不存在,将插入行。
在Pypark中有没有同样的方法?
这将生成Dataframe:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
spark = SparkSession.builder.getOrCreate()
dCols = ['c1', 'c2', 'd1']
dData = [('a', 'b', 5),
('c', 'd', None)]
destination = spark.createDataFrame(dData, dCols)
nData = [('a', 'b', 1),
('c', 'd', 6),
('e', 'f', 7),
('g', 'h', None)]
new_data = spark.createDataFrame(nData, dCols)
在pyspark中,几乎包含了sql中的所有内容。但我找不到与之相当的 MERGE
.
2条答案
按热度按时间drkbr07n1#
在sql中,
MERGE
可替换为左联接右联接<=>完全外部联接:但是,每次执行此合并时,都需要将所有数据重写到目标中,因为spark不支持更新,这可能会导致性能下降。因此,如果您真的需要这样做,我建议您看看delta-lake,它将acid事务引入spark,并支持合并语法。
afdcj2ne2#
可以使用左连接和合并列
coalesce
```import pyspark.sql.functions as F
result = new_data.alias('t1').join(
destination.alias('t2'),
['c1', 'c2'],
'full'
).select('c1', 'c2', F.coalesce('t2.d1', 't1.d1').alias('d1'))
result.show()
+---+---+----+
| c1| c2| d1|
+---+---+----+
| e| f| 7|
| g| h|null|
| c| d| 6|
| a| b| 5|
+---+---+----+