我尝试以下代码:
import avro.schema
from avro.datafile import DataFileReader, DataFileWriter
from avro.io import DatumReader, DatumWriter
schema=avro.schema.parse(open('ff.avsc','rb').read())
reader = DatumReader(schema)
def decode(msg_value):
message_bytes = io.BytesIO(msg_value)
message_bytes.seek(5)
decoder = BinaryDecoder(message_bytes)
event_dict = reader.read(decoder)
return event_dict
...
decode(message.value)
其中message.value是我来自Kafka的消息
出现错误
NameError: name 'BinaryDecoder' is not defined
1条答案
按热度按时间qlckcl4x1#
关于你的错误,我没有看到该类的导入,所以这就解释了。。。
基于
seek(5)
,那么您似乎正在使用架构注册表编码的数据。。。这意味着您需要使用正确的库。安装
pip install confluent-kafka[avro]
####消费https://github.com/confluentinc/confluent-kafka-python#usage