Kafka Python 中 Protobuf 对象 的 反 序列 化

mw3dktmi  于 2022-11-21  发布在  Apache
关注(0)|答案(1)|浏览(186)

我已经在Kafka生产者中传递了一个Protobuf对象,并且在消费者端接收到一个字节数组。现在我想再次将该响应反序列化回Protobuf对象,但是我无法做到这一点。我该如何做到呢?
这是我的消费者:

from email import message
from kafka import KafkaConsumer
import json
from simple import simple_message
import check_pb2

consumer = KafkaConsumer('latest', api_version=(0, 10, 1),
                         group_id='my-group', enable_auto_commit=False,
                         bootstrap_servers=['localhost:9092'])#,
           #value_deserializer=lambda m: json.loads(m.decode('ascii')))
for message in consumer:
    print(message)

我尝试使用Confluent,但
Kafka Exception : {} KafkaError{code=_VALUE_DESERIALIZATION,val=-159,str="Unknown magic byte. This message was not produced with a Confluent Schema Registry serializer"

nhaq1z21

nhaq1z211#

如果您没有使用Confluent Schema Registry,请使用ParseFromString

for message in consumer:
    value = ParseFromString(message.value)  # example 
    print(value)

如果(且仅当)您正在使用Schema Registry,则Confluent有一个Protobuf反序列化器类。请参阅example,它恰好也在内部使用ParseFromString

相关问题