pyspark中的增量批处理

xuo3flqw  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(432)

在我们的spark应用程序中,我们每天运行多个批处理进程。这些批处理过程的源代码不同,如oracle、mongodb和文件。我们根据源代码存储不同的值以进行增量处理,比如某些oracle表的最新时间戳、某些oracle表的id、某些文件系统的列表,并将这些值用于下一次增量运行。
当前这些偏移量值的计算依赖于源类型,每次添加新的源类型时,我们都需要自定义代码来存储这个值。有没有像流媒体中的检查点这样的通用方法来解决这个问题。

4smxwvx5

4smxwvx51#

我总是喜欢查看上一次写入分区的目的地,或者获取一些max(主密钥),然后根据该值从源数据库中选择要在当前运行期间写入的数据。
无需存储任何内容,只需向批处理算法提供表名、源类型和主键/时间戳列。然后,算法将找到您已经拥有的最新值。
这实际上取决于你的负载原理和你的存储是如何分配的;如果您有原始/源/准备层。加载原始格式的数据是一个好主意,它可以很容易地与原始源进行比较,以便完成我上面描述的工作。
备选方案包括:
编写一个包含主列和最新值的文件时,批处理作业将读取此文件以确定下一步要读取的内容。
使用与最新值对应的参数更新作业执行配置,以便下次运行时将最新值传递给算法。

相关问题