我尝试在两个群集之间设置复制,但不希望更改主题名称。例如,如果我有一个名为“some_topic”的主题,它会自动复制到“cluster1.some_topic”,我非常确定可以完成此操作,但尚未找到更改此操作的正确配置
我的当前配置“mirrormaker2.properties“
# Sample MirrorMaker 2.0 top-level configuration file
# Run with ./bin/connect-mirror-maker.sh connect-mirror-maker.properties
# specify any number of cluster aliases
clusters = cluster1, cluster2
# connection information for each cluster
cluster1.bootstrap.servers = host1:9092,host2:9092,host3:9092
cluster2.bootstrap.servers = rep_host1:9092,rep_host2:9092,rep_host3:9092
# enable and configure individual replication flows
cluster1->cluster2.enabled = true
cluster1->cluster2.topics = sometopic.*
# customize as needed
# replication.policy.separator = _
# sync.topic.acls.enabled = false
# emit.heartbeats.interval.seconds = 5
参考资料:
- https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0#KIP-382:MirrorMaker2.0-RunningastandaloneMirrorMakerconnector
- https://kafka.apache.org/24/javadoc/index.html?constant-values.html
6条答案
按热度按时间o2g1uqev1#
要'禁用'主题前缀并同时正确镜像主题属性,我必须提供一个自定义的复制策略,该策略也覆盖
topicSource
方法。否则,即使在重新启动镜像生成器后,非默认的主题属性(例如,"cleanup.policy=compact"
)也不会被镜像。以下是对我有效的完整程序:
1.将以下自定义复制策略编译并打包到.jar文件中(完整源代码可在here中找到):
1.将.jar复制到
${KAFKA_HOME/libs
目录中1.通过设置
${KAFKA_HOME}/config/mm2.properties
中的replication.policy.class
属性,将Mirror Maker 2配置为使用该复制策略:r6hnlfcb2#
我可以使用以下设置删除前缀:
如果别名设置在您的情况下是必要的,我理解您应该使用其他replicationPolicy类。默认情况下使用DefaultReplicationPolicy类(https://kafka.apache.org/24/javadoc/org/apache/kafka/connect/mirror/DefaultReplicationPolicy.html)
pobjuy323#
从Kafka 3.0.0开始,只需设置
此外,marcin-wieloch的答案https://stackoverflow.com/a/60619233/12008693中的PrefixlessReplicationPolicy不再适用于Python 3.0.0(NullPointerException)。
bqf10yzr4#
我认为上面的答案是不恰当的。
在Mirror Maker 2.0中,如果要保持主题不被修改,则必须实现ReplicationPolicy。
可以引用DefaultReplicationPolicy.class,然后覆盖
formatRemoteTopic()
,覆盖后必须删除sourceClusterAlias + separator
,最后在mm2.properties
中配置replication.policy.class
我定义了
MigrationReplicationPolicy.class
你应该看看
MirrorClientConfig,class
,我知道你会明白的vsaztqbk5#
已使用Kafka ConfluentINC连接器映像版本5.4.2管理推送复制属性为:
1)在3个参数后面留出空间:源.群集.别名,复制.策略.分隔符,目的.群集.别名.
2)将此镜像连接器设置在目标Kafka上,而不是源上(仅执行拉入)
此外,您还可以使用Conductor或Kafka连接器UI landoop image - landoop/kafka-connect-ui
这仍然是一个测试场景,但它看起来很有希望。
fcwjkofz6#
我尝试在两个集群之间设置复制,但需要在两个集群中使用相同的主题名称,而不需要在connect-mirror-maker.properties中为提供别名。
默认情况下,将根据源群集别名重命名复制的主题。
通过将连接器属性文件下的以下属性设置为空,可以避免重命名主题。默认情况下,replication.policy.separator属性是一个句点,然后将其与source.cluster.alias沿着设置为空,目标主题将与源主题同名。