我们正在使用delta(.io)作为我们的数据湖。每隔X小时,我们希望更新所有新的/更改的记录。
我们的初始代码如下所示:
from delta.tables import *
for table in output_tables.keys():
update_condition = "old." + output_table_unique_keys[table] + " = new." + output_table_unique_keys[table]
new_df = output_tables[table].drop_duplicates([output_table_unique_keys[table]])
old_df = DeltaTable.forPath(spark, des_file_path + table)
old_df.alias("old").merge(new_df.alias("new"), update_condition) \
.whenMatchedUpdateAll()\
.whenNotMatchedInsertAll() \
.execute()
目前update_condition
是old.unique_key = new.unique_key
,但是我们遇到了一些错误,我们倾向于ConcurrentAppendException
,参见https://docs.databricks.com/delta/concurrency-control.html。
该示例向我们展示了此代码作为解决方案
// Target 'deltaTable' is partitioned by date and country
deltaTable.as("t").merge(
source.as("s"),
"s.user_id = t.user_id AND s.date = t.date AND s.country = t.country")
.whenMatched().updateAll()
.whenNotMatched().insertAll()
.execute()
为了实现这个解决方案,我们需要使update_condition
更加明确,我们想通过添加以下语句(我们的分区)来实现。old.unique_key = new.unique_key AND new.Division = <Division> AND new.company = <company>
就像例子中的scala代码一样(来自链接),我们在<>
中有new语句的值,在Python中如何工作?我如何动态地用每一行的值替换那个值?
1条答案
按热度按时间lvjbypge1#
这个解决办法可以根据你的情况来推断。
首先我创建了一个日期变量
我使用下面的代码输入这个变量作为更新增量表的条件。
正如你所看到的,条件可以根据当前日期进行更新,以改变另一列,在本例中是性别列。