如何在动态/启动时创建Kafka主题,供制作人发送到?

svmlkihl  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(310)

我开始为kafka使用confluent.net库,并尝试实现一个我在azure服务总线上使用的模式,以便在生产者应用程序启动时创建主题(如果不存在则创建)。在kafkaapi中如何做到这一点,可以做到吗?
这将允许主题成为源代码管理的一部分,并在自动发布过程中进行配置,而不是按主题/环境手动设置。另外,我希望我的开发人员不必去每个kafka示例/环境,首先配置它们以匹配。
如果我不能这样做,我将不得不在发布过程中将其烘焙到bash脚本中,但更希望在启动代码中使用它。

1aaf6o9v

1aaf6o9v1#

您可以启用集群范围的配置auto.create.topics.enable。
如果一个新的生产者试图向一个还不存在的主题发送数据,这将自动创建一个主题。
但是,请注意以下几点:
将使用复制、分区数和保留的默认设置创建主题。确保根据需要更改这些默认设置。无论如何,所有自动创建的主题都将具有相同的配置。
生产者代码中主题名称配置中的输入错误可能会导致不必要的主题创建。
或者,您可以使用adminclient api。示例如下:

static async Task CreateTopicAsync(string bootstrapServers, string topicName) { using (var adminClient = new AdminClientBuilder(new AdminClientConfig { BootstrapServers = bootstrapServers }).Build()) { try { await adminClient.CreateTopicsAsync(new TopicSpecification[] { new TopicSpecification { Name = topicName, ReplicationFactor = 1, NumPartitions = 1 } }); } catch (CreateTopicsException e) { Console.WriteLine($"An error occured creating topic {e.Results[0].Topic}: {e.Results[0].Error.Reason}"); } } }

相关问题