使用avroconsumer获取分区和主题中的消息

mnemlml8  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(334)

我希望能够阅读一个主题的特定分区中的消息,以及另一个主题中的消息,就像我使用简单的 Consumer .

self.consumer = AvroConsumer(conf)

parts = [TopicPartition('p_topic', 13),
         TopicPartition('p_topic', 14)

self.consumer.assign(parts)
self.consumer.subscribe(['test_topic'])

有些“客户机”在“p\u topic”分区中生成消息,有些(我创建的)在“test\u topic”分区中生成如下消息:

self.p.produce('test_topic', msg)

我无法将这两个与上面显示的代码集成。我在“test\u topic”中生成的消息抛出:

File "/usr/local/lib/python2.7/dist-packages/confluent_kafka/avro/__init__.py", line 115, in poll
    decoded_value = self._serializer.decode_message(message.value())
  File "/usr/local/lib/python2.7/dist-packages/confluent_kafka/avro/serializer/message_serializer.py", line 214, in decode_message
    raise SerializerError("message does not start with magic byte")
SerializerError

如何使用 AvroConsumer ?

9njqaruj

9njqaruj1#

根据关于“魔法字节”的错误,在这个主题中产生的任何东西都没有完成 AvroProducer .

相关问题