kafka mirrormaker 2.0复制每条消息

xeufq47z  于 2021-06-05  发布在  Kafka
关注(0)|答案(2)|浏览(591)

我正在尝试用mirrormaker 2.0复制kafka集群。我正在使用以下属性:

name = mirror-site1-site2
topics = .*
connector.class = org.apache.kafka.connect.mirror.MirrorSourceConnector
tasks.max = 1
plugin.path=/usr/share/java/kafka/plugin
clusters = site1, site2

# for demo, source and target clusters are the same

source.cluster.alias = site1
target.cluster.alias = site2

site1.sasl.mechanism=SCRAM-SHA-256
site1.security.protocol=SASL_PLAINTEXT
site1.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
   username="<someuser>" \
   password="<somepass>";

site2.sasl.mechanism=SCRAM-SHA-256
site2.security.protocol=SASL_PLAINTEXT
site2.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
   username="<someuser>" \
   password="<somepass>";

site1.bootstrap.servers = <IP1>:9093, <IP2>:9093, <IP3>:9093, <IP4>:9093
site2.bootstrap.servers = <IP5>:9093, <IP6>:9093, <IP7>:9093, <IP8>:9093

site1->site2.enabled = true
site1->site2.topics = topic1

# use ByteArrayConverter to ensure that records are not re-encoded

key.converter = org.apache.kafka.connect.converters.ByteArrayConverter
value.converter = org.apache.kafka.connect.converters.ByteArrayConverter

所以问题是,mm2似乎总是复制x3消息:


# Manual message production:

 kafkacat -P -b <IP1>:9093,<IP2>:9093,<IP3>:9093,<IP4>:9093 -t "topic1"

# Result in the source topic (site1 cluster):

% Reached end of topic topic1 [2] at offset 405
Message1
% Reached end of topic topic1 [2] at offset 406
Message2
% Reached end of topic topic1 [6] at offset 408
Message3
% Reached end of topic topic1 [2] at offset 407

 kafkacat -P -b <IP5>:9093,<IP6>:9093,<IP7>:9093,<IP8>:9093 -t "site1.topic1"

# Result in the target topic (site2 cluster):

% Reached end of topic site1.titi [2] at offset 1216
Message1
Message1
Message1
% Reached end of topic site1.titi [2] at offset 1219
Message2
Message2
Message2
% Reached end of topic site1.titi [6] at offset 1229
Message3
Message3
Message3

我试着从confluent软件包中使用kafka,直接从apache中使用kafkaè2.13-2.4.0,两者都使用debian 10.1。
我首先用Confluent5.4鼓励这种行为,认为这可能是他们的软件包中的一个错误,因为他们有replicator,不应该真正关心mm2,但是我直接从apache复制了kafka琰2.13-2.4.0的完全相同的问题,没有任何更改。
我知道mm2还不是幂等的,不能保证一次交货。在我的测试中(我尝试了很多事情,包括生产者调整或更大批量的数千条消息)。在所有这些测试中,mm2总是复制x3所有消息。
我错过了什么,有人鼓励我做同样的事吗?作为一个站点说明,遗留mm1具有相同的包,我没有这个问题。
谢谢你的帮助。。。谢谢!
即使changelog没有让我对改进很有信心,我还是再次尝试运行一个mm2,这次是从kafka 2.4.1开始的。=>所有这些奇怪的复制品都没有变化。
我在新服务器上安装了这个版本,以确保我遇到的奇怪行为与服务器无关。
当我使用acl时,我需要特殊的权限吗?我把“所有”都放在心里,认为这是再许可不过的了。。。即使mm2不是幂等的是,我也会尝试一下与之相关的右边。
更让我惊讶的是,我找不到任何报告这样的问题,当然我必须做错事,但这是什么问题。。。

11dmarpk

11dmarpk1#

你需要移除 connector.class = org.apache.kafka.connect.mirror.MirrorSourceConnector 根据您的配置,因为这是告诉mirror maker将这个类用于它与复制数据的源连接器一起生成的心跳和检查点连接器,并且这个类使它们的行为与源连接器完全一样,所以这就是为什么每次复制3条消息的原因,您实际生成了3个源连接器。

wmtdaxz3

wmtdaxz32#

对客户机配置启用幂等将解决此问题。默认情况下,它将设置为false。将以下内容添加到mm2.properties文件中

source.cluster.producer.enable.idempotence = true
target.cluster.producer.enable.idempotence = true

相关问题