pyspark incremental merge does not update the schema - autoMerge.enabled

q9yhzks0 · posted 2023-11-16 in Spark

I'm having a problem with one of my Delta tables when it comes to updating the schema and inferring new columns in it. Only this one table has the problem; the rest of the tables work fine. The code below is dynamic, so there is no difference in how it is run for different tables.
Detailed description:

Step 1.

Read the structure and run another notebook that generates a SQL statement as a string.

    from pyspark.sql import functions as F
    from pyspark.sql.functions import col
    from pyspark.sql.window import Window
    from delta.tables import DeltaTable
    import re

    # Enable autoMerge for schema evolution
    spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "True")

    query = dbutils.notebook.run(
        f"/bronze views/VW_D365_{table.upper()}",
        3600,
        {
            "append_only_mode": "yes",
            "incremental": "yes",
            "catalog": catalog,
            "schema": schema,
        },
    )

    df = (
        spark.readStream.format("delta")
        .option("readChangeFeed", "true")
        .table(f"{catalog}.bronze.d365_{table.lower()}_ao")
    )

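For context, the change data feed adds the metadata columns _change_type, _commit_version and _commit_timestamp to every row. A quick batch read of the feed (a hypothetical check, not part of my pipeline) shows which change types and columns arrive for the problem table:

    # Hypothetical sanity check: read the change feed as a batch to inspect
    # the CDF metadata columns and any newly added business columns.
    cdf_sample = (
        spark.read.format("delta")
        .option("readChangeFeed", "true")
        .option("startingVersion", 0)  # assumption: start from the first version
        .table(f"{catalog}.bronze.d365_{table.lower()}_ao")
    )
    cdf_sample.select("_change_type", "_commit_version", "_commit_timestamp").show(5)
    cdf_sample.printSchema()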

Step 2. Then define the function that is passed in when we start writing data to the target table.

    def update_changefeed(df, table, query, epochId):
        # Do some transformation and pick up the necessary data
        filtered_df = df.filter(
            col("_change_type").isin("insert", "update_postimage", "delete")
        )
        filtered_df = filtered_df.drop(
            "_commit_timestamp", "_change_type", "_commit_version"
        )
        w = Window.partitionBy("id").orderBy(F.col("modifiedon").desc())
        filtered_df = (
            filtered_df.withWatermark("modifiedon", "1 second")
            .withColumn("rn", F.row_number().over(w))
            .where(F.col("rn") == 1)
            .drop("rn")
        )
        # Create a global temp view on top of the dataframe so the select statement can be applied later
        filtered_df.createOrReplaceGlobalTempView(f"tmp_D365_{table}")
        # "query" refers to the output of the notebook run in the first step
        dfUpdates = sqlContext.sql(query)
        # The block below collects the column names from the source and target tables to pass to the merge.
        # We merge on the columns whose names start with BK_.
        p = re.compile("^BK_")
        list_of_columns = dfUpdates.columns
        list_of_BK_columns = [s for s in dfUpdates.columns if p.match(s)]
        string = ""
        for column in list_of_BK_columns:
            string += f"table.{column} = newData.{column} and "
        string = string[:-4]  # drop the trailing "and "
        dictionary = {}
        for key in list_of_columns:
            dictionary[key] = f"newData.{key}"
        # Execute the merge itself
        deltaTable = DeltaTable.forPath(
            spark,
            f"abfss://silver@{storage_account}.dfs.core.windows.net/D365/{table.lower()}_ao",
        )
        deltaTable.alias("table").merge(
            dfUpdates.alias("newData"), string
        ).whenMatchedUpdate(set=dictionary).whenNotMatchedInsert(
            values=dictionary
        ).execute()
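
To make the dynamically built arguments concrete: for a hypothetical table with business keys BK_accountid and BK_companyid plus one data column, the generated merge condition and update mapping would look roughly like this (illustrative values only, not from my actual tables):

    # Illustrative only: the kind of merge condition and update mapping the loop builds
    merge_condition = "table.BK_accountid = newData.BK_accountid and table.BK_companyid = newData.BK_companyid"
    update_mapping = {
        "BK_accountid": "newData.BK_accountid",
        "BK_companyid": "newData.BK_companyid",
        "name": "newData.name",
    }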

Step 3.

The write part.

    df.writeStream.foreachBatch(
        lambda df, epochId: update_changefeed(df, table, query, epochId)
    ).option("checkpointLocation", checkpoint_directory).trigger(availableNow=True).start()
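
Since the stream uses trigger(availableNow=True), it processes whatever is currently available and then stops. If the notebook should block until the batch has been fully merged (an optional pattern, not in my original code), the query handle can be kept and awaited:

    # Optional: keep the handle and block until the availableNow run finishes
    stream_query = (
        df.writeStream.foreachBatch(
            lambda df, epochId: update_changefeed(df, table, query, epochId)
        )
        .option("checkpointLocation", checkpoint_directory)
        .trigger(availableNow=True)
        .start()
    )
    stream_query.awaitTermination()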


I get the following error:

    SET column `MSFT_DATASTATE` not found given columns: [`PK_D365_customeraddress`, `IsDelete` etc]


The MSFT_DATASTATE column is indeed not in my target Delta table yet; it is supposed to be merged in there. I'm not sure what is going wrong.
It is perhaps worth mentioning that before I enabled spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "True"),
I was getting the following error instead:
cannot resolve MSFT_DATASTATE in UPDATE clause given columns ...
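
For completeness, schema evolution during MERGE requires spark.databricks.delta.schema.autoMerge.enabled to be active in the Spark session that actually executes the merge. A minimal sanity check (hypothetical, not in my original code) is to set and assert the setting at the top of the batch function, right before merging:

    # Hypothetical check: confirm autoMerge is active where the merge runs,
    # and set it defensively at the top of the batch function.
    def update_changefeed(df, table, query, epochId):
        spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")
        assert spark.conf.get("spark.databricks.delta.schema.autoMerge.enabled").lower() == "true"
        ...  # rest of the merge logic unchanged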

mtb9vblg 1#

Try this:

    from pyspark.sql import functions as F
    from pyspark.sql.functions import col
    from pyspark.sql.window import Window
    from delta.tables import DeltaTable
    import re

    spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "True")
    spark.conf.set("spark.databricks.delta.schema.autoOverwrite.enabled", "True")

    query = dbutils.notebook.run(
        f"/bronze views/VW_D365_{table.upper()}",
        3600,
        {
            "append_only_mode": "yes",
            "incremental": "yes",
            "catalog": catalog,
            "schema": schema,
        },
    )

    df = (
        spark.readStream.format("delta")
        .option("readChangeFeed", "true")
        .table(f"{catalog}.bronze.d365_{table.lower()}_ao")
    )

    def update_changefeed(df, table, query, epochId):
        filtered_df = df.filter(col("_change_type").isin("insert", "update_postimage", "delete"))
        filtered_df = filtered_df.drop("_commit_timestamp", "_change_type", "_commit_version")
        w = Window.partitionBy("id").orderBy(col("modifiedon").desc())
        filtered_df = (
            filtered_df.withWatermark("modifiedon", "1 second")
            .withColumn("rn", F.row_number().over(w))
            .where(col("rn") == 1)
            .drop("rn")
        )
        filtered_df.createOrReplaceGlobalTempView(f"tmp_D365_{table}")
        dfUpdates = sqlContext.sql(query)
        target_path = f"abfss://silver@{storage_account}.dfs.core.windows.net/D365/{table.lower()}_ao"
        deltaTable = DeltaTable.forPath(spark, target_path)
        # Explicitly add any new source columns to the target table before merging
        # (the DeltaTable object exposes no addColumn method, so use ALTER TABLE on the path)
        existing_columns = deltaTable.toDF().columns
        for column in dfUpdates.columns:
            if column not in existing_columns:
                spark.sql(f"ALTER TABLE delta.`{target_path}` ADD COLUMNS (`{column}` STRING)")
        deltaTable = DeltaTable.forPath(spark, target_path)  # re-resolve after the schema change
        p = re.compile("^BK_")
        list_of_BK_columns = [s for s in dfUpdates.columns if p.match(s)]
        merge_condition = " and ".join([f"table.{c} = newData.{c}" for c in list_of_BK_columns])
        dictionary = {key: f"newData.{key}" for key in dfUpdates.columns}
        deltaTable.alias("table").merge(dfUpdates.alias("newData"), merge_condition).whenMatchedUpdate(set=dictionary).whenNotMatchedInsert(values=dictionary).execute()

    checkpoint_directory = "YOUR_CHECKPOINT_DIRECTORY"  # update with your checkpoint directory
    df.writeStream.foreachBatch(
        lambda df, epochId: update_changefeed(df, table, query, epochId)
    ).option("checkpointLocation", checkpoint_directory).trigger(availableNow=True).start()
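
After a run, a simple way to confirm that the schema actually evolved (not part of the snippet above; it reuses the same target path) is to reload the silver table and check for the new column:

    # Quick verification that the new column landed in the silver table
    silver_df = spark.read.format("delta").load(target_path)
    silver_df.printSchema()
    assert "MSFT_DATASTATE" in silver_df.columns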
