面试官:能给我画个Zookeeper选举的图吗?

x33g5p2x  于2021-12-27 转载在 Zookeeper  
字(10.5k)|赞(0)|评价(0)|浏览(593)

一、前言

Zookeeper是一个分布式协调框架,提供分布式锁、配置项管理、服务注册与集群管理等功能。

为了保证Zookeeper的高可用,一般都会以集群的模式部署。

这个时候需要考虑各个节点的数据一致性,那么集群在启动时,需要先选举出一位Leader,再由Leader完成向其他节点的数据同步工作。

本文将是Zookeeper系列的第一篇文章,从源码角度讲述Zookeeper的选举算法。

二、准备工作

博主是在windows安装了docker desktop,使用docker-compose启动zk集群的。docker-compose.yml内容如下:

  1. version: '2.2'
  2. services:
  3. zoo1:
  4. image: zookeeper:3.4.14
  5. restart: always
  6. hostname: zoo1
  7. ports:
  8. - 2181:2181
  9. environment:
  10. ZOO_MY_ID: 1
  11. ZOO_SERVERS: server.1=zoo1:2888:3888 server.2=zoo2:2888:3888 server.3=zoo3:2888:3888
  12. zoo2:
  13. image: zookeeper:3.4.14
  14. restart: always
  15. hostname: zoo2
  16. ports:
  17. - 2182:2181
  18. environment:
  19. ZOO_MY_ID: 2
  20. ZOO_SERVERS: server.1=zoo1:2888:3888 server.2=zoo2:2888:3888 server.3=zoo3:2888:3888
  21. zoo3:
  22. image: zookeeper:3.4.14
  23. restart: always
  24. hostname: zoo3
  25. ports:
  26. - 2183:2181
  27. environment:
  28. ZOO_MY_ID: 3
  29. ZOO_SERVERS: server.1=zoo1:2888:3888 server.2=zoo2:2888:3888 server.3=zoo3:2888:3888

其中2181是用于客户端连接的端口,这里分别映射到了主机的三个端口上

ZOO_MY_ID代表节点id,需要手动指定

ZOO_SERVERS代表集群内的节点,格式为server.{节点id}={ip}:{数据同步端口}:{集群选举端口}

到该文件所处的目录下,执行 docker-compose up -d

这样我们的zk集群就启动好了

PS:如果下载镜像太慢,可以到Docker Engine的tab页中新增一些镜像源:

内容也贴一下:

  1. "registry-mirrors": [
  2. "https://registry.docker-cn.com",
  3. "http://hub-mirror.c.163.com",
  4. "https://docker.mirrors.ustc.edu.cn"
  5. ]

三、基本概念

节点的角色

  • Leader,领导者,又称主节点。负责处理客户端的写请求,并将数据同步到各个子节点
  • Follower,跟随者,又称子节点。用于处理客户端的读请求,拥有投票权。
  • Observer,观察者。也可以用于处理客户端的读请求,但没有投票权,也不会参与选举与晋升。

如何查看节点的角色

使用 docker exec -it zk_zoo3_1 /bin/bash  进入该容器中

接着执行  ./bin/zkServer.sh status  查看当前节点的状态

可以看到,zoo3为leader角色。

可以推测出,zoo3容器肯定是第2个启动完成的。那这个推测是怎么来的?稍后进入源码中一探究竟。

节点的状态

每个节点,都会有一个状态,状态被定义在QuorumPeer#ServerState枚举类中

  1. public enum ServerState {
  2. LOOKING,
  3. FOLLOWING,
  4. LEADING,
  5. OBSERVING
  6. }

如果一个节点处于LOOKING的状态,会去检查集群中存不存在Leader。如果不存在,则进行选举,此时ZK集群无法对外提供服务。

另外的三种状态,就和节点角色相对应。

myid

前文已经说过,是节点id,手动指定,需要全局唯一。

zxid

全称为Zookeeper Transaction Id,即zk事务id。写请求到达Leader时,Leader会为该请求分配一个全局递增的事务id。

使用  docker exec 容器名 /bin/bash  进入该容器,再使用  echo stat | nc localhost 2181  查看节点的状态,

其中两个Follower的状态为:

Leader的状态为:

可以看到zxid字段

