我正在使用Kafka和Debezium从数据库中捕获行相关事件,它按预期工作
在Python中,我设置了一个消费者来处理来自主题的JSON消息
# Kafka consumer configuration
config = {
'bootstrap.servers': 'my-ip:9092',
'group.id': 'my-group',
'auto.offset.reset': 'latest'
}
# Create the Consumer instance
consumer = Consumer(config)
# Subscribe to the topic
topic = 'my-ip.Database.Table'
consumer.subscribe([topic])
字符串
它工作正常,问题是我需要这个消费者只关心最新的消息
我可以根据时间戳处理每条消息并丢弃内容,但我宁愿从正确的偏移量开始消费,以开始
这是我用来处理消息的循环
try:
while True:
msg = consumer.poll(timeout=1.0)
if msg is None:
print("No new messages.")
continue
if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
print("End of partition reached.")
continue
else:
print(f"Error: {msg.error()}")
break
message = json.loads(msg.value().decode("utf-8"))
payload = message.get("payload", {})
operation = payload.get("op") # c: create/insert, u: update, d: delete
match operation:
case "c":
print("Processing insert operation.")
process_message(payload["after"], "Insert")
case "u":
print("Processing update operation.")
process_message(payload["after"], "Update")
case "d":
print("Delete operation detected, not processing.")
case _:
print(f"Unhandled operation type: {operation}")
except KeyboardInterrupt:
print("Exiting consumer...")
finally:
consumer.close()
print("Consumer closed.")
型
我如何将偏移量从我的消费者代码移到末尾?谢谢!
1条答案
按热度按时间brgchamk1#
当你创建一个消费者的时候,你已经设置了
'auto.offset.reset': 'latest'
,它默认为latest。如果您想在处理时或在轮询之前跳到主题的末尾(而不是使用提交的偏移量),请使用使用者的seek函数