我需要为kafka编写一个restapi,它可以分别从消费者/生产者那里读取或写入数据。我怎么能这么做?
lawou6xi1#
这是来自confluent的RESTAPI(rest代理)代码示例。我得把它打出来,所以可能有拼写错误。我希望这对你有点帮助。(生产者使用用python编写的restapi)
import requestsimport base64import jsonurl = "http://restproxy:8082/topics/my_topic"headers = {"Content-Type" : "application/vnd.kafka.binary.v1 + json" }# Create one or more messagespayload = {"records": [{ "key":base64.b64encode("firstkey"), "value":base64.b64encode("firstvalue") }]}# Send the messager = requests.post(url, data=json.dumps(payload), headers=headers)if r.status_code != 200: print "Status Code: " + str(r.status_code) print r.text
import requests
import base64
import json
url = "http://restproxy:8082/topics/my_topic"
headers = {
"Content-Type" : "application/vnd.kafka.binary.v1 + json"
}
# Create one or more messages
payload = {"records":
[{
"key":base64.b64encode("firstkey"),
"value":base64.b64encode("firstvalue")
}]}
# Send the message
r = requests.post(url, data=json.dumps(payload), headers=headers)
if r.status_code != 200:
print "Status Code: " + str(r.status_code)
print r.text
(使用用python编写的restapi的使用者)
import requestsimport base64import jsonimport sys# Base URL for interacting with REST serverbaseurl = "http://restporxy:8082/consumers/group1"# Create the Consumer instanceprint "Creating consumer instance"payload { "format": "binary" }headers = {"Content-Type" : "application/vnd.kafka.v1+json" }r = requests.post(baseurl, data=json.dumps(payload), headers=headers)if r.status_code !=200: print "Status Code: " + str(r.status_code) print r.text sys.exit("Error thrown while creating consumer")# Base URI is used to identify the consumer instancebase_uri = r.json()["base_uri"]# Get the messages from the consumerheaders = { "Accept" : "application/vnd.kafka.binary.v1 + json" }# Request messages for the instance on the Topicr = requests.get(base_uri + "/topics/my_toopic", headers = headers, timeout =20)if r.status_code != 200: print "Status Code: " + str(r.status_code) print r.text sys.exit("Error thrown while getting message")# Output all messagesfor message in r.json(): if message["key"] is not None: print "Message Key:" + base64.b64decode(message["key"]) print "Message Value:" + base64.b64decode(message["value"])# When we're done, delete the consumerheaders = { "Accept" : "application/vnd.kafka.v1+json" }r = requests.delete(base_uri, headers=headers)if r.status_code != 204: print "Status Code: " + str(r.status_code) print r.text
import sys
# Base URL for interacting with REST server
baseurl = "http://restporxy:8082/consumers/group1"
# Create the Consumer instance
print "Creating consumer instance"
payload {
"format": "binary"
"Content-Type" : "application/vnd.kafka.v1+json"
r = requests.post(baseurl, data=json.dumps(payload), headers=headers)
if r.status_code !=200:
sys.exit("Error thrown while creating consumer")
# Base URI is used to identify the consumer instance
base_uri = r.json()["base_uri"]
# Get the messages from the consumer
"Accept" : "application/vnd.kafka.binary.v1 + json"
# Request messages for the instance on the Topic
r = requests.get(base_uri + "/topics/my_toopic", headers = headers, timeout =20)
sys.exit("Error thrown while getting message")
# Output all messages
for message in r.json():
if message["key"] is not None:
print "Message Key:" + base64.b64decode(message["key"])
print "Message Value:" + base64.b64decode(message["value"])
# When we're done, delete the consumer
"Accept" : "application/vnd.kafka.v1+json"
r = requests.delete(base_uri, headers=headers)
if r.status_code != 204:
s2j5cfk02#
我猜您的问题更多的是关于如何编写“服务器”restapi接口,而不是客户端(客户端最后只是发出http请求)。例如,可以使用strimzihttp桥(https://github.com/strimzi/strimzi-kafka-bridge)在kubernetes中,哪一个是独立工作的,哪一个是您愿意在那里部署集群的(然后您可以使用strimzi项目,https://strimzi.io/).
2条答案
按热度按时间lawou6xi1#
这是来自confluent的RESTAPI(rest代理)代码示例。我得把它打出来,所以可能有拼写错误。我希望这对你有点帮助。
(生产者使用用python编写的restapi)
(使用用python编写的restapi的使用者)
s2j5cfk02#
我猜您的问题更多的是关于如何编写“服务器”restapi接口,而不是客户端(客户端最后只是发出http请求)。例如,可以使用strimzihttp桥(https://github.com/strimzi/strimzi-kafka-bridge)在kubernetes中,哪一个是独立工作的,哪一个是您愿意在那里部署集群的(然后您可以使用strimzi项目,https://strimzi.io/).