如何从python客户端向kafka发送json对象

x6h2sr28  于 2021-06-08  发布在  Kafka
关注(0)|答案(2)|浏览(828)

我有一个简单的json对象,如下所示

  1. d = { 'tag ': 'blah',
  2. 'name' : 'sam',
  3. 'score':
  4. {'row1': 100,
  5. 'row2': 200
  6. }
  7. }

下面是我的python代码,它向kafka发送消息

  1. from kafka import SimpleProducer, KafkaClient
  2. import json
  3. # To send messages synchronously
  4. kafka = KafkaClient('10.20.30.12:9092')
  5. producer = SimpleProducer(kafka)
  6. jd = json.dumps(d)
  7. producer.send_messages(b'message1',jd)

我在storm日志中看到消息正在被接收,但是它为元组{json structure in here}抛出的转换为空,不确定需要做什么来修复这个问题?。。

7nbnzgx9

7nbnzgx91#

Kafka期望值以字节为单位

  1. b`some json message`

这里是我的简单Kafka生产者发送消息到Kafka服务器。

  1. import json
  2. from bson import json_util
  3. from kafka import KafkaProducer
  4. producer = KafkaProducer(bootstrap_servers='localhost:9092')
  5. for i in range(10):
  6. data = { 'tag ': 'blah',
  7. 'name' : 'sam',
  8. 'index' : i,
  9. 'score':
  10. {'row1': 100,
  11. 'row2': 200
  12. }
  13. }
  14. producer.send('orders', json.dumps(data, default=json_util.default).encode('utf-8'))

这里,json.dumps()将json转换为字符串,encode('utf-8')将字符串转换为字节数组。

展开查看全部
ohfgkhjo

ohfgkhjo2#

下面是我为Kafka制作的代码。我唯一不同的做法就是 yaml.safe_load 加载json内容。它以字符串而不是unicode返回内容。下面是代码段

  1. with open('smaller_test_prod.txt') as f:
  2. for line in f:
  3. d = yaml.safe_load(line)
  4. jd = json.dumps(d)
  5. producer.send_messages(b'zeus_metrics',jd)

这里的每一行都是存储在文件中的json数据。

相关问题