是否可以使用MirrorMaker2复制没有别名前缀的Kafka主题

toiithl6  于 2022-11-21  发布在  Apache
关注(0)|答案(6)|浏览(239)

我尝试在两个群集之间设置复制,但不希望更改主题名称。例如,如果我有一个名为“some_topic”的主题,它会自动复制到“cluster1.some_topic”,我非常确定可以完成此操作,但尚未找到更改此操作的正确配置
我的当前配置“mirrormaker2.properties“

# Sample MirrorMaker 2.0 top-level configuration file
# Run with ./bin/connect-mirror-maker.sh connect-mirror-maker.properties 

# specify any number of cluster aliases
clusters = cluster1, cluster2

# connection information for each cluster
cluster1.bootstrap.servers = host1:9092,host2:9092,host3:9092
cluster2.bootstrap.servers = rep_host1:9092,rep_host2:9092,rep_host3:9092

# enable and configure individual replication flows
cluster1->cluster2.enabled = true
cluster1->cluster2.topics = sometopic.*

# customize as needed
# replication.policy.separator = _
# sync.topic.acls.enabled = false
# emit.heartbeats.interval.seconds = 5

参考资料:

o2g1uqev

o2g1uqev1#

要'禁用'主题前缀并同时正确镜像主题属性,我必须提供一个自定义的复制策略,该策略也覆盖topicSource方法。否则,即使在重新启动镜像生成器后,非默认的主题属性(例如,"cleanup.policy=compact")也不会被镜像。
以下是对我有效的完整程序:
1.将以下自定义复制策略编译并打包到.jar文件中(完整源代码可在here中找到):

public class PrefixlessReplicationPolicy extends DefaultReplicationPolicy {

  private static final Logger log = LoggerFactory.getLogger(PrefixlessReplicationPolicy.class);

  private String sourceClusterAlias;

  @Override
  public void configure(Map<String, ?> props) {
    super.configure(props);
    sourceClusterAlias = (String) props.get(MirrorConnectorConfig.SOURCE_CLUSTER_ALIAS);
    if (sourceClusterAlias == null) {
      String logMessage = String.format("Property %s not found", MirrorConnectorConfig.SOURCE_CLUSTER_ALIAS);
      log.error(logMessage);
      throw new RuntimeException(logMessage);
    }
  }

  @Override
  public String formatRemoteTopic(String sourceClusterAlias, String topic) {
    return topic;
  }

  @Override
  public String topicSource(String topic) {
    return topic == null ? null : sourceClusterAlias;
  }

  @Override
  public String upstreamTopic(String topic) {
    return null;
  }
}

1.将.jar复制到${KAFKA_HOME/libs目录中
1.通过设置${KAFKA_HOME}/config/mm2.properties中的replication.policy.class属性,将Mirror Maker 2配置为使用该复制策略:

replication.policy.class=ch.mawileo.kafka.mm2.PrefixlessReplicationPolicy
r6hnlfcb

r6hnlfcb2#

我可以使用以下设置删除前缀:

"replication.policy.separator": ""
"source.cluster.alias": "",
"target.cluster.alias": "",

如果别名设置在您的情况下是必要的,我理解您应该使用其他replicationPolicy类。默认情况下使用DefaultReplicationPolicy类(https://kafka.apache.org/24/javadoc/org/apache/kafka/connect/mirror/DefaultReplicationPolicy.html

pobjuy32

pobjuy323#

从Kafka 3.0.0开始,只需设置

replication.policy.class=org.apache.kafka.connect.mirror.IdentityReplicationPolicy

此外,marcin-wieloch的答案https://stackoverflow.com/a/60619233/12008693中的PrefixlessReplicationPolicy不再适用于Python 3.0.0(NullPointerException)。

bqf10yzr

bqf10yzr4#

我认为上面的答案是不恰当的。
在Mirror Maker 2.0中,如果要保持主题不被修改,则必须实现ReplicationPolicy。
可以引用DefaultReplicationPolicy.class,然后覆盖formatRemoteTopic(),覆盖后必须删除sourceClusterAlias + separator,最后在mm2.properties中配置replication.policy.class
我定义了MigrationReplicationPolicy.class

replication.policy.class = org.apache.kafka.connect.mirror.MigrationReplicationPolicy

你应该看看MirrorClientConfig,class,我知道你会明白的

vsaztqbk

vsaztqbk5#

已使用Kafka ConfluentINC连接器映像版本5.4.2管理推送复制属性为:

connector.class=org.apache.kafka.connect.mirror.MirrorSourceConnector
target.cluster.alias= 
replication.factor=3
tasks.max=3
topics=.*
source.cluster.alias= 
target.cluster.bootstrap.servers=<broker1>,<broker2>,<broker3>
replication.policy.separator= 
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
source.cluster.bootstrap.servers=<broker1>,<broker2>,<broker3>
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter

1)在3个参数后面留出空间:源.群集.别名,复制.策略.分隔符,目的.群集.别名.
2)将此镜像连接器设置在目标Kafka上,而不是源上(仅执行拉入)
此外,您还可以使用Conductor或Kafka连接器UI landoop image - landoop/kafka-connect-ui
这仍然是一个测试场景,但它看起来很有希望。

fcwjkofz

fcwjkofz6#

我尝试在两个集群之间设置复制,但需要在两个集群中使用相同的主题名称,而不需要在connect-mirror-maker.properties中为提供别名。
默认情况下,将根据源群集别名重命名复制的主题。

Source --> Target
    topic-1 --> source.topic-1

通过将连接器属性文件下的以下属性设置为空,可以避免重命名主题。默认情况下,replication.policy.separator属性是一个句点,然后将其与source.cluster.alias沿着设置为空,目标主题将与源主题同名。

replication.policy.separator=
source.cluster.alias=
target.cluster.alias=

相关问题