import requests
import base64
import json
url = "http://restproxy:8082/topics/my_topic"
headers = {
"Content-Type" : "application/vnd.kafka.binary.v1 + json",
}
# Create one or more messages
payload = {"records":
[{
"key":base64.b64encode('firstkey'),
"value":base64.b64encode('firstvalue'),
}],
}
# Send the message
r = requests.post(url, data=json.dumps(payload), headers=headers)
if r.status_code != 200:
print("Status Code: " + str(r.status_code))
print(r.text)
(使用用python编写的restapi的使用者)
import requests
import base64
import json
import sys
# Base URL for interacting with REST server
baseurl = "http://restproxy:8082/consumers/group1"
# Create the Consumer instance
print("Creating consumer instance")
payload {
"format": "binary",
}
headers = {
"Content-Type" : "application/vnd.kafka.v1+json",
}
r = requests.post(baseurl, data=json.dumps(payload), headers=headers)
if r.status_code !=200:
print("Status Code: " + str(r.status_code))
print(r.text)
sys.exit("Error thrown while creating consumer")
# Base URI is used to identify the consumer instance
base_uri = r.json()["base_uri"]
# Get the messages from the consumer
headers = {
"Accept" : "application/vnd.kafka.binary.v1 + json",
}
# Request messages for the instance on the Topic
r = requests.get(base_uri + "/topics/my_topic", headers = headers, timeout =20)
if r.status_code != 200:
print("Status Code: " + str(r.status_code))
print(r.text)
sys.exit("Error thrown while getting message")
# Output all messages
for message in r.json():
if message["key"] is not None:
print("Message Key:" + base64.b64decode(message["key"]))
print("Message Value:" + base64.b64decode(message["value"]))
# When we're done, delete the consumer
headers = {
"Accept" : "application/vnd.kafka.v1+json",
}
r = requests.delete(base_uri, headers=headers)
if r.status_code != 204:
print("Status Code: " + str(r.status_code))
print(r.text)
2条答案
按热度按时间zengzsys1#
最好实现生产者和消费者,然后为生产者和消费者集成restapi。
rest api:
--不然呢
尝试利用融合实现的restapi
http://docs.confluent.io/1.0/kafka-rest/docs/intro.html
holgip5t2#
这是来自confluent的RESTAPI(rest代理)代码示例。不幸的是,不是用java而是用python(
我得把它打出来,所以可能有拼写错误。我希望这对你有点帮助。
(生产者使用用python编写的restapi)
(使用用python编写的restapi的使用者)