java—如何使用kafka python生成的来自kafka的消息?

kmbjn2e3  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(681)

我想得到Kafka的帮助。
是否可以使用utf8(日语)消息 kafka-console-consumer.sh 哪个是Kafka·python制作的?
Kafkapython代码

self._client = KafkaProducer(\
                    bootstrap_servers=bootstrap_servers,\
                    api_version=api_version,\
                    value_serializer=lambda m: json.dumps(m).encode('utf-8'))

for message in data:
    self._client.send(topic, message)

kafka-cosole-consumer.sh

./kafka-console-consumer.sh --bootstrap-server msk.amazonaws.com:9092 --topic meetings --from-beginning --property value.deserializer=org.apache.kafka.common.serialization.BytesDeserializer

结果

"uuid": "d/U55wRdSj60yJBcYVpt8A==", "id": 4459052115, "topic": "\x5Cu6e21\x5Cu8fba \x5Cu88d5\x5Cu306e\x5Cu30d1\x5Cu30fc\x5Cu30bd\x5Cu30ca\x5Cu30eb\x5Cu30df\x5Cu30fc\x5Cu30c6\x5Cu30a3\x5Cu30f3\x5Cu30b0\x5Cu30eb\x5Cu30fc\x5Cu30e0", "host": "\x5Cu6e21\x5Cu8fba \x5Cu88d5", "email": "y-watanabe@creationline.com", "user_type": "\x5Cu30e9\x5Cu30a4\x5Cu30bb\x5Cu30f3\x5Cu30b9\x5Cu6e08\x5Cu307f", "start_time": "2020-11-03T10:36:29Z", "end_time": "", "duration": "", "participants": 1, "has_pstn": false, "has_voip": false, "has_3rd_party_audio": false, "has_video": false, "has_screen_share": false, "has_recording": false, "has_sip": false, "dept": ""}

似乎日语字符串(utf-8)未正确反序列化。它被破坏了。
我正试图通过下面的路线传递信息。

producer(kafka-python) -> AWS MSK (2.4.1.1) -> consumer(kafka-console-consumer.sh)
zi8p0yeb

zi8p0yeb1#

我有两件事我没有正确理解。
bytedeserializer在java中将字节对象转换为字节类型。正如@codeflush.dev所说,我看到了正确的行为。
将字节数组中的记录值反序列化为值或对象。
我忽略了json.dumps转义非ascii字符的事实。这是由ascii选项控制的。
如果确保\u ascii为true(默认值),则输出将保证对所有传入的非ascii字符进行转义。如果ascii为false,这些字符将按原样输出。
通过将ascii设置为 False 然后转换成字节,解决了这个问题。

producer = KafkaProducer(bootstrap_servers=['localhost:9092'], value_serializer=lambda m: json.dumps(m, ensure_ascii=False).encode('utf-8'))


将json.dumps中的utf-8文本保存为utf8,而不是转义序列

相关问题