我正在AWS胶水作业中使用以下代码创建一个Iceberg表:
df.writeTo(f'glue_catalog.{DATABASE_NAME}.{TABLE_NAME}') \
.using('iceberg') \
.tableProperty("location", TABLE_LOCATION) \
.tableProperty("write.spark.accept-any-schema", "true") \
.tableProperty("format-version", "2") \
.createOrReplace()
字符串
创建了表,我可以在Glue/LF中看到它,我可以在Athena中查询它。
我有另一个工作,正在尝试使用以下方法来upsert数据:
df_upsert.createOrReplaceTempView("upsert_items")
upsert_query = f"""
MERGE INTO glue_catalog.{DATABASE_NAME}.{TABLE_NAME} target
USING (SELECT * FROM upsert_items) updates
ON {join_condidtion}
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
"""
spark.sql(upsert_query)
型
The Ghosts Job fails and says:
AnalysisException: cannot resolve my_column in MERGE command given columns [updates.col1, updates.col2, ...
型
当列可能丢失或列可能增加时,我如何合并新数据。我认为Iceberg将通过为丢失/新列填充NULL来处理这个问题,因为我设置了“write.spark.accept-any-schema”= true。谢谢。
运行Spark版本3.3.0-amzn-1
AWS Gandroid Job v4
冰山v1.0.0
1条答案
按热度按时间wwtsj6pe1#
根据文件:
编写器必须启用mergeSchema选项。
第一个月
这在目前的
spark.sql("MERGE ...")
中是无法实现的。有一个开放的“功能请求”issue来处理这个问题。
一个“非最佳”解决方案是检测是否在源中找到列而在目标中尚未找到,然后在MERGE语句之前执行do和
ALTER TABLE target ADD COLUMN
。️️🤷️🤷🤷