Kafka休息的例子

6l7fqoea  于 2021-06-08  发布在  Kafka
关注(0)|答案(2)|浏览(456)

有没有关于生产者和消费者团体在java中使用kafkarestapi的好例子。我不寻找简单消费者或Kafka客户的生产者和消费者的例子。感谢您的帮助。

zengzsys

zengzsys1#

最好实现生产者和消费者,然后为生产者和消费者集成restapi。

producer(){
//your implementation for producer
}

consumer(){
//your implementation for consumer
}

rest api:

@POST
restProducer(){
producer();
}

@GET
restConsumer(){
consumer();
}

--不然呢
尝试利用融合实现的restapi
http://docs.confluent.io/1.0/kafka-rest/docs/intro.html

holgip5t

holgip5t2#

这是来自confluent的RESTAPI(rest代理)代码示例。不幸的是,不是用java而是用python(
我得把它打出来,所以可能有拼写错误。我希望这对你有点帮助。
(生产者使用用python编写的restapi)

import requests
import base64
import json

url = "http://restproxy:8082/topics/my_topic"
headers = {
    "Content-Type" : "application/vnd.kafka.binary.v1 + json",
}

# Create one or more messages

payload = {"records":
       [{
           "key":base64.b64encode('firstkey'),
           "value":base64.b64encode('firstvalue'),
       }],
}

# Send the message

r = requests.post(url, data=json.dumps(payload), headers=headers)
if r.status_code != 200:
   print("Status Code: " + str(r.status_code))
   print(r.text)

(使用用python编写的restapi的使用者)

import requests
import base64
import json
import sys

# Base URL for interacting with REST server

baseurl = "http://restproxy:8082/consumers/group1"

# Create the Consumer instance

print("Creating consumer instance")
payload {
    "format": "binary",
}
headers = {
    "Content-Type" : "application/vnd.kafka.v1+json",
}
r = requests.post(baseurl, data=json.dumps(payload), headers=headers)

if r.status_code !=200:
    print("Status Code: " + str(r.status_code))
    print(r.text)
    sys.exit("Error thrown while creating consumer")

# Base URI is used to identify the consumer instance

base_uri = r.json()["base_uri"]

# Get the messages from the consumer

headers = {
    "Accept" : "application/vnd.kafka.binary.v1 + json",
}

# Request messages for the instance on the Topic

r = requests.get(base_uri + "/topics/my_topic", headers = headers, timeout =20)

if r.status_code != 200: 
    print("Status Code: " + str(r.status_code))
    print(r.text)
    sys.exit("Error thrown while getting message")

# Output all messages

for message in r.json():
    if message["key"] is not None:
        print("Message Key:" + base64.b64decode(message["key"]))
    print("Message Value:" + base64.b64decode(message["value"]))

# When we're done, delete the consumer

headers = {
    "Accept" : "application/vnd.kafka.v1+json",
}

r = requests.delete(base_uri, headers=headers)

if r.status_code != 204: 
    print("Status Code: " + str(r.status_code))
    print(r.text)

相关问题