我用python编写了一个airbyte自定义目的地。我已经使用此查询实现了增量同步重复数据消除操作,
MERGE INTO {self.schema_name}.{table_name} AS target
USING {self.schema_name}.{table_name}_temp AS source
ON target.data_{primary_keys[0][0]}=source.data_{primary_keys[0][0]}
WHEN MATCHED THEN
{query_placeholder_refined}
WHEN NOT MATCHED THEN
INSERT *
这里,query_placeholder_refined
变量被UPDATE SET
查询语句替换,其中目标表的所有列分别被更新,例如采用该查询的简化版本,
MERGE INTO integration.issues as target
USING integration.issues_temp as source
ON target.data_id=source.data_id
WHEN MATCHED THEN
UPDATE SET target.data_issue_url=source.data_issue_url, target.data_user_id=source.data_user_id
WHEN NOT MATCHED THEN
INSERT *
查询在一些流上运行得很好,但对于其他流,它给出了这个错误,pyspark.sql.utils.AnalysisException: Updates are in conflict for these columns: data_user_id
1条答案
按热度按时间46scxncf1#
我找到了解决方案,查询是完美的,工作起来很有魅力,问题是我没有在查询的
UPDATE SET
行中给出正确的列,这就是为什么SQL抛出上述错误,错误基本上转换为,它无法在表中找到给定的列名。方案一
使用
UPDATE SET *
更改列名方案二
为查询提供正确的列名。