我有一个简单的json对象,如下所示
d = { 'tag ': 'blah',
'name' : 'sam',
'score':
{'row1': 100,
'row2': 200
}
}
下面是我的python代码,它向kafka发送消息
from kafka import SimpleProducer, KafkaClient
import json
# To send messages synchronously
kafka = KafkaClient('10.20.30.12:9092')
producer = SimpleProducer(kafka)
jd = json.dumps(d)
producer.send_messages(b'message1',jd)
我在storm日志中看到消息正在被接收,但是它为元组{json structure in here}抛出的转换为空,不确定需要做什么来修复这个问题?。。
2条答案
按热度按时间7nbnzgx91#
Kafka期望值以字节为单位
这里是我的简单Kafka生产者发送消息到Kafka服务器。
这里,json.dumps()将json转换为字符串,encode('utf-8')将字符串转换为字节数组。
ohfgkhjo2#
下面是我为Kafka制作的代码。我唯一不同的做法就是
yaml.safe_load
加载json内容。它以字符串而不是unicode返回内容。下面是代码段这里的每一行都是存储在文件中的json数据。