pyspark 使用Autoloader进行模式演化

dzhpxtsq  于 2023-11-16  发布在  Spark
关注(0)|答案(1)|浏览(148)

我有一个模式进化的案例。
详细描述:
我正在使用自动加载器和foreachbatch将源表从datalake加载到bronze层作为行数据,并具有合并到statemenet的功能。
在从bronze层移动到sivler层时,作为源表,我应用select语句来过滤掉移动到银层时的额外列。
我只有一张table。
在青铜层的表customeraddress有列MSFT_DATASTATE,这是不相同的表在银层的情况。所以我想自动添加此列到我的银表。

  1. # Enable autoMerge for schema evolution
  2. spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")
  3. p = re.compile('^BK_')
  4. list_of_columns = dfUpdates.columns
  5. list_of_BK_columns = [ s for s in dfUpdates.columns if p.match(s) ]
  6. string = ''
  7. for column in list_of_BK_columns:
  8. string += f'table.{column} = newData.{column} and '
  9. dictionary = {}
  10. for key in list_of_columns:
  11. dictionary[key] = f'newData.{key}'
  12. # print("printing " + cdm + " columns")
  13. print("We at this stage now -----------------------------------------------------")
  14. # print(dfUpdates.columns)
  15. deltaTable = DeltaTable.forPath(spark, f"abfss://silver@{storage_account}.dfs.core.windows.net/D365/{table.lower()}_ao")
  16. deltaTable.alias('table') \
  17. .merge(dfUpdates.alias("newData"), string) \
  18. .whenMatchedUpdate(set=dictionary) \
  19. .whenNotMatchedInsert(values=dictionary) \
  20. .execute()
  21. df.writeStream.foreachBatch(lambda df, epochId: update_changefeed(df, table, epochId)).option("checkpointLocation", checkpoint_directory).trigger(availableNow=True).start()

字符串
Error Im getting指出:
SET column not found given columns:[PK_D365_customeraddressIsDelete,etc] 这是正确的,MSFT_DATASTATE`列不在我的银deltatable。

qni6mghb

qni6mghb1#

Ref:https://www.databricks.com/blog/2019/09/24/diving-into-delta-lake-schema-enforcement-evolution.html尝试并使用此:

  1. # Add the mergeSchema option
  2. loans.write.format("delta") \
  3. .option("mergeSchema", "true") \
  4. .mode("append") \
  5. .save(DELTALAKE_SILVER_PATH)

字符串

相关问题