我使用kafka节点客户端从kafka发布和订阅。我要取特定偏移量的特定消息。我不知道怎么可能。如果有人有办法,请帮忙。谢谢
b5lpy0ml1#
kafka是一个用于数据流处理和缓冲的消息代理,它不是一个数据库。所以随机存取数据是不可能的。有限读取策略是从特定分区读取数据。最多,您可以根据您的阅读需求设计生产者来分发数据。例如,在接收来自传感器的数据时,某人可以创建具有24个分区的主题,并根据采样时间戳发布传感器数据。现在您在特定分区中拥有每小时的数据。然而,这种策略并不适合Kafka的哲学!在使用kafka的数据时,可以根据分区的数量达到最大的数据并行性,每个分区有一个使用者。但是,当您根据采样时间戳将每个传入消息发布到分区时,一次只有一个分区在缓冲数据,而您的应用程序可以使用一个分区的数据!
j7dteeu82#
当然,一般来说这是可能的,因为低级别kafka协议的fetch请求允许指定起始偏移量。看看我不太了解的kafka节点库,我发现 addTopics 函数可以传递主题信息以及要从中开始读取的偏移量,并且需要设置 fromOffset 参数到 true 也。这就是 setOffset 方法以及这样做。遵循参考:https://github.com/sohu-co/kafka-node#consumer
addTopics
fromOffset
true
setOffset
2条答案
按热度按时间b5lpy0ml1#
kafka是一个用于数据流处理和缓冲的消息代理,它不是一个数据库。所以随机存取数据是不可能的。有限读取策略是从特定分区读取数据。最多,您可以根据您的阅读需求设计生产者来分发数据。例如,在接收来自传感器的数据时,某人可以创建具有24个分区的主题,并根据采样时间戳发布传感器数据。现在您在特定分区中拥有每小时的数据。
然而,这种策略并不适合Kafka的哲学!在使用kafka的数据时,可以根据分区的数量达到最大的数据并行性,每个分区有一个使用者。但是,当您根据采样时间戳将每个传入消息发布到分区时,一次只有一个分区在缓冲数据,而您的应用程序可以使用一个分区的数据!
j7dteeu82#
当然,一般来说这是可能的,因为低级别kafka协议的fetch请求允许指定起始偏移量。
看看我不太了解的kafka节点库,我发现
addTopics
函数可以传递主题信息以及要从中开始读取的偏移量,并且需要设置fromOffset
参数到true
也。这就是setOffset
方法以及这样做。遵循参考:https://github.com/sohu-co/kafka-node#consumer