如何在python中动态传递变量给增量表updateAll()?

xpcnnkqh  于 2023-02-14  发布在  Python
关注(0)|答案(1)|浏览(132)

我们正在使用delta(.io)作为我们的数据湖。每隔X小时,我们希望更新所有新的/更改的记录。
我们的初始代码如下所示:

from delta.tables import *

for table in output_tables.keys():
  update_condition = "old." + output_table_unique_keys[table] + " = new." + output_table_unique_keys[table]
  new_df = output_tables[table].drop_duplicates([output_table_unique_keys[table]])
  old_df = DeltaTable.forPath(spark, des_file_path + table)

  old_df.alias("old").merge(new_df.alias("new"), update_condition) \
  .whenMatchedUpdateAll()\
  .whenNotMatchedInsertAll() \
  .execute()

目前update_conditionold.unique_key = new.unique_key,但是我们遇到了一些错误,我们倾向于ConcurrentAppendException,参见https://docs.databricks.com/delta/concurrency-control.html
该示例向我们展示了此代码作为解决方案

// Target 'deltaTable' is partitioned by date and country
deltaTable.as("t").merge(
    source.as("s"),
    "s.user_id = t.user_id AND s.date = t.date AND s.country = t.country")
  .whenMatched().updateAll()
  .whenNotMatched().insertAll()
  .execute()

为了实现这个解决方案,我们需要使update_condition更加明确,我们想通过添加以下语句(我们的分区)来实现。
old.unique_key = new.unique_key AND new.Division = <Division> AND new.company = <company>
就像例子中的scala代码一样(来自链接),我们在<>中有new语句的值,在Python中如何工作?我如何动态地用每一行的值替换那个值?

lvjbypge

lvjbypge1#

这个解决办法可以根据你的情况来推断。
首先我创建了一个日期变量

#Update active values
active_date = datetime.today() + relativedelta.relativedelta(months=0, day=1)
active_date = active_date.strftime('%Y-%m-%d')
print(active_date)

我使用下面的代码输入这个变量作为更新增量表的条件。

from delta.tables import *

deltaTable = DeltaTable.forPath(spark, '/tmp/delta/people')

# Declare the predicate by using a SQL-formatted string.
deltaTable.update(
  condition = "startDate != " + active_date,
  set = { "gender": "'Female'" }
)

正如你所看到的,条件可以根据当前日期进行更新,以改变另一列,在本例中是性别列。

相关问题