我们有一个现有的主题,其中数据由jdbc源连接器使用模式increment+timestamp发布(源连接器使用increment+timestamp)(https://docs.confluent.io/current/connect/kafka-connect-jdbc/source-connector/index.html#incremental-查询模式)
我们有现有的消费群体,他们使用一些现有主题的数据。现在我们引入了一个新的使用者组(称为k组),它应该使用来自相同现有主题的数据,并且应该写入数据库。作为第一步,我们有一个初始的数据迁移工作流来转储源数据库,并在开始使用来自现有主题的消息之前将转储复制到目标数据库。
现在当消费者群体开始时,我想知道它应该从什么开始补偿?
一种选择是使用最新版本。但问题是,在为这个新的消费群体进行初始数据迁移时,现有的源连接器会将数据发布到现有的主题中。在我们的例子中,我们有10个表要迁移,在表转储的地方可能会有一个缺口,但是仍然对源数据库做了一些更改,因此数据将被添加到主题中。所以,我们可能会错过处理一些记录的机会。
我们没有暂停源连接器的选项,这将为我们解决问题。
如果我们使用offset-earlime,我们将最终处理来自kafka主题的所有旧数据,这是不需要的,因为我们已经完成了初始数据迁移。
我们只想维护一个源连接器,而不考虑用户组的数量。
我正在浏览kafka消费API,比如seek,它需要时间戳。我可以记下初始数据迁移之前的时间,并在使用者组启动并分配分区后调用consumer.seek。但我找不到任何文件说时间戳是基于格林威治标准时间或其他东西。通过传递从epoch经过的毫秒数来使用这个api可以吗?
1条答案
按热度按时间hrysbysz1#
如果我正确理解这句话:“如果我们使用offset latest,我们可能会丢失一些数据,因为源连接器可能在初始数据迁移期间向主题写入了一些数据”主题最终会将来自初始加载的一些数据和cdc数据混在一起,因此没有明显不同于此的偏移量。因此,设置任何特定偏移量都不会太远。
我看到以下选项:
让您的客户组k过滤掉初始负载数据并从最早的
生成专用主题的初始加载数据
如果可能,在工作时间之外执行初始加载,以便没有cdc数据流动(可能在周末或银行假日期间)