这是一个ETL作业。数据的转换在Pyspark中的Databricks中编写,最终数据加载到Azure SQL表中。假设我昨天在决赛中有100条记录,今天有50条记录。如果这50条记录与昨天的记录重复,我将遇到主键冲突问题,并且在数据库中的写入操作将失败。
因此,如果记录已经存在,我必须更新最新的值,否则插入新的记录。
我检查的所有资源都提到了执行合并操作,但实际情况是我必须覆盖整个表,并且不能直接在Azure SQL表中更新/替换一条记录。
有没有办法一次只更新一条记录,而不更新整个表?因为最终的表将不断增长,每天覆盖是有风险和耗时的。
我们可以使用存储过程并从数据库中调用它吗?
2条答案
按热度按时间nzk0hqpo1#
您应该能够通过在proc中指定您对update语句的需求并使用pyodbc从数据块运行它来简单地完成它。但是你必须知道你要更新哪个ID,或者以某种方式能够根据某些条件对其进行参数化。
这就是你的更新。
在数据砖中:
或者你可以像这样做最后一部分:
然后关闭连接,这应该工作,但我还没有测试现在。这通常是你从数据块调用proc的方式。
mrwjdhj32#
根据@Ziya的上述回答,下面是我如何使用pyodbc实现的。
安装pyodbc:
参考https://pub.towardsai.net/databricks-upsert-to-azure-sql-using-pyspark-5937e8303fbf
将合并写入查询:
执行上述操作: