我正在尝试向azure blob存储中作为增量表存储的数据添加一个新列。对数据执行的大多数操作都是upserts,有很多更新,很少有新的插入。我当前编写数据的代码如下所示:
DeltaTable.forPath(spark, deltaPath)
.as("dest_table")
.merge(myDF.as("source_table"),
"dest_table.id = source_table.id")
.whenNotMatched()
.insertAll()
.whenMatched(upsertCond)
.updateExpr(upsertStat)
.execute()
从这些文档来看,delta lake似乎支持在 insertAll()
以及 updateAll()
只打电话。但是,我只在满足某些条件并希望将新列添加到所有现有数据(默认值为)时更新 null
).
我想出了一个看起来非常笨拙的解决方案,我想知道是否有一个更优雅的方法。以下是我目前提出的解决方案:
// Read in existing data
val myData = spark.read.format("delta").load(deltaPath)
// Register table with Hive metastore
myData.write.format("delta").saveAsTable("input_data")
// Add new column
spark.sql("ALTER TABLE input_data ADD COLUMNS (new_col string)")
// Save as DataFrame and overwrite data on disk
val sqlDF = spark.sql("SELECT * FROM input_data")
sqlDF.write.format("delta").option("mergeSchema", "true").mode("overwrite").save(deltaPath)
2条答案
按热度按时间dzhpxtsq1#
首先更改增量表,然后执行合并操作:
gab6jxml2#
你试过使用merge语句吗?
https://docs.databricks.com/spark/latest/spark-sql/language-manual/merge-into.html