在数据库设计过程中,我们经常会有这样的情况下
前面我们说到一个问题,那就是一个group内部,1个parition只能被1个consumer消费
,其实看到这里我们就知道应该有这样一个组件来负责partition的分配,而且前面学习消费者组机制的时候还提到过分区的三种分配策略。
对于每一个consumer group,Kafka集群为其从broker集群中选择一个broker作为其coordinator。因此,第1步就是找到这个coordinator。也就是说1个consumer group对应一个coordinattor
下面我们有一个group有3个consumer: c0, c1, c2
其实这个过程是发送了一个GroupCoordinatorRequest
定位请求去寻找coordinator
首先,Kafka 会计算该 Group 的 group.id 参数的哈希值。比如你有个 Group 的 group.id 设置成了“test-group”,那么它的 hashCode 值就应该是 627841412。其次,Kafka 会计算 __consumer_offsets 的分区数,通常是 50 个分区,之后将刚才那个哈希值对分区数进行取模加求绝对值计算,即 abs(627841412 % 50) = 12。
此时,我们就知道了位移主题的分区 12 负责保存这个 Group 的数据。有了分区号,算法的第 2 步就变得很简单了,我们只需要找出位移主题分区 12 的 Leader 副本在哪个 Broker 上就可以了。
这个 Broker,就是我们要找的 Coordinator
所有consumer都往coordinator发送JoinGroup消息之后,coordinator会指定其中一个consumer作为leader,并把组成员信息以及订阅信息发给leader
其他consumer作为follower,然后由这个leader进行partition分配
leader开始分配消费方案,即哪个consumer负责消费哪些topic的哪些partition
一旦完成分配,leader会将这个方案封装进SyncGroup请求中发给Coordinator,Coordinator给它返回null
follower发送 null的 SyncGroupRequest 给Coordinator,Coordinator回给它partition分配的结果。
下面这张图就是整个kafka 的源码结构,前面我们已经分析过clients
clients 目录:保存 Kafka 客户端代码,比如生产者和消费者的代码都在该目录下。
config 目录:保存 Kafka 的配置文件,其中比较重要的配置文件是 server.properties。
connect 目录:保存 Connect 组件的源代码。我在开篇词里提到过,Kafka Connect 组件是用来实现 Kafka 与外部系统之间的实时数据传输的。
core 目录:保存 Broker 端代码。Kafka 服务器端代码全部保存在该目录下。
streams 目录:保存 Streams 组件的源代码。Kafka Streams 是实现 Kafka 实时流处理的组件。
下面我们看一下core目录下的代码,这里我们几乎看到kafka 服务端的几乎全部代码,都在这里了,我们比较熟悉的的有controller
,我们主要看一下今天的主角coordinator
整个coordinator
下面按照功能分为了两大块,第一块就是group 也就是为我们的消费者组服务的,第二块就是transaction,主要是为分布式事务服务的。
我们看到这个group包下面总共也没几个类。
下面我们看一下整个请求的流程,我们前面学习幂等性生产
的时候说过了,客户端发起的请求,在服务端进行处理的入口是在KafkaApis
这个类里面的,客户端发起不同的请求,KafkaApis
里面就有不同的方法来处理对应的请求。
我们今天要了解的就是上面这个几个方法。
代码这里有一点需要注意的是,这里判断了这个请求是处理事务的还是消费者组的
后面就是和我们前面介绍的就一样了(groupCoordinator.partitionFor(findCoordinatorRequest.data.key), GROUP_METADATA_TOPIC_NAME
方法里面计算出了partition信息,其实就是那个partition,这个方法的实现的话,大致如下
Utils.abs(groupId.hashCode) % groupMetadataTopicPartitionCount
找到了partition之后就开始找这个partition的lead 分区了,也就是我们下面图上的第二段代码。这样我们的Coordinator节点就找到了
下面这个代码其实就比较简单了,其实就是一个consumer 注册的过程,调用了groupCoordinator.handleJoinGroup() 的方法,将我们的consumer加入到我们的消费组里面去了
同时将确定了consumer group 的leader consumer,然后返回给了客户端,也就是我们的sendResponseCallback 方法。
下面就是comsumer 同步分区的分配信息,最后返回给客户端的assignmentMap,就是分配的结果。
这个就是处理consumer 客户端提交offset 的方法了,这个代码有点长,我这里就帖几处比较重要的地方了
这里就是获取我们提交的offset 信息了
根据我们的请求,来判断offset 存在那里,老版本是存储在ZK里
最后是调用GroupCoordinator的handleCommitOffsets方法进行offset 的提交
版权说明 : 本文为转载文章, 版权归原作者所有 版权申明
原文链接 : https://blog.csdn.net/king14bhhb/article/details/114899707
内容来源于网络,如有侵权,请联系作者删除!