我用的是spotify docker版的Kafka。我现在只有一个生产者,一个经纪人和一个消费者。
问题是,使用这种配置,我无法从Kafka获得最新消息。如果我默认 auto_offset_reset='earliest'
然后我收到了所有的信息。
这是我给消费者的代码,即使Kafka上有新的信息,它也不会打印任何信息。
def consumer():
consumer = KafkaConsumer('stories',
enable_auto_commit=False,
value_deserializer=lambda m: json.loads(m.decode('ascii')),
bootstrap_servers=['127.0.0.1:9092'],
auto_offset_reset='latest',
consumer_timeout_ms=1000
)
for message in consumer:
# message value and key are raw bytes -- decode if necessary!
# e.g., for unicode: `message.value.decode('utf-8')`
print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
message.offset, message.key,
message.value))
有人能帮我一下,告诉我哪里出了问题吗?
我只是想从Kafka中得到消费者没有读到的信息。
以下是Kafka消费者:https://hastebin.com/umuhugacez.py
Kafka制作人:https://hastebin.com/laderolawi.py
2条答案
按热度按时间a8jjtwal1#
我也面临这个问题。无论在consumer中设置了什么,当我尝试进行视频流时,它总是用来缓冲数据。我的情况是解决了寻找抵消结束后,定义消费者。你可以尝试添加
8zzbczxx2#
只是一个更新:从kafka 0.9开始,kafka正在使用一个新的java版本的consumer,auto.offset.reset参数名已经更改;根据手册:
自动\u偏移\u重置在您的代码中是错误的,应该是最早的。
最早:自动将偏移量重置为最早偏移量
最新:自动将偏移重置为最新偏移
无:如果没有为使用者的组找到以前的偏移量,则向使用者抛出异常
其他:向消费者抛出异常。