zxid是一个64位的标识,前32位表示epoch(年代,纪元的意思),后32位主键递增计数。

每一个Leader就像皇帝一样,有自己的年号,这一点和Raft协议中的term任期一致(PS:对Raft协议感兴趣的同学,可以参考我的另外一篇博客 22张图,带你入门分布式一致性算法Raft)

如果当前Leader宕机后,下一任Leader的zxid中的epoch就会+1,然后低32位变为0。

查看当前epoch,可以使用  cat /data/version-2/currentEpoch

四、源码分析

QuorumPeerMain

是zk的启动类,main方法如下:

  1. public static void main(String[] args) {
  2. QuorumPeerMain main = new QuorumPeerMain();
  3. //初始化
  4. main.initializeAndRun(args);
  5. }
  6. protected void initializeAndRun(String[] args) throws ConfigException, IOException {
  7. QuorumPeerConfig config = new QuorumPeerConfig();
  8. if (args.length == 1) {
  9. //args[0]为/conf/zoo.cfg
  10. config.parse(args[0]);
  11. }
  12. //以集群模式启动,毕竟当前servers的长度为3
  13. if (args.length == 1 && config.servers.size() > 0) {
  14. runFromConfig(config);
  15. } else {
  16. //以单机模式启动
  17. ZooKeeperServerMain.main(args);
  18. }
  19. }

initializeAndRun主要是根据读取到的配置,决定是以集群还单机模式启动。

runFromConfig

  1. public void runFromConfig(QuorumPeerConfig config) throws IOException {
  2. //QuorumPeer本身是一个Thread对象
  3. quorumPeer = getQuorumPeer();
  4. //设置选举方式、myid等一系列参数,没有就使用默认值
  5. quorumPeer.setMyid(config.getServerId());
  6. //...
  7. quorumPeer.initialize();
  8. quorumPeer.start();
  9. //等待quorumPeer执行完成
  10. quorumPeer.join();
  11. }

这里启动了quorumPeer线程,quorumPeer可以理解为集群中的节点,其重写的start方法会完成当前节点的初始化工作,并且主线程需要等待quorumPeer执行完成。

直接进入run方法中

  1. public synchronized void start() {
  2. //从磁盘加载数据到内存数据库中,例如获取zxid、epoch
  3. loadDataBase();
  4. //准备接受客户端请求
  5. cnxnFactory.start();
  6. //准备进行Leader选举的环境
  7. startLeaderElection();
  8. //这里将调用本类的run方法
  9. super.start();
  10. }

startLeaderElection

其实只是准备了进行选举的环境,选用FastLeaderElection作为Leader选举的策略。

该策略会创建一个用于维护集群各个节点之间通信的QuorumCnxManager对象,节点对外的投票,首先会放入FastLeaderElection.sendqueue中,之后由QuorumCnxManager发送到另外一个节点。如果收到其他节点的投票信息,则由QuorumCnxManager先存入FastLeaderElection.recvqueue中,再由当前节点消费。

这个时候,节点之间还没有进行相互投票。所以说,startLeaderElection只是初始化了投票环境。

QuorumPeer.run

super.start将会调用本类的run方法

  1. while (running) {
  2. switch (getPeerState()) {
  3. case LOOKING:
  4. //刚启动的节点,默认处于Looking状态
  5. try {
  6. //寻找leader,下面会细讲
  7. setCurrentVote(makeLEStrategy().lookForLeader());
  8. } catch (Exception e) {
  9. setPeerState(ServerState.LOOKING);
  10. }
  11. break;
  12. case OBSERVING:
  13. setObserver(makeObserver(logFactory));
  14. observer.observeLeader();
  15. break;
  16. case FOLLOWING:
  17. setFollower(makeFollower(logFactory));
  18. follower.followLeader();
  19. break;
  20. case LEADING:
  21. setLeader(makeLeader(logFactory));
  22. leader.lead();
  23. break;
  24. }
  25. }

run方法中是一个while循环,处于Looking状态,才会进行Leader选举。

lookForLeader

startLeaderElection选用了FastLeaderElection作为Leader选举的策略,因此这里进入FastLeaderElection的lookForLeader方法

lookForLeader方法比较复杂,分阶段去理解它。

