pyspark Spark jdbc覆盖模式未按预期工作

nbnkbykc  于 12个月前  发布在  Spark
关注(0)|答案(5)|浏览(146)

我想使用spark执行更新和插入操作,请查找现有表的图像引用

这里我更新id:101 location和inserttime并插入2条记录:

并使用模式覆盖写入目标

df.write.format("jdbc")
  .option("url",  "jdbc:mysql://localhost/test")
  .option("driver","com.mysql.jdbc.Driver")
  .option("dbtable","temptgtUpdate")
  .option("user", "root")
  .option("password", "root")
  .option("truncate","true")
  .mode("overwrite")
  .save()

执行上述命令后,我的数据被损坏,插入到数据库表

数据库中的数据

你能告诉我你的意见和解决办法吗

toiithl6

toiithl61#

Spark JDBC writer支持以下模式:

由于您使用的是“覆盖”模式,它会根据列长度重新创建表,如果您想要自己的表定义,请先创建表,然后使用“追加”模式

m1m5dgzv

m1m5dgzv2#

我想使用spark执行更新和插入操作
在Spark SQL中没有与SQL UPDATE语句等效的语句。Spark SQL中也没有与SQL DELETE WHERE语句等效的语句。相反,您必须在Spark外部删除需要更新的行,然后使用append模式将包含新记录和更新记录的Spark框架写入表中(以便保留表中剩余的现有行)。

xxe27gdn

xxe27gdn3#

如果你需要在你的pyspark代码中执行UPSERT / DELETE操作,我建议你使用pymysql库,并执行你的upsert/delete操作。请查看这篇文章以获取更多信息,并参考代码示例:Error while using INSERT INTO table ON DUPLICATE KEY, using a for loop array
请根据您的需要修改代码示例。

3b6akqbq

3b6akqbq4#

我不推荐TRUNCATE,因为它实际上会删除表,并创建新表。在这样做的时候,表可能会丢失之前设置的列级属性.所以在使用TRUNCATE时要小心,并且要确定是否可以删除表/重新创建表。

mzaanser

mzaanser5#

按照以下步骤操作时,Upsert逻辑工作正常

df = (spark.read.format("csv").
        load("file:///C:/Users/test/Desktop/temp1/temp1.csv", header=True,
             delimiter=','))

做这部

(df.write.format("jdbc").
    option("url", "jdbc:mysql://localhost/test").
    option("driver", "com.mysql.jdbc.Driver").
    option("dbtable", "temptgtUpdate").
    option("user", "root").
    option("password", "root").
    option("truncate", "true").
    mode("overwrite").save())

尽管如此,我还是无法理解为什么当我直接使用 Dataframe 编写时它会失败

相关问题