How do I delete rows from a database with Spark?

b09cbbtk · posted 2021-05-29 in Spark

Thanks for reading this question.
I know how to insert rows:

df.write \
        .format('jdbc') \
        .option("url", url) \
        .option("dbtable", table) \
        .option("user", user) \
        .option("password", password) \
        .option("driver", "org.postgresql.Driver") \
        .mode('append') \
        .save()

But how do I delete rows? Something like:

df = [Row(id=1), Row(id=2), ... ]

=> DELETE FROM TABLE WHERE id in df ...

Is that possible?


ygya80vv · answer #1

Spark does not support this directly, but I have done it with foreachPartition (using only the DataFrame's data). Related question: does Apache Spark SQL support the MERGE clause?

import java.sql.{Connection, DriverManager, PreparedStatement}

df.rdd.coalesce(2).foreachPartition { partition =>
  // Connection settings are shipped to the executors via a broadcast variable.
  val connectionProperties = brConnect.value
  val jdbcUrl  = connectionProperties.getProperty("jdbcurl")
  val user     = connectionProperties.getProperty("user")
  val password = connectionProperties.getProperty("password")
  val driver   = connectionProperties.getProperty("driver")
  Class.forName(driver)

  val dbc: Connection = DriverManager.getConnection(jdbcUrl, user, password)
  dbc.setAutoCommit(false) // commit once per batch instead of once per row
  val dbBatchSize = 1000

  // One prepared statement per partition, reused for every batch.
  val sqlString = "INSERT INTO employee (id, fname, lname, userid) VALUES (?, ?, ?, ?)"
  val pstmt: PreparedStatement = dbc.prepareStatement(sqlString)

  partition.grouped(dbBatchSize).foreach { batch =>
    batch.foreach { row =>
      pstmt.setLong(1, row.getAs[Long]("id"))
      pstmt.setString(2, row.getAs[String]("fname"))
      pstmt.setString(3, row.getAs[String]("lname"))
      pstmt.setString(4, row.getAs[String]("userid"))
      pstmt.addBatch() // queue the row; execute once per batch
    }
    pstmt.executeBatch()
    dbc.commit()
  }
  pstmt.close()
  dbc.close()
}
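The same foreachPartition pattern adapts to the asker's DELETE case. A minimal PySpark sketch, assuming `psycopg2` is installed on the executors, the DataFrame has an `id` column, and the connection settings and table name shown here are placeholders:

```python
def delete_ids_sql(table, num_ids):
    """Build a parameterized DELETE statement for a batch of ids."""
    placeholders = ", ".join(["%s"] * num_ids)
    return f"DELETE FROM {table} WHERE id IN ({placeholders})"

def delete_partition(rows):
    """Runs on each executor; deletes every id found in this partition."""
    import psycopg2  # assumed available on the executors

    ids = [row.id for row in rows]
    if not ids:
        return
    # Placeholder connection settings; use your real host/db/credentials.
    conn = psycopg2.connect(host="dbhost", dbname="mydb",
                            user="user", password="password")
    try:
        with conn.cursor() as cur:
            cur.execute(delete_ids_sql("employee", len(ids)), ids)
        conn.commit()
    finally:
        conn.close()

# Driver side: ship only the ids to the executors.
# df.select("id").rdd.coalesce(2).foreachPartition(delete_partition)
```

Note that only the `id` column is shipped to the executors, and each partition opens a single connection and issues one parameterized statement for its whole batch.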