第一阶段:节点先投票给自己

  1. //创建一个投票箱(key为myid,value为投票信息),用于汇总当前集群内的投票信息
  2. HashMap<Long, Vote> recvset = new HashMap<Long, Vote>();
  3. //保存在集群确定leader之后还收到的投票信息
  4. //即保存所有处于FOLLOWING与LEADING状态的节点发出的投票信息
  5. HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>();
  6. //等待其他节点投票的超时时间,默认为200毫秒
  7. int notTimeout = finalizeWait;
  8. synchronized (this) {
  9. //递增逻辑时钟,逻辑时钟可以理解为选举届数
  10. logicalclock.incrementAndGet();
  11. //在每次选举中,节点都会先投自己一票
  12. //当前方式只是更新提议,还未通知到其他节点
  13. //getInitId():myid getInitLastLoggedZxid():日志中最大的zxid getPeerEpoch():节点的epoch
  14. updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
  15. }
  16. //将当前提议广播出去
  17. sendNotifications();

第二阶段:不断获取其他节点的投票信息,直至找到Leader

分为两部分:

  • 获取不到投票信息,选择重发或者重连
  • 获取到投票信息,处理投票信息
  1. //如果当前节点处于LOOKING状态,则一直获取其他节点的投票信息,直到找到leader
  2. while ((self.getPeerState() == ServerState.LOOKING) && (!stop)) {
  3. //从recieve队列中取出一个投票信息
  4. //上文我们说过,其他节点的投票信息,会先由QuorumCnxManager暂存到recvqueue中
  5. Notification n = recvqueue.poll(finalizeWait, TimeUnit.MILLISECONDS);
  6. //获取不到投票信息
  7. if (n == null) {
  8. //选择重发或者重连
  9. //获取到投票信息
  10. } else if (validVoter(n.sid) && validVoter(n.leader)) {
  11. //判断进行投票的节点状态
  12. switch (n.state) {
  13. case LOOKING:
  14. //......
  15. break;
  16. case OBSERVING:
  17. //Observer是没有投票权的,因此这里不做处理
  18. break;
  19. case FOLLOWING:
  20. case LEADING:
  21. //......
  22. break;
  23. default:
  24. break;
  25. }
  26. }
  27. }

获取不到投票信息

  1. //获取不到投票信息
  2. if (n == null) {
  3. //从else逻辑就可以猜出,haveDelivered方法用于判断当前节点是否和集群中的其他节点全部失联
  4. if (manager.haveDelivered()) {
  5. //获取不到投票信息,那就再次广播一次,其他节点也许会进行回应
  6. //之前的回应可能由于网络原因丢失了,因此这里重试一下
  7. sendNotifications();
  8. } else {
  9. //与集群中的所有节点建立连接
  10. manager.connectAll();
  11. }
  12. //由于获取不到投票信息,这里将超时时间扩大为两倍
  13. int tmpTimeOut = notTimeout * 2;
  14. //最长不可以超过60秒
  15. notTimeout = (Math.min(tmpTimeOut, maxNotificationInterval));
  16. }

