我正在使用合流python-kafka库来使用来自kafka的消息。有没有一种方法可以开始使用上一次使用的消息?假设我在一个主题中有10条消息:*我在一段时间后消费了这10条消息,然后我又在该主题中写了几条消息。我想使用这些信息,但第一手跳过前10条信息,以此类推。有可能吗?
我试过将“auto.offset.reset”设置为“最大”,但这会使我从那一点开始消耗。它不会使用“未读”消息。
也就是说,如果我在一个主题中写10条消息,然后尝试在激活该设置的情况下使用它们,它将只侦听传入的(新的)消息。
这是用于轮询消息的函数:
In [50]: def get_messages(topics=None):
...: try:
...: c = Consumer({'bootstrap.servers': 'localhost:9092', 'group.id': 'xyz_group', 'default.topic.config': {'enable.auto.commit': 'true'}})
...: if topics is not None:
...: c.subscribe(topics)
...: except KafkaException as e:
...: print('We got an exception: {}'.format(e))
...: else:
...: running = True
...: while running:
...: msg = c.poll()
...: if not msg.error():
...: msg_payload = msg.value().decode('utf-8')
...: print('Received: {}'.format(msg_payload))
...: msg_data = json.loads(msg_payload)
...: for k, v in msg_data.iteritems():
...: if k == 'signal' and v == 'stop':
...: running = False
...: #raise SystemExit('we got stop signal !')
...: else:
...: print('continue listening ...')
...: elif msg.error().code() != KafkaError._PARTITION_EOF:
...: print(msg.error())
...: running = False
...: finally:
...: c.close()
1条答案
按热度按时间mv1qrgav1#
您描述的是默认行为,即启用自动提交(
enable.auto.commit=true
). 最后消耗的消息的偏移量将以频繁的间隔提交(auto.commit.interval.ms
).正确关闭耗电元件时(通过呼叫
close()
)它将提交它的最终补偿。在重新启动客户端时,它将从它停止的位置重新开始,并使用比上次使用的消息更新的消息。