通过pyspark更新配置单元中的插入数据

2w2cym1i  于 2021-05-31  发布在  Hadoop
关注(0)|答案(2)|浏览(346)

我有不断变化的数据来源。我正在通过sqoop拉取数据,但由于卷很大,我无法将其作为每日截断负载。我想附加数据,但逻辑应该是更新和插入。如果记录在源中通过删除以前的相同记录进行更新,则应在配置单元中执行相同的操作,即删除旧记录并插入/更新新记录。下面就是一个这样的例子。

一段时间后,比如说30分钟,数据更新如下:

现在,我的配置单元表拾取原始记录,并在一段时间后拾取更新的记录,但将其作为另一行插入。

我希望数据与源中的数据一样反映出来,而不覆盖我的表((建议使用Pypark代码)
请帮忙。谢谢您。

oyjwcjzk

oyjwcjzk1#

您可以在源中添加更多列作为last\u modified,也可以在配置单元中添加作为last\u load,并且在下次上载时,可以在(源中的id列和配置单元中的id列)和(源表的last\u modified和配置单元列的last\u update)上指定两个条件。

1zmg4dgp

1zmg4dgp2#

不提供查询,但给出如何实现这一点的想法:
在源和实际配置单元表之间创建一个临时表,其中包含所有记录(插入和更新)。
要获得实际的配置单元表,请使用rank函数,例如:

rank() over (partition by id order by ingested_ts desc) as rnk
---
---
where rnk = 1


注意:根据您的数据量,hive\u staging表可能会增长,因此您需要相应地添加分区/存储桶。

相关问题