无法读取来自kafka主题的最新消息

b5lpy0ml  于 2021-06-07  发布在  Kafka
关注(0)|答案(2)|浏览(363)

我用的是spotify docker版的Kafka。我现在只有一个生产者,一个经纪人和一个消费者。
问题是,使用这种配置,我无法从Kafka获得最新消息。如果我默认 auto_offset_reset='earliest' 然后我收到了所有的信息。
这是我给消费者的代码,即使Kafka上有新的信息,它也不会打印任何信息。

def consumer():
consumer = KafkaConsumer('stories',
                         enable_auto_commit=False,
                         value_deserializer=lambda m: json.loads(m.decode('ascii')),
                         bootstrap_servers=['127.0.0.1:9092'],
                         auto_offset_reset='latest',
                         consumer_timeout_ms=1000
                         )

for message in consumer:
    # message value and key are raw bytes -- decode if necessary!
    # e.g., for unicode: `message.value.decode('utf-8')`
    print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                         message.offset, message.key,
                                         message.value))

有人能帮我一下,告诉我哪里出了问题吗?
我只是想从Kafka中得到消费者没有读到的信息。
以下是Kafka消费者:https://hastebin.com/umuhugacez.py
Kafka制作人:https://hastebin.com/laderolawi.py

a8jjtwal

a8jjtwal1#

我也面临这个问题。无论在consumer中设置了什么,当我尝试进行视频流时,它总是用来缓冲数据。我的情况是解决了寻找抵消结束后,定义消费者。你可以尝试添加

consumer.poll() 
consumer.seek_to_end()
8zzbczxx

8zzbczxx2#

只是一个更新:从kafka 0.9开始,kafka正在使用一个新的java版本的consumer,auto.offset.reset参数名已经更改;根据手册:
自动\u偏移\u重置在您的代码中是错误的,应该是最早的。
最早:自动将偏移量重置为最早偏移量
最新:自动将偏移重置为最新偏移
无:如果没有为使用者的组找到以前的偏移量,则向使用者抛出异常
其他:向消费者抛出异常。

相关问题