I'm having a problem with one of my Delta tables when it is updated and new columns have to be inferred during the merge. Only this one table is affected; all the other tables work fine. The code below is dynamic, so there is no difference in how I run it for different tables.
Detailed description:
Step 1.
Read the stream and run another notebook that generates the SQL statement as a string.
import re
from delta.tables import DeltaTable
from pyspark.sql import functions as F
from pyspark.sql.functions import col
from pyspark.sql.window import Window

# Enable autoMerge so the merge can evolve the target schema
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "True")

# Run the bronze-view notebook that returns the SELECT statement as a string
query = dbutils.notebook.run(
    f"/bronze views/VW_D365_{table.upper()}",
    3600,
    {
        "append_only_mode": "yes",
        "incremental": "yes",
        "catalog": catalog,
        "schema": schema,
    },
)

# Read the bronze table's change data feed as a stream
df = (
    spark.readStream.format("delta")
    .option("readChangeFeed", "true")
    .table(f"{catalog}.bronze.d365_{table.lower()}_ao")
)
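For reference, the generated query is just a SELECT statement that later runs against the global temp view created in step 2. The real view definition is not shown in the question, so the snippet below is a purely hypothetical illustration of its shape (the column names and the view suffix are assumptions):
# Hypothetical illustration of the notebook-generated "query" string; the real
# column list and view name come from the bronze view notebook.
query = """
SELECT
    BK_D365_customeraddress,
    Name,
    ModifiedOn,
    IsDelete
FROM global_temp.tmp_D365_customeraddress
"""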
Step 2.
Then I define the function that is passed to foreachBatch when the stream starts writing to the target table.
def update_changefeed(df, table, query, epochId):
    # Keep only the relevant change types and drop the CDF metadata columns
    filtered_df = df.filter(
        col("_change_type").isin("insert", "update_postimage", "delete")
    )
    filtered_df = filtered_df.drop(
        "_commit_timestamp", "_change_type", "_commit_version"
    )
    # Deduplicate: keep only the latest record per id
    w = Window.partitionBy("id").orderBy(F.col("modifiedon").desc())
    filtered_df = (
        filtered_df.withWatermark("modifiedon", "1 second")
        .withColumn("rn", F.row_number().over(w))
        .where(F.col("rn") == 1)
        .drop("rn")
    )
    # Create a global temp view on top of the dataframe so the generated
    # select statement can be applied to it
    filtered_df.createOrReplaceGlobalTempView(f"tmp_D365_{table}")
    # "query" is the output of the notebook run in step 1
    dfUpdates = spark.sql(query)
    # Collect the column names from the source and target to pass to the
    # merge; we merge on the columns whose names start with "BK_"
    p = re.compile("^BK_")
    list_of_columns = dfUpdates.columns
    list_of_BK_columns = [s for s in list_of_columns if p.match(s)]
    # Build the merge condition, e.g. "table.BK_x = newData.BK_x and ..."
    merge_condition = " and ".join(
        f"table.{column} = newData.{column}" for column in list_of_BK_columns
    )
    # Map every column to its value in the incoming data
    update_values = {key: f"newData.{key}" for key in list_of_columns}
    # Execute the merge itself
    deltaTable = DeltaTable.forPath(
        spark,
        f"abfss://silver@{storage_account}.dfs.core.windows.net/D365/{table.lower()}_ao",
    )
    deltaTable.alias("table").merge(
        dfUpdates.alias("newData"), merge_condition
    ).whenMatchedUpdate(set=update_values).whenNotMatchedInsert(
        values=update_values
    ).execute()
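To make the string building concrete, here is a small standalone illustration of what merge_condition and update_values end up looking like; the column names are made up for the example and are not the actual table's columns:
# Standalone illustration with assumed column names
example_columns = ["BK_D365_customeraddress", "Name", "ModifiedOn"]
example_bk_columns = [c for c in example_columns if c.startswith("BK_")]
example_condition = " and ".join(
    f"table.{c} = newData.{c}" for c in example_bk_columns
)
example_values = {c: f"newData.{c}" for c in example_columns}
print(example_condition)
# table.BK_D365_customeraddress = newData.BK_D365_customeraddress
print(example_values)
# {'BK_D365_customeraddress': 'newData.BK_D365_customeraddress',
#  'Name': 'newData.Name', 'ModifiedOn': 'newData.ModifiedOn'}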
Step 3.
The write part.
df.writeStream.foreachBatch(
    lambda df, epochId: update_changefeed(df, table, query, epochId)
).option("checkpointLocation", checkpoint_directory).trigger(
    availableNow=True
).start()
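The snippets above rely on notebook-level variables (catalog, schema, table, storage_account, checkpoint_directory) defined elsewhere in the pipeline; purely as assumed placeholders, a minimal setup could look like this:
# Assumed placeholder values, only so the snippets above are self-contained;
# the real values come from the dynamic pipeline configuration.
catalog = "mycatalog"
schema = "bronze"
table = "customeraddress"
storage_account = "mystorageaccount"
checkpoint_directory = (
    f"abfss://silver@{storage_account}.dfs.core.windows.net"
    f"/checkpoints/D365/{table}_ao"
)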
I get the following error:
SET column `MSFT_DATASTATE` not found given columns: [`PK_D365_customeraddress`, `IsDelete` etc]
The MSFT_DATASTATE column is indeed not in my target Delta table yet; it is exactly the column that should be merged into it. I am not sure what is going wrong.
It is probably worth mentioning that before I enabled spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "True"), I got the following error instead:
cannot resolve MSFT_DATASTATE in UPDATE clause given columns ...
1 Answer
mtb9vblg1:
Try this:
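The code of this answer did not survive extraction. As a hedged sketch only, not the original answer: Delta's automatic schema evolution for MERGE is documented to apply to the updateAll / insertAll actions, so one common fix is to replace the explicit set= / values= dictionaries with whenMatchedUpdateAll / whenNotMatchedInsertAll, reusing the names from the question's code:
# Sketch under the assumption that the explicit column mappings are what block
# schema evolution; dfUpdates, merge_condition, storage_account and table are
# the same names used in the question's update_changefeed function.
from delta.tables import DeltaTable

spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")

deltaTable = DeltaTable.forPath(
    spark,
    f"abfss://silver@{storage_account}.dfs.core.windows.net/D365/{table.lower()}_ao",
)
(
    deltaTable.alias("table")
    .merge(dfUpdates.alias("newData"), merge_condition)
    .whenMatchedUpdateAll()      # UPDATE SET * lets autoMerge add new columns
    .whenNotMatchedInsertAll()   # INSERT *
    .execute()
)
Depending on the Databricks Runtime version, schema evolution with explicit column lists in the merge actions may simply not be supported, which would explain why only the table that gained a new column (MSFT_DATASTATE) fails while the others keep working.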