oracle合并重写为pyspark如果为空-更新,否则-插入

bq8i3lrv  于 2021-07-13  发布在  Spark
关注(0)|答案(2)|浏览(303)

这些是我的table: destinationnew_data
在oracle sql中,我可以做到:

MERGE INTO destination d
    USING new_data n
    ON (d.c1 = n.c1 AND d.c2 = n.c2)
  WHEN MATCHED THEN
    UPDATE SET d.d1 = n.d1
         WHERE d.d1 IS NULL
  WHEN NOT MATCHED THEN
    INSERT (c1, c2, d1)
    VALUES (n.c1, n.c2, n.d1);

那么 destination 表变为:

如果 c1 , c2 存在于 destination 以及 d1 为空, d1 得到更新。
如果 c1 , c2 不存在,将插入行。
在Pypark中有没有同样的方法?
这将生成Dataframe:

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

dCols = ['c1', 'c2', 'd1']
dData = [('a', 'b', 5), 
         ('c', 'd', None)]
destination = spark.createDataFrame(dData, dCols)

nData = [('a', 'b', 1),
         ('c', 'd', 6),
         ('e', 'f', 7),
         ('g', 'h', None)]
new_data = spark.createDataFrame(nData, dCols)

在pyspark中,几乎包含了sql中的所有内容。但我找不到与之相当的 MERGE .

drkbr07n

drkbr07n1#

在sql中, MERGE 可替换为左联接右联接<=>完全外部联接:

merged = destination.alias("dest").join(new_data.alias("src"), ["c1", "c2"], "full") \
    .selectExpr("c1", "c2", "coalesce(dest.d1, src.d1) as d1")

merged.show()

# +---+---+----+

# | c1| c2|  d1|

# +---+---+----+

# |  e|  f|   7|

# |  g|  h|null|

# |  c|  d|   6|

# |  a|  b|   5|

# +---+---+----+

但是,每次执行此合并时,都需要将所有数据重写到目标中,因为spark不支持更新,这可能会导致性能下降。因此,如果您真的需要这样做,我建议您看看delta-lake,它将acid事务引入spark,并支持合并语法。

afdcj2ne

afdcj2ne2#

可以使用左连接和合并列 coalesce ```
import pyspark.sql.functions as F

result = new_data.alias('t1').join(
destination.alias('t2'),
['c1', 'c2'],
'full'
).select('c1', 'c2', F.coalesce('t2.d1', 't1.d1').alias('d1'))

result.show()
+---+---+----+
| c1| c2| d1|
+---+---+----+
| e| f| 7|
| g| h|null|
| c| d| 6|
| a| b| 5|
+---+---+----+

相关问题