我想将mirrormaker作为独立连接器运行。到目前为止,我还没有找到任何有关配置的文档。就我所想象的,以下情况会重复 myTopic
.
现在在目标集群中,我需要让主题有另一个名称 foo
(不是自动重命名)。这是否得到了 MirrorSourceConnector
或者我需要其他方法吗?
connector.class = org.apache.kafka.connect.mirror.MirrorSourceConnector
tasksMax = 2
topics = myTopic
source.cluster.bootstrap.servers = sourceHost:9092
target.cluster.bootstrap.servers = sinkHost:9092
1条答案
按热度按时间x759pob21#
所以kafka mirror maker的源代码有一个不错的readme.md。
如何配置它取决于您是直接运行mm2还是在kafka connect中运行。你直接说的,在链接的readme.md里。
基本上:
默认情况下,复制的主题将基于“源群集别名”重命名:
主题1-->源.topic-1
这可以通过重写replication.policy.separator属性(默认值为句点)进行自定义。如果您需要更多地控制如何定义远程主题,可以实现自定义复制策略并重写replication.policy.class(默认值为defaultreplicationpolicy)。
不幸的是,这意味着您不能仅通过配置代码重命名主题(defaultreplicationpolicy类仅允许您指定分隔符(而不允许指定其他内容)。这可能是因为在指定要镜像的主题时,使用的是正则表达式,而不是单个主题名称(即使源集群topic config属性只是主题的名称,它仍然被视为正则表达式)。
那么,回到文档:
ReplicationPolicy
是kafka connect源代码中的一个java接口,因此需要实现一个实现ReplicationPolicy
然后在运行mm2时确保它在类路径上。假设你写了这样一个类,你称之为
com.moffatt.kafka.connect.mirror.FooReplicationPolicy
. 对于您的类来说,一个好的模板是kafka connect附带的默认(显然是唯一的)复制策略类:DefaultReplicationPolicy
. 你可以看到,建立自己的不会太难。您可以很容易地添加一个Map(硬编码或配置的),用于查找特定的配置主题名称并将其Map到目标主题名称。您可以通过在配置中指定新类来使用它:
replication.policy.class = com.moffatt.kafka.connect.mirror.FooReplicationPolicy