获得最新偏移量的最简单方法

hts6caw3  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(406)

我正在构建一个应用程序,允许动态添加和删除对Kafka主题的订阅。添加主题订阅后,我希望每小时运行一次批处理作业,以获取所有新消息并将它们推送到另一个数据存储中。
我想了解的是如何获得主题的当前偏移量。添加订阅后,我希望下一个批处理作业获得自订阅的大致时间以来的所有消息。
举个例子,假设我有一个名为“topica”的主题,它不断地接收消息。如果我在晚上7点15分添加订阅,当批处理作业在晚上8点运行时,我希望从晚上7点15分开始的所有消息都被批处理。我很高兴现在的时间大约是7点10分,7点20分等等。双方各有5到10分钟的时间都不会让我担心。
因此,我的解决方案是在添加订阅时获取主题的当前偏移量。我已经研究了简单的使用者,但是我不想涉及这个基本用例的所有集群管理方面。
我还研究了高级消费者。我可以这样做:

consumer.createMessageStreamsByFilter(new Whitelist(topicName)).head.head.offset

这种方法让我担心的是,对“head”的调用实际上是一个流。所以我相信它会阻止等待下一条消息。阻塞是有问题的,因为它可能会导致其他订阅排队,直到下一条消息到达。
我很乐意花一些时间实现后一种方法,但是如果有一种更简单的方法不需要我编写容易出错的并发代码,那么我宁愿不要浪费时间。
我还需要一个方法来获得所有日志,因为偏移量。

6uxekuva

6uxekuva1#

对fetch请求的每个响应都返回一个“highwatermark”,它表示当前正在使用的分区的日志中的最新偏移量。因此,理论上,您可以获取给定主题的最早消息或任何消息(假设存在),并从响应中提取highwatermark。这里有关于高水位线的更多细节:https://cwiki.apache.org/confluence/display/kafka/a+guide+to+the+kafka+protocol#aguidetothekafkaprotocol-获取响应
当然,能否从响应中提取highwatermarkoffset取决于您的客户机是否能够通过自己的kafkaapi提供这些数据。

相关问题