我正在使用segmentio/kafka go客户端来读取某个主题的消息。我找不到。。如何开始阅读上一条/新消息。每次启动代码时,它都从该分区的起始偏移量开始读取。
wztqucjr1#
关于使用来自kafka的消息,您需要知道的是,每个客户机都是一个客户组的一部分。kafka存储已处理的主题的每个使用者组的偏移量,以便在使用者客户端停止并重新启动时知道在何处继续。这样就可以避免多次读取相同的消息。这些信息存储在一个内部Kafka主题中,称为消费偏移量。在您的示例中,这意味着您必须确保指定使用者组(在kafkaconsumerapi中,它是配置“group.id”),并保持其不变。只有这样,您才能继续阅读最新/新邮件,而不是每次都从头开始。当然,你必须从一开始就了解这个主题。
1条答案
按热度按时间wztqucjr1#
关于使用来自kafka的消息,您需要知道的是,每个客户机都是一个客户组的一部分。kafka存储已处理的主题的每个使用者组的偏移量,以便在使用者客户端停止并重新启动时知道在何处继续。这样就可以避免多次读取相同的消息。这些信息存储在一个内部Kafka主题中,称为消费偏移量。
在您的示例中,这意味着您必须确保指定使用者组(在kafkaconsumerapi中,它是配置“group.id”),并保持其不变。只有这样,您才能继续阅读最新/新邮件,而不是每次都从头开始。当然,你必须从一开始就了解这个主题。