我正在尝试做一个从pyspark数组到sql表的upsert。
sparkdf是我的pyspark框架。Test是我在azure sql数据库中的sql表。
到目前为止,我有以下内容:
def write_to_sqldatabase(final_table, target_table):
#Write table data into a spark dataframe
final_table.write.format("jdbc") \
.option("url", f"jdbc:sqlserver://{SERVER};databaseName={DATABASE}") \
.option("dbtable", f'....{target_table}') \
.option("user", USERNAME) \
.option("password", PASSWORD) \
.mode("append") \
.save()
字符串
和
spark.sql("""
merge target t
using source s
on s.Id = t.Id
when matched then
update set *
when not matched then insert *
""")
型
和
jdbc_url = f"jdbc:sqlserver://{SERVER};database={DATABASE};user={USERNAME};password={PASSWORD}"
sparkdf.createOrReplaceTempView('source')
df = spark.read \
.format("jdbc") \
.option("url", jdbc_url) \
.option("dbtable", "(merge into target t using source s on s.Id = t.Id when matched then update set * when not matched then insert *) AS subquery") \
.load()
型
后者不起作用,因为azure sql server似乎不支持 *。我想你必须声明列和值而不使用 *。但是我想动态地这样做,因为我有很多列和很多不同的表,我想为它们做upsert。
我尝试了不同的选择,但到目前为止没有任何效果。
1条答案
按热度按时间yiytaume1#
根据这个https://issues.apache.org/jira/browse/SPARK-19335 Spark框架编写器API没有这样的jdbc功能。必须推出自己的解决方案。