python3在kafka中读取avro格式

ybzsozfc  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(449)

我尝试以下代码:

  1. import avro.schema
  2. from avro.datafile import DataFileReader, DataFileWriter
  3. from avro.io import DatumReader, DatumWriter
  4. schema=avro.schema.parse(open('ff.avsc','rb').read())
  5. reader = DatumReader(schema)
  6. def decode(msg_value):
  7. message_bytes = io.BytesIO(msg_value)
  8. message_bytes.seek(5)
  9. decoder = BinaryDecoder(message_bytes)
  10. event_dict = reader.read(decoder)
  11. return event_dict
  12. ...
  13. decode(message.value)

其中message.value是我来自Kafka的消息
出现错误

  1. NameError: name 'BinaryDecoder' is not defined
qlckcl4x

qlckcl4x1#

关于你的错误,我没有看到该类的导入,所以这就解释了。。。
基于 seek(5) ,那么您似乎正在使用架构注册表编码的数据。。。这意味着您需要使用正确的库。

安装 pip install confluent-kafka[avro] ####消费

https://github.com/confluentinc/confluent-kafka-python#usage

相关问题