我有一个模式进化的案例。
详细描述:
我正在使用自动加载器和foreachbatch将源表从datalake加载到bronze层作为行数据,并具有合并到statemenet的功能。
在从bronze层移动到sivler层时,作为源表,我应用select
语句来过滤掉移动到银层时的额外列。
我只有一张table。
在青铜层的表customeraddress
有列MSFT_DATASTATE
,这是不相同的表在银层的情况。所以我想自动添加此列到我的银表。
# Enable autoMerge for schema evolution
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")
p = re.compile('^BK_')
list_of_columns = dfUpdates.columns
list_of_BK_columns = [ s for s in dfUpdates.columns if p.match(s) ]
string = ''
for column in list_of_BK_columns:
string += f'table.{column} = newData.{column} and '
dictionary = {}
for key in list_of_columns:
dictionary[key] = f'newData.{key}'
# print("printing " + cdm + " columns")
print("We at this stage now -----------------------------------------------------")
# print(dfUpdates.columns)
deltaTable = DeltaTable.forPath(spark, f"abfss://silver@{storage_account}.dfs.core.windows.net/D365/{table.lower()}_ao")
deltaTable.alias('table') \
.merge(dfUpdates.alias("newData"), string) \
.whenMatchedUpdate(set=dictionary) \
.whenNotMatchedInsert(values=dictionary) \
.execute()
df.writeStream.foreachBatch(lambda df, epochId: update_changefeed(df, table, epochId)).option("checkpointLocation", checkpoint_directory).trigger(availableNow=True).start()
字符串
Error Im getting指出:
SET column not found given columns:[
PK_D365_customeraddress,
IsDelete,etc] 这是正确的,
MSFT_DATASTATE`列不在我的银deltatable。
1条答案
按热度按时间qni6mghb1#
Ref:https://www.databricks.com/blog/2019/09/24/diving-into-delta-lake-schema-enforcement-evolution.html尝试并使用此:
字符串