from confluent_kafka.admin import AdminClient, NewTopic
topic = sys.argv[1]
topics = ["newTopicExample","newTopicExample2"]
# Create topic in our Kafka, using kafka-python library.
a = AdminClient({'bootstrap.servers': 'myKafkaBrokerURL'})
new_topics = [NewTopic(topic, num_partitions=3, replication_factor=1) for topic in topics]
# Call create_topics to asynchronously create topics. A dict
# of <topic,future> is returned.
fs = a.create_topics(new_topics)
# Wait for each operation to finish.
for topic, f in fs.items():
try:
f.result() # The result itself is None
print("Topic {} created".format(topic))
except Exception as e:
print("Failed to create topic {}: {}".format(topic, e))
3条答案
按热度按时间yjghlzjz1#
pgccezyw2#
合流的python-kafka客户端确实有管理支持。
看看这个例子,学习如何使用它。
g52tjvyc3#
我发现这个:
https://github.com/confluentinc/confluent-kafka-python
对我来说,至少,在主题创建时,我指定一个或多个主题,然后在我的kafka代理中创建它们。
看看我自己的答案:
我可以通过api在Kafka中创建一个主题吗?
我的要求和你差不多。
使用示例: