我正在使用confluent 3.3.0。我正在使用 jdbc-source-connector
从我的oracle表向kafka插入消息。这个很好用。
我想看看是否可以“向上插入”。
我的意思是,如果我有一个学生表,有3列 id
(编号), name
(varchar2),以及 last_modified
(时间戳)。每当我插入新行时,它将被推送到kafka(使用时间戳+自动递增字段)。但是当我更新行时,Kafka中相应的消息应该被更新。
这个 id
我的table应该是他们的 key
相应的Kafka信息。我的主键(id)将作为引用保持不变。
每次更新行时,timestamp字段都会得到更新。
这可能吗?或者删除Kafka中已有的记录并插入新的记录。
1条答案
按热度按时间rmbxnbpk1#
但是当我更新行时,Kafka中相应的消息应该被更新
这是不可能的,因为Kafka在设计上只是附加的,而且是不变的。
最好的方法是按某个查询所有行
last_modified
列,或钩住cdc解决方案,如oracle goldengate或alpha debezium解决方案,这些解决方案将捕获数据库上的单个更新事件,并在kafka主题上附加一条全新的记录。如果您想在kafka中对数据库记录进行重复数据消除(请查找带有max的消息)
last_modified
在一段时间内),您可以使用kafka流或ksql来执行这种类型的后处理过滤。如果使用压缩的kafka主题,并将数据库密钥作为kafka消息密钥插入,则压缩后,最新附加的消息将保留,具有相同密钥的上一条消息将被删除,而不是更新