pyspark 如何更新增量表中已经添加的数据并插入新数据?

kulphzqa  于 2023-11-16  发布在  Spark
关注(0)|答案(1)|浏览(154)

我在pyspark中有一个框架,我每天都读同一个文件。新的数据被添加到这个文件中。我把数据插入到一个delta表中。
我想更新已经插入的数据,因为有时数据会在我读取的文件中更新。
我的网络看起来像这样:

col1    col2    col3           col4
name_1  city_1  date_of_today  {'Date': X, "Weight (KG)": 13.4}
name_1  city_1  date_of_today  {'Date': X, "Weight (KG)": 1000}

字符串
我把它插入到delta表中,我想更新满足条件的数据。
我这样做了:

from delta.tables import DeltaTable
deltaTable = DeltaTable.forPath(spark, "abfss://[email protected]/db/db_name/table/")

deltaTable.alias("target").merge(
    df_content_transformed.alias("source"),
    condition
    ) \
.whenMatchedUpdateAll() \
.whenNotMatchedInsertAll() \
.execute()


条件值为:

"""target.name = source.name AND target.city= source.city AND get_json_object(target.value, '$.Date') = get_json_object(source.value, '$.Date') AND get_json_object(target.value, '$.Weight (KG)') = get_json_object(source.value, '$.Weight (KG)')"""


当我执行一个管道调用这个代码时,我有以下错误:

Py4JJavaError: An error occurred while calling o4197.execute.
: org.apache.spark.sql.delta.DeltaUnsupportedOperationException: Cannot perform Merge as multiple source rows matched and attempted to modify the same
target row in the Delta table in possibly conflicting ways. By SQL semantics of Merge,
when multiple source rows match on the same target row, the result may be ambiguous
as it is unclear which source row should be used to update or delete the matching
target row. You can preprocess the source table to eliminate the possibility of
multiple matches.


我能做什么?

eh57zj3b

eh57zj3b1#

您需要确保对于目标表中的每一行,在源对象框架中最多有一个对应的行匹配合并条件。
要解决此问题:
1.删除源 Dataframe 的重复数据:在合并前,对df_content_transformed Dataframe 进行重复数据删除,以确保在合并条件中使用的每个值组合只有一个唯一的行。

from pyspark.sql import Window
import pyspark.sql.functions as F

window_spec = Window.partitionBy("name", "city", F.expr("get_json_object(value, '$.Date')"), F.expr("get_json_object(value, '$.Weight (KG)')")).orderBy("date_of_today")

df_deduplicated = df_content_transformed.withColumn("row_num", F.row_number().over(window_spec)).filter(F.col("row_num") == 1).drop("row_num")

字符串
1.使用已消除重复数据的DataFrame进行合并:现在在合并操作中使用已消除重复数据的DataFrame(df_deduplicated)。

deltaTable.alias("target").merge(
    df_deduplicated.alias("source"),
    condition
    ) \
.whenMatchedUpdateAll() \
.whenNotMatchedInsertAll() \
.execute()


在上面的重复数据删除步骤中,我使用了date_of_today列来选择在重复的情况下保留哪一行。这只是一个示例逻辑。您可以根据自己的需求进行调整。

相关问题