pyspark 如何在Azure SQL表上从Azure数据块执行合并到/upsert

cpjpxq1n  于 12个月前  发布在  Spark
关注(0)|答案(2)|浏览(124)

这是一个ETL作业。数据的转换在Pyspark中的Databricks中编写,最终数据加载到Azure SQL表中。假设我昨天在决赛中有100条记录,今天有50条记录。如果这50条记录与昨天的记录重复,我将遇到主键冲突问题,并且在数据库中的写入操作将失败。
因此,如果记录已经存在,我必须更新最新的值,否则插入新的记录。
我检查的所有资源都提到了执行合并操作,但实际情况是我必须覆盖整个表,并且不能直接在Azure SQL表中更新/替换一条记录。
有没有办法一次只更新一条记录,而不更新整个表?因为最终的表将不断增长,每天覆盖是有风险和耗时的。
我们可以使用存储过程并从数据库中调用它吗?

nzk0hqpo

nzk0hqpo1#

您应该能够通过在proc中指定您对update语句的需求并使用pyodbc从数据块运行它来简单地完成它。但是你必须知道你要更新哪个ID,或者以某种方式能够根据某些条件对其进行参数化。

UPDATE Table1
SET Name = 'Leet'
WHERE ID = 1337;

这就是你的更新。
在数据砖中:

import pyodbc

server = 'server.database.windows.net'
database = 'database'
username = 'username'
password = 'password'
conn_str = f'DRIVER={{ODBC Driver 17 for SQL Server}};SERVER={server};DATABASE={database};UID={username};PWD={password}'

conn = pyodbc.connect(conn_str) 
cursor = conn.cursor()

#Give your params here
id = 1337
new_name = 'Leet'

update_query = f"UPDATE Table1 SET Name = '{new_name}' WHERE ID = {id}"
cursor.execute(update_query, new_name, id)
conn.commit()

或者你可以像这样做最后一部分:

update_query = "UPDATE YourTable SET Name = ? WHERE ID = ?"
cursor.execute(update_query, new_name, id)

然后关闭连接,这应该工作,但我还没有测试现在。这通常是你从数据块调用proc的方式。

mrwjdhj3

mrwjdhj32#

根据@Ziya的上述回答,下面是我如何使用pyodbc实现的。
安装pyodbc:
参考https://pub.towardsai.net/databricks-upsert-to-azure-sql-using-pyspark-5937e8303fbf

%sh
curl https://packages.microsoft.com/keys/microsoft.asc | apt-key add -
curl https://packages.microsoft.com/config/ubuntu/16.04/prod.list > /etc/apt/sources.list.d/mssql-release.list 
apt-get update
ACCEPT_EULA=Y apt-get install msodbcsql17
apt-get -y install unixodbc-dev
sudo apt-get install python3-pip -y
pip3 install --upgrade pyodbc

将合并写入查询:

upsert_query = f"""MERGE INTO APD_TEMP.TARGET_TBL AS Target USING APD_TEMP.SRC_TBL AS Source
ON Target.SrcCode = Source.SrcCode
   AND Target.Date = Source.Date
   AND Target.CC = Source.CC
   AND Target.ON = Source.ON
   AND Target.Country = Source.Country
   AND Target.CatCode = Source.CatCode
   AND Target.IDNo = Source.IDNo
   AND Target.InfoCode = Source.InfoCode
WHEN MATCHED THEN
   UPDATE
   SET Target.Time = Source.Time,
       Target.Qty = Source.Qty,
       Target.ValueTxt = Source.ValueTxt,
       Target.UpdateTimestamp = Source.UpdateTimestamp
WHEN NOT MATCHED THEN
   INSERT (SrcCode, Date, CC, ON, Country, CatCode, IDNo, Time, Qty, ValueTxt, CreationTimestamp, UpdateTimestamp, InfoCode)
      VALUES (Source.SrcCode, Source.Date, Source.CC,
    Source.ON, Source.Country, Source.CatCode,
    Source.IDNo,
    Source.Time, Source.Qty,
    Source.ValueTxt, Source.CreationTimestamp, Source.UpdateTimestamp, Source.InfoCode);"""

执行上述操作:

cursor.execute(upsert_query)
conn.commit()

相关问题