I am trying to insert and update some data in MySQL using Spark SQL DataFrames and a JDBC connection.

I have succeeded in inserting new data using `SaveMode.Append`. Is there a way to update the data that already exists in the MySQL table from Spark SQL?

My code to insert is:

`myDataFrame.write.mode(SaveMode.Append).jdbc(JDBCurl, mySqlTable, connectionProperties)`

If I switch to `SaveMode.Overwrite`, it deletes the whole table and creates a new one. I am looking for something like MySQL's `ON DUPLICATE KEY UPDATE`.
6 Answers
z9smfwbn1#
I was not able to do this in pyspark, so I decided to use ODBC instead.
ttygqcqt2#
zero323's answer is right; I just wanted to add that you could use the JayDeBeApi package as a workaround to update the data in your MySQL table: https://pypi.python.org/pypi/jaydebeapi/

Since you already have the MySQL JDBC driver installed, this could be low-hanging fruit.

The JayDeBeApi module allows you to connect from Python code to databases using Java JDBC. It provides a Python DB-API v2.0 to that database.

We use the Anaconda distribution of Python, and the JayDeBeApi package comes standard there.

See the examples at the link above.
ss2ws0br3#
Sadly, there is no `SaveMode.Upsert` mode in Spark for such a fairly common case as upserting. zero323 is right in general, but I think it should be possible (with a compromise in performance) to offer such a replace feature.

I also wanted to provide some Java code for this case. Of course it is not as performant as the built-in one from Spark, but it should be a good basis for your requirements. Just modify it towards your needs; a sketch of the idea follows.
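A minimal Scala sketch of that idea, assuming a MySQL table `my_table(id BIGINT PRIMARY KEY, value TEXT)`; the table name, columns, and the `ManualUpsert` helper are illustrative, not the original answer's code:

```scala
import java.sql.DriverManager

import org.apache.spark.sql.{DataFrame, Row}

// Hypothetical helper: writes each partition of `df` to MySQL with
// INSERT ... ON DUPLICATE KEY UPDATE. Adapt the statement and the
// setters to your own schema.
object ManualUpsert {
  def upsert(df: DataFrame, url: String, user: String, password: String): Unit = {
    df.foreachPartition { rows: Iterator[Row] =>
      // One connection per partition: a driver-side connection cannot be
      // serialized and shared with the executors.
      val conn = DriverManager.getConnection(url, user, password)
      try {
        val stmt = conn.prepareStatement(
          """INSERT INTO my_table (id, value) VALUES (?, ?)
            |ON DUPLICATE KEY UPDATE value = VALUES(value)""".stripMargin)
        try {
          var n = 0
          rows.foreach { row =>
            stmt.setLong(1, row.getAs[Long]("id"))
            stmt.setString(2, row.getAs[String]("value"))
            stmt.addBatch()
            n += 1
            if (n % 1000 == 0) stmt.executeBatch() // flush every 1000 rows
          }
          stmt.executeBatch() // flush the remainder
        } finally {
          stmt.close()
        }
      } finally {
        conn.close()
      }
    }
  }
}
```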
deyfvvtc4#
If your table is small, you can read the SQL data, perform the upsert in a Spark DataFrame, and then overwrite the existing SQL table, as sketched below.
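A minimal sketch of that approach, assuming `updates` holds the new rows, `id` is the primary key, and `jdbcUrl`/`connectionProperties` are as in the question:

```scala
import org.apache.spark.sql.SaveMode

// Read the current table contents (only sensible for small tables).
val existing = spark.read.jdbc(jdbcUrl, "my_table", connectionProperties)

// "Upsert" inside the DataFrame: keep existing rows whose key is absent
// from `updates` (left_anti join), then add all new/updated rows.
val merged = existing.join(updates, Seq("id"), "left_anti").union(updates)

// Materialize before writing: the JDBC read is lazy, and SaveMode.Overwrite
// drops the table before the plan would otherwise run against it.
merged.cache()
merged.count()

merged.write.mode(SaveMode.Overwrite).jdbc(jdbcUrl, "my_table", connectionProperties)
```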
ttcibm8c5#
It is not possible. As for now (Spark 1.6.0 / 2.2.0-SNAPSHOT), the Spark `DataFrameWriter` supports only four writing modes:

- `SaveMode.Overwrite`: overwrite the existing data.
- `SaveMode.Append`: append the data.
- `SaveMode.Ignore`: ignore the operation (i.e. no-op).
- `SaveMode.ErrorIfExists`: the default option, throw an exception at runtime.

You can insert manually, for example using `mapPartitions` (since you want the upsert operation to be idempotent and as such easy to implement), write to a temporary table and execute the upsert manually, or use triggers.

In general, achieving upsert behavior for batch operations while keeping decent performance is far from trivial. You have to remember that in general there will be multiple concurrent transactions in place (one per partition), so you have to ensure that there are no write conflicts (typically by using application-specific partitioning) or provide appropriate recovery procedures. In practice it may be better to perform batched writes to a temporary table and resolve the upsert part directly in the database; a sketch of that variant follows.
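A sketch of the temporary-table variant (the staging table name, columns, and credentials are illustrative; `myDataFrame`, `JDBCurl`, and `connectionProperties` are from the question):

```scala
import java.sql.DriverManager

import org.apache.spark.sql.SaveMode

// 1. Batch-write the new rows to a staging table; plain writes, no conflicts.
myDataFrame.write
  .mode(SaveMode.Overwrite) // the staging table only ever holds this batch
  .jdbc(JDBCurl, "my_table_staging", connectionProperties)

// 2. Resolve the upsert with a single statement inside MySQL.
val conn = DriverManager.getConnection(JDBCurl, user, password)
try {
  val stmt = conn.createStatement()
  try {
    stmt.executeUpdate(
      """INSERT INTO my_table (id, value)
        |SELECT id, value FROM my_table_staging
        |ON DUPLICATE KEY UPDATE value = VALUES(value)""".stripMargin)
  } finally {
    stmt.close()
  }
} finally {
  conn.close()
}
```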
35g0bw716#
Override `JdbcUtils.scala` from `org.apache.spark.sql.execution.datasources.jdbc`, changing `insert into` to `replace into`:
```scala
import java.sql.{Connection, Driver, DriverManager, PreparedStatement, ResultSet, SQLException}
import scala.collection.JavaConverters._
import scala.util.control.NonFatal
import com.typesafe.scalalogging.Logger
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.jdbc.{DriverRegistry, DriverWrapper, JDBCOptions}
import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects, JdbcType}
import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, Row}
/**
 * A copy of Spark's JdbcUtils adapted so that rows are written with
 * MySQL's REPLACE INTO instead of INSERT INTO.
 */
object UpdateJdbcUtils {

  val logger = Logger(this.getClass)

  /**
   * Returns a factory for creating connections to the given JDBC URL.
   *
   * @param options - JDBC options that contains url, table and other information.
   */
  def createConnectionFactory(options: JDBCOptions): () => Connection = {
    val driverClass: String = options.driverClass
    () => {
      DriverRegistry.register(driverClass)
      val driver: Driver = DriverManager.getDrivers.asScala.collectFirst {
        case d: DriverWrapper if d.wrapped.getClass.getCanonicalName == driverClass => d
        case d if d.getClass.getCanonicalName == driverClass => d
      }.getOrElse {
        throw new IllegalStateException(
          s"Did not find registered driver with class $driverClass")
      }
      driver.connect(options.url, options.asConnectionProperties)
    }
  }
  /**
   * Returns a PreparedStatement that upserts a row into table via conn.
   * This is the actual change relative to Spark's JdbcUtils: the generated
   * SQL uses REPLACE INTO instead of INSERT INTO.
   */
  def insertStatement(conn: Connection, table: String, rddSchema: StructType, dialect: JdbcDialect)
    : PreparedStatement = {
    val columns = rddSchema.fields.map(x => dialect.quoteIdentifier(x.name)).mkString(",")
    val placeholders = rddSchema.fields.map(_ => "?").mkString(",")
    val sql = s"REPLACE INTO $table ($columns) VALUES ($placeholders)"
    conn.prepareStatement(sql)
  }
  /**
   * Retrieve standard jdbc types.
   *
   * @param dt The datatype (e.g. org.apache.spark.sql.types.StringType)
   * @return The default JdbcType for this DataType
   */
  def getCommonJDBCType(dt: DataType): Option[JdbcType] = {
    dt match {
      case IntegerType => Option(JdbcType("INTEGER", java.sql.Types.INTEGER))
      case LongType => Option(JdbcType("BIGINT", java.sql.Types.BIGINT))
      case DoubleType => Option(JdbcType("DOUBLE PRECISION", java.sql.Types.DOUBLE))
      case FloatType => Option(JdbcType("REAL", java.sql.Types.FLOAT))
      case ShortType => Option(JdbcType("INTEGER", java.sql.Types.SMALLINT))
      case ByteType => Option(JdbcType("BYTE", java.sql.Types.TINYINT))
      case BooleanType => Option(JdbcType("BIT(1)", java.sql.Types.BIT))
      case StringType => Option(JdbcType("TEXT", java.sql.Types.CLOB))
      case BinaryType => Option(JdbcType("BLOB", java.sql.Types.BLOB))
      case TimestampType => Option(JdbcType("TIMESTAMP", java.sql.Types.TIMESTAMP))
      case DateType => Option(JdbcType("DATE", java.sql.Types.DATE))
      case t: DecimalType => Option(
        JdbcType(s"DECIMAL(${t.precision},${t.scale})", java.sql.Types.DECIMAL))
      case _ => None
    }
  }

  private def getJdbcType(dt: DataType, dialect: JdbcDialect): JdbcType = {
    dialect.getJDBCType(dt).orElse(getCommonJDBCType(dt)).getOrElse(
      throw new IllegalArgumentException(s"Can't get JDBC type for ${dt.simpleString}"))
  }
  // A `JDBCValueGetter` is responsible for getting a value from `ResultSet`
  // into a field for `MutableRow`. The last argument `Int` means the index for
  // the value to be set in the row and also used for the value in `ResultSet`.
  private type JDBCValueGetter = (ResultSet, InternalRow, Int) => Unit

  // A `JDBCValueSetter` is responsible for setting a value from `Row` into a
  // field for `PreparedStatement`. The last argument `Int` means the index for
  // the value to be set in the SQL statement and also used for the value in `Row`.
  private type JDBCValueSetter = (PreparedStatement, Row, Int) => Unit
  /**
   * Saves a partition of a DataFrame to the JDBC database. This is done in
   * a single database transaction (unless isolation level is "NONE")
   * in order to avoid repeatedly inserting data as much as possible.
   *
   * It is still theoretically possible for rows in a DataFrame to be
   * inserted into the database more than once if a stage somehow fails after
   * the commit occurs but before the stage can return successfully.
   *
   * This is not a closure inside saveTable() because apparently cosmetic
   * implementation changes elsewhere might easily render such a closure
   * non-Serializable. Instead, we explicitly close over all variables that
   * are used.
   */
  def savePartition(
      getConnection: () => Connection,
      table: String,
      iterator: Iterator[Row],
      rddSchema: StructType,
      nullTypes: Array[Int],
      batchSize: Int,
      dialect: JdbcDialect,
      isolationLevel: Int): Iterator[Byte] = {
    val conn = getConnection()
    var committed = false
    // Condensed from the JdbcUtils.savePartition pattern: one transaction
    // per partition, batched REPLACE INTO statements, commit at the end,
    // roll back on failure.
    try {
      conn.setAutoCommit(false)
      if (isolationLevel != Connection.TRANSACTION_NONE) {
        conn.setTransactionIsolation(isolationLevel)
      }
      val stmt = insertStatement(conn, table, rddSchema, dialect)
      val setters = rddSchema.fields.map(f => makeSetter(conn, dialect, f.dataType))
      val numFields = rddSchema.fields.length
      try {
        var rowCount = 0
        while (iterator.hasNext) {
          val row = iterator.next()
          var i = 0
          while (i < numFields) {
            if (row.isNullAt(i)) stmt.setNull(i + 1, nullTypes(i))
            else setters(i).apply(stmt, row, i)
            i += 1
          }
          stmt.addBatch()
          rowCount += 1
          if (rowCount % batchSize == 0) {
            stmt.executeBatch()
            rowCount = 0
          }
        }
        if (rowCount > 0) stmt.executeBatch()
      } finally {
        stmt.close()
      }
      conn.commit()
      committed = true
      Iterator.empty
    } catch {
      case NonFatal(e) =>
        logger.error("Aborting transaction", e)
        throw e
    } finally {
      if (!committed) conn.rollback()
      conn.close()
    }
  }
  /**
   * Saves the RDD to the database in a single transaction.
   */
  def saveTable(
      df: DataFrame,
      url: String,
      table: String,
      options: JDBCOptions): Unit = {
    val dialect = JdbcDialects.get(url)
    val nullTypes: Array[Int] = df.schema.fields.map { field =>
      getJdbcType(field.dataType, dialect).jdbcNullType
    }
    // Dispatch one savePartition call per partition of the DataFrame.
    val rddSchema = df.schema
    val getConnection: () => Connection = createConnectionFactory(options)
    val batchSize = options.batchSize
    val isolationLevel = options.isolationLevel
    df.foreachPartition { iterator: Iterator[Row] =>
      savePartition(getConnection, table, iterator, rddSchema, nullTypes, batchSize,
        dialect, isolationLevel)
    }
  }
  private def makeSetter(
      conn: Connection,
      dialect: JdbcDialect,
      dataType: DataType): JDBCValueSetter = dataType match {
    case IntegerType =>
      (stmt: PreparedStatement, row: Row, pos: Int) =>
        stmt.setInt(pos + 1, row.getInt(pos))

    case LongType =>
      (stmt: PreparedStatement, row: Row, pos: Int) =>
        stmt.setLong(pos + 1, row.getLong(pos))

    case DoubleType =>
      (stmt: PreparedStatement, row: Row, pos: Int) =>
        stmt.setDouble(pos + 1, row.getDouble(pos))

    case StringType =>
      (stmt: PreparedStatement, row: Row, pos: Int) =>
        stmt.setString(pos + 1, row.getString(pos))

    case TimestampType =>
      (stmt: PreparedStatement, row: Row, pos: Int) =>
        stmt.setTimestamp(pos + 1, row.getAs[java.sql.Timestamp](pos))

    case DateType =>
      (stmt: PreparedStatement, row: Row, pos: Int) =>
        stmt.setDate(pos + 1, row.getAs[java.sql.Date](pos))

    case _: DecimalType =>
      (stmt: PreparedStatement, row: Row, pos: Int) =>
        stmt.setBigDecimal(pos + 1, row.getDecimal(pos))

    // The remaining types (FloatType, ShortType, ByteType, BooleanType,
    // BinaryType, ...) follow the same pattern; the catch-all avoids a
    // MatchError at runtime for anything not handled above.
    case _ =>
      (_: PreparedStatement, _: Row, pos: Int) =>
        throw new IllegalArgumentException(
          s"Can't translate non-null value for field $pos")
  }
}
val url = s"jdbc:mysql://$host/$database?useUnicode=true&characterEncoding=UTF-8"
val parameters: Map[String, String] = Map(
"url" -> url,
"dbtable" -> table,
"driver" -> "com.mysql.jdbc.Driver",
"numPartitions" -> numPartitions.toString,
"user" -> user,
"password" -> password
)
val options = new JDBCOptions(parameters)
for (d <- data) {
UpdateJdbcUtils.saveTable(d, url, table, options)
}
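Note that this approach relies on MySQL's `REPLACE INTO`, which deletes the conflicting row and inserts a new one, so delete triggers and `ON DELETE CASCADE` foreign keys will fire. If you need true `ON DUPLICATE KEY UPDATE` semantics, adjust the SQL generated in `insertStatement` accordingly.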