我正在使用一个带有模式timestamp+递增的jdbc源连接器从postgres获取表。数据更新反映在Kafka主题中,但删除记录没有效果。所以,我的问题是:有什么方法可以处理删除的记录吗?如何处理已删除但仍存在于Kafka主题中的记录?
dldeef671#
建议是1)将源数据库也调整为仅附加/更新,或者通过一个布尔值或kafka connect查询表时过滤掉的时间戳。如果数据库空间不足,则可以删除旧记录,这些记录应该已经由kafka处理过了选项2)使用cdc工具立即捕获删除事件,而不是在周期表扫描中丢失它们。debezium是postgres的流行选择
du7egjpx2#
Kafka主题可以看作是一个“仅附加”日志。它可以让所有的会议持续多久,但Kafka并不是为了从一个主题中删除单独的信息而构建的。在您描述的场景中,下游应用程序(使用主题)处理已删除记录上的信息是很常见的。作为一种选择,你可以设置 cleanup.policy 你的主题 compact 这意味着它最终将只保留每个键的最新值。如果现在将消息的键定义为postgres表的主键,则在生成具有相同键和 null 将价值融入主题。然而,我不确定你的连接器是否灵活根据您对kafka主题中的数据所做的操作,这仍然不能解决您的问题,因为下游应用程序仍将同时读取原始记录和 null 作为已删除记录的消息。
cleanup.policy
compact
null
2条答案
按热度按时间dldeef671#
建议是1)将源数据库也调整为仅附加/更新,或者通过一个布尔值或kafka connect查询表时过滤掉的时间戳。
如果数据库空间不足,则可以删除旧记录,这些记录应该已经由kafka处理过了
选项2)使用cdc工具立即捕获删除事件,而不是在周期表扫描中丢失它们。debezium是postgres的流行选择
du7egjpx2#
Kafka主题可以看作是一个“仅附加”日志。它可以让所有的会议持续多久,但Kafka并不是为了从一个主题中删除单独的信息而构建的。
在您描述的场景中,下游应用程序(使用主题)处理已删除记录上的信息是很常见的。
作为一种选择,你可以设置
cleanup.policy
你的主题compact
这意味着它最终将只保留每个键的最新值。如果现在将消息的键定义为postgres表的主键,则在生成具有相同键和null
将价值融入主题。然而,我不确定你的连接器是否灵活
根据您对kafka主题中的数据所做的操作,这仍然不能解决您的问题,因为下游应用程序仍将同时读取原始记录和
null
作为已删除记录的消息。