使用kafka与更新和删除进行数据集成

t40tm48m  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(447)

我们有大量的数据源,从rdbms到s3文件。我们希望将这些数据与其他各种数据仓库、数据库等进行同步和集成。
起初,这似乎是Kafka的典范。我们希望将数据更改通过kafka流式传输到数据输出源。在我们的测试用例中,我们用oracle golden gate捕获更改,并成功地将更改推送到kafka队列中。然而,将这些更改传递到数据输出源已被证明具有挑战性。
我意识到,如果我们只是在Kafka主题和队列中添加新数据,这将非常有效。我们可以缓存更改并将更改写入各种数据输出源。然而事实并非如此。我们将更新、删除、修改分区等等。处理这个问题的逻辑似乎要复杂得多。
我们尝试使用暂存表和联接来更新/删除数据,但我觉得这样做很快就会变得相当笨拙。
这就是我的问题-有没有什么不同的方法,我们可以去处理这些行动?或者我们应该完全换个方向?
如有任何建议/帮助,我们将不胜感激。谢谢您!

jexiocij

jexiocij1#

您可以采取三种方法:
满载卸载
增量卸载
binlog复制

满载卸载

定期将rdbms数据源表转储到一个文件中,并将其加载到数据仓库中,以替换以前的版本。这种方法对于小型表非常有用,但是实现起来非常简单,并且支持对数据的更新和删除。

增量卸载

定期获取自上次查询以来发生更改的记录,并将其发送到数据仓库以进行加载。类似于

SELECT *
FROM my_table
WHERE last_update > #{last_import}

这种方法的实现稍微复杂一些,因为您必须维护状态(上面代码段中的“last\u import”),并且它不支持删除。它可以扩展为支持删除,但这使它更复杂。这种方法的另一个缺点是需要表具有 last_update 列。

binlog复制

编写一个程序,持续侦听rdbms的binlog,并将这些更新发送到数据仓库中的中间表,其中包含行的更新值,以及是删除操作还是更新/创建操作。然后编写一个查询,定期合并这些更新以创建一个镜像原始表的表。此合并过程背后的思想是为每个id选择在所有更新中看到的最后一个(最高级)版本,或者在合并表的早期版本中看到的最后一个(最高级)版本。
这种方法实现起来稍微复杂一些,但是即使在大型表上也可以实现高性能,并且支持更新和删除。
kafka与此方法相关,因为它可以用作binlog侦听器和数据仓库中间表加载之间行更新的管道。
您可以在这篇博文中阅读更多关于这些不同复制方法的信息。
披露:我在alooma工作(一个同事写了上面链接的博客文章,我们提供数据管道作为服务,解决了这样的问题)。

相关问题