如果能获取到投票信息,且发送投票的节点状态为LOOKING时

  1. case LOOKING:
  2. //如果推荐leader的节点的epoch大于当前逻辑时钟
  3. if (n.electionEpoch > logicalclock.get()) {
  4. //代表当前节点可能错过了几届选举,导致自己的逻辑时钟比其他节点小
  5. //那就沿用别人的逻辑时钟
  6. logicalclock.set(n.electionEpoch);
  7. //清空投票箱
  8. recvset.clear();
  9. //判断被推荐的leader与当前节点谁更适合当leader
  10. //判断的根据,是选举算法的核心,稍后会细讲
  11. if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
  12. getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
  13. //被推荐的leader更适合,因此更新自己的提议
  14. updateProposal(n.leader, n.zxid, n.peerEpoch);
  15. } else {
  16. //看来还是自己更适合,推荐自己
  17. updateProposal(getInitId(),
  18. getInitLastLoggedZxid(),
  19. getPeerEpoch());
  20. }
  21. //广播提议信息
  22. sendNotifications();
  23. } else if (n.electionEpoch < logicalclock.get()) {
  24. //如果投票中的epoch小于当前节点的逻辑时钟,说明该票是无效的
  25. //退出switch,取出下一条投票消息
  26. break;
  27. //如果处于同一轮选举中,且投票中的推荐的leader更适合做leader
  28. } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
  29. proposedLeader, proposedZxid, proposedEpoch)) {
  30. //更新自己的提议,并广播出去
  31. updateProposal(n.leader, n.zxid, n.peerEpoch);
  32. sendNotifications();
  33. }
  34. //将发送投票消息的节点id及它的投票信息存入recvset中
  35. recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
  36. //投票箱中推荐的leader,如果和自己推荐的leader一致,且超过节点总数的一半
  37. if (termPredicate(recvset,
  38. new Vote(proposedLeader, proposedZxid,
  39. logicalclock.get(), proposedEpoch))) {
  40. //不断取出投票信息,看leader会不会进行变动
  41. while ((n = recvqueue.poll(finalizeWait,
  42. TimeUnit.MILLISECONDS)) != null) {
  43. //如果投票中推荐的leader更适合做leader
  44. if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
  45. proposedLeader, proposedZxid, proposedEpoch)) {
  46. //把该选票重新放回,说明该轮选举还没有结束
  47. recvqueue.put(n);
  48. break;
  49. }
  50. }
  51. //如果在限定时间内,没有取出任何投票信息,说明选举即将结束
  52. if (n == null) {
  53. //如果leader是自己,则设置当前状态为LEADING
  54. //如果不是,属于PARTICIPANT就设置FOLLOWING,否则设置OBSERVING
  55. self.setPeerState((proposedLeader == self.getId()) ?
  56. ServerState.LEADING : learningState());
  57. //选举收尾动作
  58. Vote endVote = new Vote(proposedLeader,
  59. proposedZxid,
  60. logicalclock.get(),
  61. proposedEpoch);
  62. //清空recvqueue
  63. leaveInstance(endVote);
  64. return endVote;
  65. }
  66. }
  67. break;

totalOrderPredicate

在totalOrderPredicate方法中,决定了谁更适合做leader,也是zk选举算法的核心

  1. protected boolean totalOrderPredicate(long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch) {
  2. //判断外部节点推荐的leader的权重,
  3. if (self.getQuorumVerifier().getWeight(newId) == 0) {
  4. return false;
  5. }
  6. return ((newEpoch > curEpoch) ||
  7. ((newEpoch == curEpoch) &&
  8. ((newZxid > curZxid) || ((newZxid == curZxid) && (newId > curId)))));
  9. }

判断newId代表的节点(即投票信息中推荐的节点,以下先称为新节点)与当前节点更适合做leader,判断的规则如下:

  • 先比较届数,新节点的选举届数大于当前节点,则新节点更适合
  • 再比较数据新旧程度,新节点的数据新于当前节点,则新节点更适合
  • 最后比较机器id,新节点的myid大于当前节点时,则新节点

判断当前选举是否可以结束时,需要先判断推荐的leader是否大于节点总数的一半:

  1. protected boolean termPredicate(
  2. HashMap<Long, Vote> votes,
  3. Vote vote) {
  4. HashSet<Long> set = new HashSet<Long>();
  5. //搜集投票箱中和自己推荐一致的选票
  6. for (Map.Entry<Long, Vote> entry : votes.entrySet()) {
  7. if (vote.equals(entry.getValue())) {
  8. set.add(entry.getKey());
  9. }
  10. }
  11. return self.getQuorumVerifier().containsQuorum(set);
  12. }
  13. //是否大于节点总数的一半
  14. public boolean containsQuorum(Set<Long> set){
  15. return (set.size() > half);
  16. }

