我有一个已经运行的生产部署kafka集群,主题是“现有主题”。我正在使用debezium的mongodb源连接器。
在这里,我只想将cdc事件直接推送到主题“existing topic”上,以便已经在收听该主题的消费者能够处理它。
我没有找到任何资源这样做,但它提到的主题是创建在下面的格式-
如果mongodb.name参数为a,数据库名称为b,集合名称为c,则来自数据库a和集合c的数据将加载到主题a.b.c下
我可以将主题更改为“现有主题”并将事件推送到它吗?
我有一个已经运行的生产部署kafka集群,主题是“现有主题”。我正在使用debezium的mongodb源连接器。
在这里,我只想将cdc事件直接推送到主题“existing topic”上,以便已经在收听该主题的消费者能够处理它。
我没有找到任何资源这样做,但它提到的主题是创建在下面的格式-
如果mongodb.name参数为a,数据库名称为b,集合名称为c,则来自数据库a和集合c的数据将加载到主题a.b.c下
我可以将主题更改为“现有主题”并将事件推送到它吗?
2条答案
按热度按时间wwodge7n1#
根据文件,
Kafka主题的名字总是以
logicalName.databaseName.collectionName
,在哪里logicalName
是用指定的连接器的逻辑名称mongodb.name
配置属性,databaseName
发生操作的数据库的名称,以及collectionName
存在受影响文档的mongodb集合的名称。这意味着如果连接器的逻辑名称是
myConnector
还有你的数据库myDatabase
有两个收藏users
以及orders
```{
"name": "myConnector",
"config": {
"connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
"mongodb.hosts": "mongo-db-host:27017",
"mongodb.name": "myDatabase",
"collection.whitelist": "myDatabase[.]*",
}
}
transforms.ValueFieldExample.type=io.confluent.connect.transforms.ExtractTopic$Value
transforms.ValueFieldExample.field=myField
vltsax252#
我在使用jdbc源代码连接器时遇到了同样的问题,并找到了不同的解决方案:
使用
RegexRouter
单个消息转换dropPrefix
您可以覆盖整个主题名称:而且它可以与regex一起工作,因此如果您使用多个表/集合,并且您创建的主题名称不是常量,那么您应该能够使其成为动态的。
从技术上讲,我删除了整个主题名,然后添加了一个新的主题名,这不是最好的解决方案。