为来自不同组的kafka消费者配置相同的偏移量

ryevplcw  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(439)

我有servicea,它生成domainchangeevents并将它们提交到kafka中的topic中,然后serviceb使用kafka topic中的这些事件并将更改应用到存储在内存中的读取模型。一些domainchangeevent是重置事件,这些事件将域重置为起始点。重新启动serviceb时,我想从上次重置读取changeevents,然后重新构建域。
serviceb作为复制服务在docker中午餐。
因为我想要serviceb的每个副本中的所有changeevents,所以我不能给它们相同的group.id,否则消息将被负载平衡,并且我不会得到所有副本中的所有事件。如何将serviceb配置为在重新启动后从最新的重置事件继续?
我尝试在serviceb上设置random group.id,并在使用reset消息后提交它,但在重新启动后,我有不同的group.id,因此所有消息都从一开始就被再次使用。
考虑给docker副本提供不同的配置,但据我所知,docker服务在所有副本中都配置为相同,这不是一个选项。

yacmzcpb

yacmzcpb1#

一个可能的解决方案是,通过手动将偏移量提交到数据库,存储您希望不同使用者从中开始的点。
一张table看起来像:

Topic  Partition  Offset

topicA 0          112
topicA 1          125
topicB 0          2313
topicB 1          2984
topicB 2          2554

这些将是你的“最新重置”点,或立场,你的消费者想要开始。问题在于 subscribe() 方法,正如您所说的,是它依赖于group.id参数,并执行消费者再平衡和协调游戏。
为了从固定点(或不同分区中的一组点)进行消费,应该调用 assign() 相反。使用此方法,您可以手动为使用者指定分区列表。没有group.id,没有动态分区分配,也没有偏移量加载,这正是您需要的。
分配分区后,应该调用 seek() . 使用seek,您可以告诉使用者要从哪个偏移量开始读取在上指定的分区 assign() 方法。
例如,要开始阅读任何主题的“最新重置”,您应该执行以下操作:

//seeking the last offset of topicA's partition0
public void setStartPosition(TopicPartition partition, long offset) 
{
     consumer.assign(Collections.singletonList(partition)); //f.e-> partition0
     consumer.seek(partition, offset);                      //f.e -> 112
}

调用此方法将使使用者准确地定位在每个分区中所需的位置。我不确定我是否回答了你的问题,但希望能有所帮助!

相关问题