如果能获取到投票信息,且发送投票的节点状态为FOLLOWING或LEADING时

  1. case FOLLOWING:
  2. case LEADING:
  3. //如果逻辑时钟一致
  4. if (n.electionEpoch == logicalclock.get()) {
  5. //存入投票箱中
  6. recvset.put(n.sid, new Vote(n.leader,
  7. n.zxid,
  8. n.electionEpoch,
  9. n.peerEpoch));
  10. //如果外部推荐的leader支持率过半且合法
  11. if (ooePredicate(recvset, outofelection, n)) {
  12. //直接退出选举,确定自己的状态
  13. self.setPeerState((n.leader == self.getId()) ?
  14. ServerState.LEADING : learningState());
  15. Vote endVote = new Vote(n.leader,
  16. n.zxid,
  17. n.electionEpoch,
  18. n.peerEpoch);
  19. leaveInstance(endVote);
  20. return endVote;
  21. }
  22. }
  23. //在加入一个Leader确定的集群中,先确认一下是否是大多数节点都追随同一个leader
  24. //在确定leader之后收到的投票信息,全部存入outofelection中
  25. //即保存所有处于FOLLOWING与LEADING状态的节点发出的投票信息
  26. outofelection.put(n.sid, new Vote(n.version,
  27. n.leader,
  28. n.zxid,
  29. n.electionEpoch,
  30. n.peerEpoch,
  31. n.state));
  32. //如果外部节点推荐的leader在outofelection支持率过半且合法
  33. //一般是在选举完成后,新加入一个节点,才会走该逻辑
  34. if (ooePredicate(outofelection, outofelection, n)) {
  35. synchronized (this) {
  36. //同步当前节点的选举届数与状态
  37. logicalclock.set(n.electionEpoch);
  38. self.setPeerState((n.leader == self.getId()) ?
  39. ServerState.LEADING : learningState());
  40. }
  41. Vote endVote = new Vote(n.leader,
  42. n.zxid,
  43. n.electionEpoch,
  44. n.peerEpoch);
  45. leaveInstance(endVote);
  46. return endVote;
  47. }
  48. break;

有两种情况会走到FOLLOWING与LEADING的case中:

  • 集群已经选举出Leader,但其他节点都未及时通知到当前节点,此时n的逻辑时钟与当前一致。
  • 集群已经选举出Leader,但后来又加入了一台机器,此时逻辑时钟大概率不一致。

以上就是处于LOOKING状态的选举流程,当选举结束后,节点的状态就会确定下来,QuorumPeer类中un方法的while循环就会按照状态进入下一个阶段。

Follower执行followLeader,Leader执行lead,Observer则执行observeLeader。

因此,如果一个节点处于选举中时,则无法对外提供服务。

五、总结

下面以3个节点构成的集群为例,简要说明一下选举过程。

3个节点名称分别为zk1、zk2与zk3,数字对应于他们的myid。

启动时期选举

按序启动这个5个节点,假设它们处于同一轮选举中,即epoch一致。

  1. 先启动zk1,先投自己1票,此时zk1获得1票,但未超过半数,无法当选Leader,状态还是处于LOOKING。
  2. 接着启动zk2后,zk2也先投自己1票。zk2广播投票结果后,zk1会发现自己的epoch、zxid都与zk2相同,但myid小于zk2,因此zk1改投zk2。此时zk1获得0票,zk2获得2票,还是没有超过半数节点,zk1与zk2依然处于LOOKING。
  3. 稍后启动zk3后,zk3也先投自己1票。zk3广播投票结果后,zk1与zk2将会改投zk3。此时zk1获得0票,zk2获得0票,zk3获得3票,超过半数节点,当选为Leader,之后将状态改为LEADING,zk1与zk2则将状态改为FOLLOWING。
  4. 然后启动zk4,zk4也是先投自己1票。通过广播后,收到其他节点的投票信息,发现事情已成定局,自己来晚了,于是直接服从多数,直接将状态改为FOLLOWING。
  5. 最后启动zk5,和zk4一样的结果,状态改为FOLLOWING。

运行时期选举

运行时间选举,指的是在启动选举完成后,当选Leader的节点宕机了,此时需要重新进行选举,在选举完成前,集群无法对外提供服务。

假设Leader3宕机,其余节点通过心跳机制感应到,将会触发新一轮选举。

下面使用(myid,zxid)的形式来表达各个节点的状态,这里假设它们的epoch是一致的,但由于同步的快慢,导致自身的zxid各不相同。

  • zk1(1,5)
  • zk2(2,6)
  • zk3(3,10)
  • zk4(4,8)
  • zk5(5,7)

这是简化后的选举图,一图胜千言:

因此选举算法的核心口诀就是:

先比epoch,不行就再比zxid,还是不行那就比myid,且满足半数以上则当选为Leader。

相关文章