pyspark Delta Lake合并不更新模式(启用自动模式演变)

omjgkv6w  于 2024-01-06  发布在  Spark
关注(0)|答案(5)|浏览(206)

当我执行下面的代码行时,我得到一个错误:

  1. deltaTarget.alias('target').merge(df.alias('source'), mergeStatement).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()

字符串
错误如下:
分析异常:无法解析UPDATE子句中给定列{List of target columns}的new_column。“new_column”确实不在目标增量表的模式中,但根据文档,这应该只是更新增量表的现有模式并添加列。
我还使用以下命令启用autoMerge:

  1. spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled ","true")


我不确定到底是什么原因导致了这个错误,因为在过去,我能够用这些精确的代码自动进化增量表的模式。
是不是有什么我忽略了?

mpbci0fu

mpbci0fu1#

必须删除spark.conf.set..autoMerge.enabled之后的空格
--> It's

  1. spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled","true")

字符串
不是

  1. spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled ","true")`

polkgigr

polkgigr2#

我也有同样的问题,但我发现在delta lake docs中,它可能不太支持upsertAll()和insertAll()的部分列;所以我选择了upsertExpr()和insertExpr(),其中一个大map包含所有列。
delta lake merge : Schema validation

5gfr0r5j

5gfr0r5j3#

如果我没有弄错的话,您需要在MERGE操作中使用insertAll或updateAll选项

pinkon5k

pinkon5k4#

spark.conf.set(“spark.databricks.delta.schema.autoMerge.enabled”,“true”)
确保在上面一行中的“enabled”之后没有空格。
然后你可以使用pass a spark sql:

  1. spark.sql(f"""
  2. MERGE INTO {data_path} delta USING global_temp.src source
  3. ON delta.col1 = source.key1
  4. AND delta.col2 = source.key2
  5. WHEN MATCHED THEN
  6. UPDATE SET *
  7. WHEN NOT MATCHED THEN
  8. INSERT *
  9. """)

字符串

展开查看全部
pnwntuvh

pnwntuvh5#

  1. deltaTable.as("target")
  2. .merge(data.as("source"),
  3. "source.internalUUID = target.internalUUID")
  4. .whenMatched().updateAll()
  5. .whenNotMatched().insertAll()
  6. .execute();

字符串

相关问题