如何将一个主题从一个经纪人转移到Kafka的另一个经纪人?

gg0vcinb  于 2021-06-04  发布在  Kafka
关注(0)|答案(2)|浏览(397)

我首先尝试在特定的代理中创建一个主题。但看起来这是不可能的。即使我在引导中提到了代理主机

admin_client = AdminClient({
    "bootstrap.servers": "xxx1.com:9092,xxx2.com:9092"
})

futmap=admin_client.create_topics(topic_list)

该计划是任意挑选的5个经纪人之一,我作为该主题的首席经纪人。我在想为什么会这样。
我也在试着看看是否可以将主题的引导者重新分配给另一个代理。我知道可以通过kafka重新分配分区命令行脚本来实现,但是我想用python和合流kafka包编程实现。有没有可能通过编程来实现这一点。我在合流kafka包的admin类中没有找到重新分配的分区函数
谢谢

wd2eg0qa

wd2eg0qa1#

我终于找到了解决方案,合流kafka python包的文档不足以解决这个问题。但开源的一个好处是,你可以阅读代码并找出答案。因此,要在特定的代理中创建主题,我必须按照下面的方式编写创建主题代码。请注意,我使用了副本分配而不是复制因子。这两者是相互排斥的。如果使用复制因子,分区将由kafka分配,您可以通过副本分配来控制分配。但是,我确信在重新平衡/重新分配分区的情况下,这将被重新分配。但这也可以通过on-u-revoke事件来处理。但现在,这对我很有效。

def createTopic(admin_client,topics):
    #topic_name=topics
    topic_name = ['rajib1_test_xxx_topic']
    replica_assignment = [[262, 261]]
    topic_list = [NewTopic(topic, num_partitions=1, replica_assignment=replica_assignment) for topic in topic_name]
futmap=admin_client.create_topics(topic_list)

# Wait for each operation to finish.

for topic, f in futmap.items():
    try:
        f.result()  # The result itself is None
        print("Topic {} created".format(topic))
    except Exception as e:
        print("Failed to create topic {}: {}".format(topic, e))

# return futmap
mrwjdhj3

mrwjdhj32#

你也可以用 kafka-reassign-partitions.sh Kafka提供的工具,用于将一个主题的副本更改为另一个代理。
例如,如果您希望将主题“test”(在本例中为单复制、单分区)放在代理“1”上,可以首先定义一个计划(名为replicachange.json):

{
"partitions":
  [{"topic": "test", "partition": 0,
    "replicas": [
       1
    ]
  }],
"version":1
}

然后使用以下方法执行:

kafka-reassign-partitions.sh --zookeeper localhost:2181 --execute \
--reassignment-json-file replicachange.json

相关问题