现在我的功能是将几百条消息写入kafka队列。但是,当所有这些消息都被使用后,我还需要执行额外的功能。有没有办法将侦听器放在kafka队列中,以便在队列被清空时得到通知?
lb3vh1jj1#
你可以用两种方法来解决这个问题,我认为:kafka的fetch响应包含 HighwaterMarkOffset ,实际上是分区中最后一条消息的偏移量。你可以检查你的信息是否有偏移量,如果有-你已经到了结尾。但是,如果生产者和消费者同时工作,这将不起作用-消费者可以更快地消费消息,从而比您需要的更早停止。发送“毒丸”信息-假设你需要生成100条信息。然后您的生产者发送这100条消息+1条特殊消息(例如某些uuid,但确保它在您的逻辑中的正常情况下永远不会出现),这意味着“结束”。在用户端,您将检查收到的消息是否是毒丸,如果是,则关闭。
HighwaterMarkOffset
1条答案
按热度按时间lb3vh1jj1#
你可以用两种方法来解决这个问题,我认为:
kafka的fetch响应包含
HighwaterMarkOffset
,实际上是分区中最后一条消息的偏移量。你可以检查你的信息是否有偏移量,如果有-你已经到了结尾。但是,如果生产者和消费者同时工作,这将不起作用-消费者可以更快地消费消息,从而比您需要的更早停止。发送“毒丸”信息-假设你需要生成100条信息。然后您的生产者发送这100条消息+1条特殊消息(例如某些uuid,但确保它在您的逻辑中的正常情况下永远不会出现),这意味着“结束”。在用户端,您将检查收到的消息是否是毒丸,如果是,则关闭。