我在笔记本电脑(ubuntu17)上安装了confluent python kafka consumer,一切正常,我可以收听远程主题和接收消息。
但是当我尝试在服务器(ubuntu16)上设置它时,似乎有一个压缩问题。数据来自divolte,用lz4压缩。
当第一次连接到主题时,成功地接收到数据,没有任何错误,但在关闭并重新打开使用者后,会收到第一条消息并引发错误:
<cimpl.Message object at 0x7f089db67180>
KafkaError{code=_NOT_IMPLEMENTED,val=-170,str="Unsupported compression codec 0x3"}
我认为它不是来自divolte数据源,更像是来自kafka的消息,但我无法读取它的值,因为错误之前发生过(msg被打印出来,然后我们跳转到elif来处理错误):
c = Consumer({'bootstrap.servers': server['server'], 'group.id': 'mygroup',
'default.topic.config': {'auto.offset.reset': 'smallest'}})
c.subscribe([server['topic']])
running = True
while running:
msg = c.poll()
print(msg)
if not msg.error():
msg_value = msg.value()
print(msg_value)
elif msg.error().code() != KafkaError._PARTITION_EOF:
print(msg.error())
running = False
2条答案
按热度按时间zaq34kh61#
确实,这看起来像是一个版本问题,但0.10显然是没有必要的,我跟随http://docs.confluent.io/current/installation.html 重新安装存储库和汇合平台,然后将librdkafka1和librdkafka dev从0.9.1更新到0.9.5,解决了我的问题!
xzlaal3s2#
听起来这个问题应该在0.10.0或更高版本中修复
https://issues.apache.org/jira/browse/kafka-3160