spark jdbc writemode overwrite未按预期工作

ccrfmcuu  于 2021-05-27  发布在  Spark
关注(0)|答案(5)|浏览(545)

我想使用spark执行更新和插入操作请查找现有表的图像引用
![ ]1
在这里,我更新id:101位置和插入时间,并插入两个以上的记录

并以覆盖模式写入目标

df.write.format("jdbc")
  .option("url",  "jdbc:mysql://localhost/test")
  .option("driver","com.mysql.jdbc.Driver")
  .option("dbtable","temptgtUpdate")
  .option("user", "root")
  .option("password", "root")
  .option("truncate","true")
  .mode("overwrite")
  .save()

执行上述命令后,我的数据被损坏,并被插入到db表中

Dataframe中的数据

你能告诉我你的意见和解决办法吗

lx0bsm1f

lx0bsm1f1#

如果您需要在pyspark代码中执行upsert/delete操作,我建议您使用pymysql库,并执行upsert/delete操作。请查看这篇文章以获取更多信息,代码示例可供参考:在重复键上使用insert into table时出错,使用for循环数组
请根据您的需要修改代码示例。

7tofc5zh

7tofc5zh2#

spark jdbc writer支持以下模式:
追加:追加此的内容:class:dataframe to 现有数据。
覆盖:覆盖现有数据。
忽略:如果数据已经存在,则自动忽略此操作。
错误(默认情况):如果数据已经存在,则抛出异常
https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html
因为您使用的是“overwrite”模式,所以它会根据列长度重新创建表,如果您希望自己的表定义,请先创建表,然后使用“append”模式

webghufk

webghufk3#

我想使用spark执行更新和插入操作
在sql中没有等价物 UPDATE 带有spark sql的语句。也没有一种与sql等价的方法 DELETE WHERE 带有spark sql的语句。相反,您必须删除spark之外需要更新的行,然后使用将包含新记录和更新记录的sparkDataframe写入表中 append 模式(以保留表中剩余的现有行)。

qzwqbdag

qzwqbdag4#

当遵循以下步骤时,upsert逻辑工作正常
df.coalesce(1).write.format(“csv”).save(“文件:///c:/users/test/desktop/temp1”,header=true)df=spark.read.format(“csv”).load(“文件:///c:/users/test/desktop/temp1/temp1.csv”,header=true,delimiter=',')
这么做呢
df.write.format(“jdbc”).option(“url”,”jdbc:mysql用法:/localhost/test“).option(“driver”,“com.mysql.jdbc.driver”).option(“dbtable”,“testgtupdate”).option(“user”,“root”).option(“password”,“root”).option(“truncate”,“true”).mode(“overwrite”).save()
但我仍然无法理解当我直接使用Dataframe进行编写时它失败的逻辑

liwlm1x9

liwlm1x95#

我不建议使用truncate,因为它实际上会删除表并创建新表。执行此操作时,表可能会丢失先前设置的列级属性…因此在使用truncate时要小心,并确保是否可以删除表/重新创建表。

相关问题