我有servicea,它生成domainchangeevents并将它们提交到kafka中的topic中,然后serviceb使用kafka topic中的这些事件并将更改应用到存储在内存中的读取模型。一些domainchangeevent是重置事件,这些事件将域重置为起始点。重新启动serviceb时,我想从上次重置读取changeevents,然后重新构建域。
serviceb作为复制服务在docker中午餐。
因为我想要serviceb的每个副本中的所有changeevents,所以我不能给它们相同的group.id,否则消息将被负载平衡,并且我不会得到所有副本中的所有事件。如何将serviceb配置为在重新启动后从最新的重置事件继续?
我尝试在serviceb上设置random group.id,并在使用reset消息后提交它,但在重新启动后,我有不同的group.id,因此所有消息都从一开始就被再次使用。
考虑给docker副本提供不同的配置,但据我所知,docker服务在所有副本中都配置为相同,这不是一个选项。
1条答案
按热度按时间yacmzcpb1#
一个可能的解决方案是,通过手动将偏移量提交到数据库,存储您希望不同使用者从中开始的点。
一张table看起来像:
这些将是你的“最新重置”点,或立场,你的消费者想要开始。问题在于
subscribe()
方法,正如您所说的,是它依赖于group.id参数,并执行消费者再平衡和协调游戏。为了从固定点(或不同分区中的一组点)进行消费,应该调用
assign()
相反。使用此方法,您可以手动为使用者指定分区列表。没有group.id,没有动态分区分配,也没有偏移量加载,这正是您需要的。分配分区后,应该调用
seek()
. 使用seek,您可以告诉使用者要从哪个偏移量开始读取在上指定的分区assign()
方法。例如,要开始阅读任何主题的“最新重置”,您应该执行以下操作:
调用此方法将使使用者准确地定位在每个分区中所需的位置。我不确定我是否回答了你的问题,但希望能有所帮助!