我想使用spark执行更新和插入操作请查找现有表的图像引用
![ ]1
在这里,我更新id:101位置和插入时间,并插入两个以上的记录
并以覆盖模式写入目标
df.write.format("jdbc")
.option("url", "jdbc:mysql://localhost/test")
.option("driver","com.mysql.jdbc.Driver")
.option("dbtable","temptgtUpdate")
.option("user", "root")
.option("password", "root")
.option("truncate","true")
.mode("overwrite")
.save()
执行上述命令后,我的数据被损坏,并被插入到db表中
Dataframe中的数据
你能告诉我你的意见和解决办法吗
5条答案
按热度按时间lx0bsm1f1#
如果您需要在pyspark代码中执行upsert/delete操作,我建议您使用pymysql库,并执行upsert/delete操作。请查看这篇文章以获取更多信息,代码示例可供参考:在重复键上使用insert into table时出错,使用for循环数组
请根据您的需要修改代码示例。
7tofc5zh2#
spark jdbc writer支持以下模式:
追加:追加此的内容:class:dataframe to 现有数据。
覆盖:覆盖现有数据。
忽略:如果数据已经存在,则自动忽略此操作。
错误(默认情况):如果数据已经存在,则抛出异常
https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html
因为您使用的是“overwrite”模式,所以它会根据列长度重新创建表,如果您希望自己的表定义,请先创建表,然后使用“append”模式
webghufk3#
我想使用spark执行更新和插入操作
在sql中没有等价物
UPDATE
带有spark sql的语句。也没有一种与sql等价的方法DELETE WHERE
带有spark sql的语句。相反,您必须删除spark之外需要更新的行,然后使用将包含新记录和更新记录的sparkDataframe写入表中append
模式(以保留表中剩余的现有行)。qzwqbdag4#
当遵循以下步骤时,upsert逻辑工作正常
df.coalesce(1).write.format(“csv”).save(“文件:///c:/users/test/desktop/temp1”,header=true)df=spark.read.format(“csv”).load(“文件:///c:/users/test/desktop/temp1/temp1.csv”,header=true,delimiter=',')
这么做呢
df.write.format(“jdbc”).option(“url”,”jdbc:mysql用法:/localhost/test“).option(“driver”,“com.mysql.jdbc.driver”).option(“dbtable”,“testgtupdate”).option(“user”,“root”).option(“password”,“root”).option(“truncate”,“true”).mode(“overwrite”).save()
但我仍然无法理解当我直接使用Dataframe进行编写时它失败的逻辑
liwlm1x95#
我不建议使用truncate,因为它实际上会删除表并创建新表。执行此操作时,表可能会丢失先前设置的列级属性…因此在使用truncate时要小心,并确保是否可以删除表/重新创建表。