我如何开始实时而不是最新偏移量地使用来自Kafka的消息

hrysbysz  于 11个月前  发布在  Apache
关注(0)|答案(1)|浏览(113)

我正在使用Kafka和Debezium从数据库中捕获行相关事件,它按预期工作
在Python中,我设置了一个消费者来处理来自主题的JSON消息

# Kafka consumer configuration
config = {
    'bootstrap.servers': 'my-ip:9092',
    'group.id': 'my-group',
    'auto.offset.reset': 'latest'  
}

# Create the Consumer instance
consumer = Consumer(config)

# Subscribe to the topic
topic = 'my-ip.Database.Table'
consumer.subscribe([topic])

字符串
它工作正常,问题是我需要这个消费者只关心最新的消息
我可以根据时间戳处理每条消息并丢弃内容,但我宁愿从正确的偏移量开始消费,以开始
这是我用来处理消息的循环

try:
    while True:
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            print("No new messages.")
            continue
        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                print("End of partition reached.")
                continue
            else:
                print(f"Error: {msg.error()}")
                break

        message = json.loads(msg.value().decode("utf-8"))
        payload = message.get("payload", {})
        operation = payload.get("op")  # c: create/insert, u: update, d: delete

        match operation:
            case "c":
                print("Processing insert operation.")
                process_message(payload["after"], "Insert")
            case "u":
                print("Processing update operation.")
                process_message(payload["after"], "Update")
            case "d":
                print("Delete operation detected, not processing.")
            case _:
                print(f"Unhandled operation type: {operation}")

except KeyboardInterrupt:
    print("Exiting consumer...")
finally:
    consumer.close()
    print("Consumer closed.")


我如何将偏移量从我的消费者代码移到末尾?谢谢!

brgchamk

brgchamk1#

当你创建一个消费者的时候,你已经设置了'auto.offset.reset': 'latest',它默认为latest。
如果您想在处理时或在轮询之前跳到主题的末尾(而不是使用提交的偏移量),请使用使用者的seek函数

相关问题