我正在使用sparkjdbc保存数据库中的行。数据的保存工作正常。
问题:如果spark遇到任何错误记录(例如,当表需要非空值时,列具有空值),它将中止保存
我想要的是:我想要spark忽略坏的行并继续保存下一行。如何做到这一点?我在文档中看不到太多。使用 StructType
不是一个选择。
有指针吗?
我的代码是这样的。
class DatabaseWriter {
def writeData(dataFrameTobeWritten: DataFrame, schema: String, targetTableName: String, sparkSession: SparkSession): Unit = {
val dbProperties = getSQLProperties(sparkSession, configurationProp)
dataFrameTobeWritten.write.mode(SaveMode.Overwrite)
.option("driver", dbProperties.driverName)
.option("truncate", "true")
.option("batchsize", configurationProp.WriterBatchSize())
.jdbc(dbProperties.jdbcUrl, configurationProp.sqlServerSchema(schema) + "." + targetTableName, dbProperties.connectionProp)
}
}
1条答案
按热度按时间i7uq4tfw1#
在方法中添加非空列的列表,并使用它们创建筛选条件以筛选出坏行