使用python从kafka中最后消耗的偏移量读取数据

pjngdqdw  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(320)

我正在使用合流python-kafka库来使用来自kafka的消息。有没有一种方法可以开始使用上一次使用的消息?假设我在一个主题中有10条消息:*我在一段时间后消费了这10条消息,然后我又在该主题中写了几条消息。我想使用这些信息,但第一手跳过前10条信息,以此类推。有可能吗?
我试过将“auto.offset.reset”设置为“最大”,但这会使我从那一点开始消耗。它不会使用“未读”消息。
也就是说,如果我在一个主题中写10条消息,然后尝试在激活该设置的情况下使用它们,它将只侦听传入的(新的)消息。
这是用于轮询消息的函数:

In [50]: def get_messages(topics=None):
...:     try:                                                                                                                                                                                                                                                       
...:         c = Consumer({'bootstrap.servers': 'localhost:9092', 'group.id': 'xyz_group', 'default.topic.config': {'enable.auto.commit': 'true'}})
...:         if topics is not None:
...:             c.subscribe(topics)
...:     except KafkaException as e:
...:         print('We got an exception: {}'.format(e))
...:     else:
...:         running = True
...:         while running:
...:             msg = c.poll()
...:             if not msg.error():
...:                 msg_payload = msg.value().decode('utf-8')
...:                 print('Received: {}'.format(msg_payload))
...:                 msg_data = json.loads(msg_payload)
...:                 for k, v in msg_data.iteritems():
...:                     if k == 'signal' and v == 'stop':
...:                         running = False 
...:                         #raise SystemExit('we got stop signal !')
...:                     else:                         
...:                         print('continue listening ...')
...:             elif msg.error().code() != KafkaError._PARTITION_EOF:
...:                 print(msg.error())     
...:                 running = False                                  
...:     finally:             
...:         c.close()
mv1qrgav

mv1qrgav1#

您描述的是默认行为,即启用自动提交( enable.auto.commit=true ). 最后消耗的消息的偏移量将以频繁的间隔提交( auto.commit.interval.ms ).
正确关闭耗电元件时(通过呼叫 close() )它将提交它的最终补偿。在重新启动客户端时,它将从它停止的位置重新开始,并使用比上次使用的消息更新的消息。

相关问题