kafka的restapi

mfuanj7w  于 2021-06-02  发布在  Hadoop
关注(0)|答案(2)|浏览(478)

我需要为kafka编写一个restapi,它可以分别从消费者/生产者那里读取或写入数据。我怎么能这么做?

lawou6xi

lawou6xi1#

这是来自confluent的RESTAPI(rest代理)代码示例。我得把它打出来,所以可能有拼写错误。我希望这对你有点帮助。
(生产者使用用python编写的restapi)

  1. import requests
  2. import base64
  3. import json
  4. url = "http://restproxy:8082/topics/my_topic"
  5. headers = {
  6. "Content-Type" : "application/vnd.kafka.binary.v1 + json"
  7. }
  8. # Create one or more messages
  9. payload = {"records":
  10. [{
  11. "key":base64.b64encode("firstkey"),
  12. "value":base64.b64encode("firstvalue")
  13. }]}
  14. # Send the message
  15. r = requests.post(url, data=json.dumps(payload), headers=headers)
  16. if r.status_code != 200:
  17. print "Status Code: " + str(r.status_code)
  18. print r.text

(使用用python编写的restapi的使用者)

  1. import requests
  2. import base64
  3. import json
  4. import sys
  5. # Base URL for interacting with REST server
  6. baseurl = "http://restporxy:8082/consumers/group1"
  7. # Create the Consumer instance
  8. print "Creating consumer instance"
  9. payload {
  10. "format": "binary"
  11. }
  12. headers = {
  13. "Content-Type" : "application/vnd.kafka.v1+json"
  14. }
  15. r = requests.post(baseurl, data=json.dumps(payload), headers=headers)
  16. if r.status_code !=200:
  17. print "Status Code: " + str(r.status_code)
  18. print r.text
  19. sys.exit("Error thrown while creating consumer")
  20. # Base URI is used to identify the consumer instance
  21. base_uri = r.json()["base_uri"]
  22. # Get the messages from the consumer
  23. headers = {
  24. "Accept" : "application/vnd.kafka.binary.v1 + json"
  25. }
  26. # Request messages for the instance on the Topic
  27. r = requests.get(base_uri + "/topics/my_toopic", headers = headers, timeout =20)
  28. if r.status_code != 200:
  29. print "Status Code: " + str(r.status_code)
  30. print r.text
  31. sys.exit("Error thrown while getting message")
  32. # Output all messages
  33. for message in r.json():
  34. if message["key"] is not None:
  35. print "Message Key:" + base64.b64decode(message["key"])
  36. print "Message Value:" + base64.b64decode(message["value"])
  37. # When we're done, delete the consumer
  38. headers = {
  39. "Accept" : "application/vnd.kafka.v1+json"
  40. }
  41. r = requests.delete(base_uri, headers=headers)
  42. if r.status_code != 204:
  43. print "Status Code: " + str(r.status_code)
  44. print r.text
展开查看全部
s2j5cfk0

s2j5cfk02#

我猜您的问题更多的是关于如何编写“服务器”restapi接口,而不是客户端(客户端最后只是发出http请求)。例如,可以使用strimzihttp桥(https://github.com/strimzi/strimzi-kafka-bridge)在kubernetes中,哪一个是独立工作的,哪一个是您愿意在那里部署集群的(然后您可以使用strimzi项目,https://strimzi.io/).

相关问题