kafka管理客户端在kafka中创建主题?

vjhs03f7  于 2021-06-06  发布在  Kafka
关注(0)|答案(3)|浏览(702)

是否有任何python kafka管理客户端可以从python程序中创建/删除主题?我找到了一些pythonapi,但是没有一个有可用的管理api?
合流有python管理api吗?

yjghlzjz

yjghlzjz1#

from kafka.admin import KafkaAdminClient, NewTopic
admin_client = KafkaAdminClient(bootstrap_servers="localhost:9092", client_id='test')

topic_list = []
topic_list.append(NewTopic(name="example_topic", num_partitions=1, replication_factor=1))
admin_client.create_topics(new_topics=topic_list, validate_only=False)
pgccezyw

pgccezyw2#

合流的python-kafka客户端确实有管理支持。
看看这个例子,学习如何使用它。

g52tjvyc

g52tjvyc3#

我发现这个:
https://github.com/confluentinc/confluent-kafka-python
对我来说,至少,在主题创建时,我指定一个或多个主题,然后在我的kafka代理中创建它们。
看看我自己的答案:
我可以通过api在Kafka中创建一个主题吗?
我的要求和你差不多。
使用示例:

from confluent_kafka.admin import AdminClient, NewTopic

topic = sys.argv[1]

topics = ["newTopicExample","newTopicExample2"]

# Create topic in our Kafka, using kafka-python library.

a = AdminClient({'bootstrap.servers': 'myKafkaBrokerURL'})

new_topics = [NewTopic(topic, num_partitions=3, replication_factor=1) for topic in topics]

# Call create_topics to asynchronously create topics. A dict

# of <topic,future> is returned.

fs = a.create_topics(new_topics)

# Wait for each operation to finish.

for topic, f in fs.items():
    try:
        f.result()  # The result itself is None
        print("Topic {} created".format(topic))
    except Exception as e:
        print("Failed to create topic {}: {}".format(topic, e))

相关问题