我正在开发一个使用kafka streams api的服务。我想知道是否有办法确定我在消费记录方面的服务落后了多少。我想能够查询消费者滞后。
以下是一些我想要达到的目标的背景。我的服务使用streams api,它监听一个输入主题,执行一些涉及状态的处理,并输出一个输出主题的记录。
我想处理这样一个场景:我的服务崩溃,几个小时后又重新上线。在这段时间内,将有大量关于输入主题的积压记录。
一旦它重新联机,服务将开始消耗输入主题中的所有累积记录,并输出输出主题中的大量记录。
我希望能够检测到这样一个事实,即我的服务有一个巨大的消费者滞后,如果是这样的话,就暂停它的输出。也就是说,我希望我的服务消耗所有累积的输入记录,直到它赶上接近实时的速度,然后才应该开始输出消息。
到目前为止我找到的最好的方法就是 ConsumerInterceptor
. ConsumerInterceptor.onConsume()
方法将在每次读取记录时调用:
ConsumerRecords<K,V> onConsume(ConsumerRecords<K,V> records)
从 ConsumerRecords
,然后我可以得到记录的时间戳。如果时间戳远远落后于当前时间,那么我将暂停消息的输出。
与其基于记录中的时间戳,不如通过某种方式查询消费者延迟。
也许我不能质疑消费者滞后,因为这违背了Kafka的设计原则。如果有人有任何建议或我应该如何处理我的问题,那么请让我知道。
顺便说一下,我的服务没有使用更高级别的kafka streams dsl api,而是使用较低级别的处理器api。
谢谢你抽出时间。
暂无答案!
目前还没有任何答案,快来回答吧!