使用R的数据块增量表合并语句

kuarbcqp  于 2022-12-06  发布在  其他
关注(0)|答案(2)|浏览(131)

我最近开始研究Databricks,我一直在尝试找到一种方法在Delta表上执行merge语句,尽管使用了R api(最好是sparklyr)。最终的目的是以某种方式施加一个“duplicate”约束,如这里所述。前面提到的文档描述了Python的工作流程:

deltaTable.alias("logs").merge(
    newDedupedLogs.alias("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId") \
  .whenNotMatchedInsertAll() \
  .execute()

然而,我想知道是否有一个直接的方法来实现这一点,通过R。任何援助/想法的问题将不胜感激,因为我是一个新的用户(如上所述)。提前感谢!

e4yzc0pl

e4yzc0pl1#

提供此答案是因为您评论说没有R Delta Lake API支持。现在有一个新的R包为Delta Lake提供了R API:dlt。语法与用于Delta Lake的Python API非常相似。
在您的示例中:

# Install and laod the `dlt` package
remotes::install_gitlab("zero323/dlt")
library(dlt)
...

# Use the Delta Lake R API from the dlt package
deltaTable <- dlt_for_path("<path to table>")

deltaTable %>%
  dlt_alias("logs") %>%
  dlt_merge(alias(newDedupedLogs, "newDedupedLogs"), expr("newDedupedLogs.uniqueId = logs.uniqueId")) %>%
  dlt_when_not_matched_insert_all() %>%
  dlt_execute()
ygya80vv

ygya80vv2#

Delta Lake没有官方的R API,但可以使用SQL的MERGE INTO命令(我经常在Scala/Python中使用,因为它更容易阅读,至少对我来说是这样)。需要注册一个临时视图,其中包含要放入目标表的数据,然后运行sql,如下所示(字符串被拆分以便于阅读):

library(SparkR)
updates_df <- ...get updates...
createOrReplaceTempView(updates, "updates")
result <- sql(
  "MERGE INTO <your_table> AS target USING updates 
   ON target.id = updates.id WHEN NOT MATCHED THEN INSERT *")

相关问题