pyspark Delta Lake表按列并行更新

scyqe7ek  于 2023-01-08  发布在  Spark
关注(0)|答案(1)|浏览(159)

我希望每个人都做得很好。我有一个很长的问题,所以请原谅我。

    • 背景**:

因此,CDC有效负载来自Yugabyte的Debezium连接器,格式如下:
第一个月
{
"payload": {
"before": null,
"after": {
"id": {
"value": "MK_1",
"set": true
},
"status": {
"value": "new_status",
x1米11米1x
},
"status_metadata": {
"value": "new_status_metadata",
"set": true
},
"creator": {
"value": "new_creator",
"set": true
x1米20英寸1x
"created": null,
"creator_type": null,
"updater": null,
"updated": null,
"updater_type": {
"value": "new_updater_type",
"set": true
}
},
x1米30英寸1x
"version": "1.7.0.13-BETA",
"connector": "yugabytedb",
"name": "dbserver1",
"ts_ms": -4258763692835,
"snapshot": "false",
"db": "yugabyte",
"sequence": "[\"0:0::0:0\",\"1:338::0:0\"]",
"schema": "public",
x1米39英寸
x1米40英寸1x
"lsn": "1:338::0:0",
"xmin": null
},
"op": "u",
"ts_ms": 1669795724779,
"transaction": null
}
}
"""
有效负载由beforeafter字段组成。正如"op:u"所示,这是一个更新操作。因此,Yugabyte表中ID为MK_1的名为customers的行已使用新值更新。但是,after字段仅显示其值已更新的那些列。因此,"after"中为null的字段尚未更新,例如,create为null,因此尚未更新,但状态为{" value ":"新状态"、"设置":true}表示status列值已更新为新值"new_status"。现在我有了PySpark结构化流管道,它接收这些有效负载,处理它们,然后生成以下形式的微 Dataframe :
x1米50英寸
"set_column"是真还是假取决于有效载荷。

    • 问题:**

现在我在Delta Lake上有了一个Delta表,其模式如下:
id | status | status_metadata | creator | created | created_type | updater | updated | updater_type
我正在使用以下代码更新上面的delta表,使用的是python delta lake API**(v 2.2.0)**:
for column in fields_map:
delta_lake_merger.whenMatchedUpdate(
condition = f"update_table.op = 'u' AND update_table.set_{column} = 'true'"
, set={column : fields_map[column]}
).execute()
现在你可能想知道为什么我是逐列更新而不是一次更新所有列。这正是我所面临的问题。**如果我在没有set_col = true条件的情况下一次更新所有列,那么它将覆盖delta表中匹配id的行的整个状态。**这不是我想要的。
"我想要什么"

    • 我只想更新有效负载中值不为空的那些列。**如果我像这样一次更新所有列:

delta_lake_merger.whenMatchedUpdate(
condition = f"update_table.op = 'u'"
, set=fields_map
x1米60英寸1x
然后delta lake api也会将delta表中那些没有更新的列替换为空值,因为这是cdc包中未更新列的值,上面的迭代解决方案适用于我对delta表中所有行进行逐列更新的情况,因为它只忽略给定列中set_列为False,因此保留增量表中的现有值。
然而,这是缓慢的,因为它以顺序的方式写入数据N次,这会成为流查询的瓶颈。由于所有列式更新都是独立的,在delta lake python API中是否有任何方法,我可以一次更新所有列,但也可以使用set_column条件?我知道可能有一种方法,因为每个列都是独立的调用来为给定条件下的每一列写入数据。我想为所有具有set_condition的列一次调用execute命令,而不是将其放入循环中。
PS:我在考虑为Python使用Asyncio库,但不是很确定。非常感谢。

mklgxw1f

mklgxw1f1#

我已经能够找到一个解决方案,如果有人被困在一个类似的问题,你可以在whenMatchedUpdate的set字段中使用一个CASE WHEN:

delta_lake_merger.whenMatchedUpdate(set = "CASE WHEN update_table.set_{column}='true' THEN update_table.{column} ELSE main_table.{column} END")

这将使用设置的条件一次对所有列执行更新。

相关问题