如何执行从pyspark框架到azure sql数据库表的upsert(插入+更新)?

xqkwcwgp  于 2024-01-06  发布在  Spark
关注(0)|答案(1)|浏览(199)

我正在尝试做一个从pyspark数组到sql表的upsert。
sparkdf是我的pyspark框架。Test是我在azure sql数据库中的sql表。
到目前为止,我有以下内容:

  1. def write_to_sqldatabase(final_table, target_table):
  2. #Write table data into a spark dataframe
  3. final_table.write.format("jdbc") \
  4. .option("url", f"jdbc:sqlserver://{SERVER};databaseName={DATABASE}") \
  5. .option("dbtable", f'....{target_table}') \
  6. .option("user", USERNAME) \
  7. .option("password", PASSWORD) \
  8. .mode("append") \
  9. .save()

字符串

  1. spark.sql("""
  2. merge target t
  3. using source s
  4. on s.Id = t.Id
  5. when matched then
  6. update set *
  7. when not matched then insert *
  8. """)


  1. jdbc_url = f"jdbc:sqlserver://{SERVER};database={DATABASE};user={USERNAME};password={PASSWORD}"
  2. sparkdf.createOrReplaceTempView('source')
  3. df = spark.read \
  4. .format("jdbc") \
  5. .option("url", jdbc_url) \
  6. .option("dbtable", "(merge into target t using source s on s.Id = t.Id when matched then update set * when not matched then insert *) AS subquery") \
  7. .load()


后者不起作用,因为azure sql server似乎不支持 *。我想你必须声明列和值而不使用 *。但是我想动态地这样做,因为我有很多列和很多不同的表,我想为它们做upsert。
我尝试了不同的选择,但到目前为止没有任何效果。

yiytaume

yiytaume1#

根据这个https://issues.apache.org/jira/browse/SPARK-19335 Spark框架编写器API没有这样的jdbc功能。必须推出自己的解决方案。

相关问题