Nacos源码分析二十四、Raft选举过程(2)

x33g5p2x  于2021-12-20 转载在 其他  
字(18.1k)|赞(0)|评价(0)|浏览(483)

MasterElection 投票

前面我们看到了MasterElection用于选举的任务,我们再看一下run方法:

@Override
public void run() {
    try {
        // ready状态
        if (!peers.isReady()) {
            return;
        }

        //获取本地结点
        RaftPeer local = peers.local();
        // 任期-500毫秒
        local.leaderDueMs -= GlobalExecutor.TICK_PERIOD_MS;

        //任期超时没到
        if (local.leaderDueMs > 0) {
            return;
        }

        // 重置任期超时时间和心跳时间,准备开始拉票
        // reset timeout
        local.resetLeaderDue();
        local.resetHeartbeatDue();

        // 发起投票
        sendVote();
    } catch (Exception e) {
        Loggers.RAFT.warn("[RAFT] error while master election {}", e);
    }
    
}

每隔500毫秒的任务周期,leaderDueMs就会-500毫秒,然后判断是否已经过期了。 当有leader的时候,会定时的发送心跳任务,每次follower收到心跳任务都会重置这个leaderDueMs时间。只有当没有leader了,此时leaderDueMs会一直减小直到小于0,某一个节点的leaderDueMs小于0了,它就会发起投票。

我们看一下sendVote方法:

private void sendVote() {
    
    RaftPeer local = peers.get(NetUtils.localServer());
    Loggers.RAFT.info("leader timeout, start voting,leader: {}, term: {}", JacksonUtils.toJson(getLeader()),
            local.term);
    // 所有的peer的voteFor设置成null
    peers.reset();

    // 任期+1
    local.term.incrementAndGet();
    // 为自己投票
    local.voteFor = local.ip;
    // 设置为后选者
    local.state = RaftPeer.State.CANDIDATE;
    
    Map<String, String> params = new HashMap<>(1);
    params.put("vote", JacksonUtils.toJson(local));
    // 发送投票请求,异步等待响应  除自己外的节点
    for (final String server : peers.allServersWithoutMySelf()) {
        // /v1/ns/raft/vote
        final String url = buildUrl(server, API_VOTE);
        try {
            HttpClient.asyncHttpPost(url, null, params, new AsyncCompletionHandler<Integer>() {
                @Override
                public Integer onCompleted(Response response) throws Exception {
                    if (response.getStatusCode() != HttpURLConnection.HTTP_OK) {
                        Loggers.RAFT
                                .error("NACOS-RAFT vote failed: {}, url: {}", response.getResponseBody(), url);
                        return 1;
                    }
                    //解析其他结点的信息
                    RaftPeer peer = JacksonUtils.toObj(response.getResponseBody(), RaftPeer.class);
                    
                    Loggers.RAFT.info("received approve from peer: {}", JacksonUtils.toJson(peer));

                    // 决定谁是leader
                    peers.decideLeader(peer);
                    
                    return 0;
                }
            });
        } catch (Exception e) {
            Loggers.RAFT.warn("error while sending vote to server: {}", server);
        }
    }
}

既然这个节点发起投票了,那么就先投自己一票,设置自己为候选者CANDIDATE,然后向集群中除了自己节点外的其他节点发起投票请求,当收到响应后来决定谁是leader。

我们看一下/v1/ns/raft/vote接口,RaftController的vote方法接收:

@PostMapping("/vote")
public JsonNode vote(HttpServletRequest request, HttpServletResponse response) throws Exception {
    
    RaftPeer peer = raftCore.receivedVote(JacksonUtils.toObj(WebUtils.required(request, "vote"), RaftPeer.class));
    
    return JacksonUtils.transferToJsonNode(peer);
}

返回一个这个节点自己的peer,里面包含它投给谁的票。raftCore.receivedVote:

public synchronized RaftPeer receivedVote(RaftPeer remote) {
    // 找不到
    if (!peers.contains(remote)) {
        throw new IllegalStateException("can not find peer: " + remote.ip);
    }

    // 自己
    RaftPeer local = peers.get(NetUtils.localServer());
    // 如果待投的节点任期比自己还小,那就投自己。
    if (remote.term.get() <= local.term.get()) {
        String msg = "received illegitimate vote" + ", voter-term:" + remote.term + ", votee-term:" + local.term;
        
        Loggers.RAFT.info(msg);
        if (StringUtils.isEmpty(local.voteFor)) {
            local.voteFor = local.ip;
        }
        
        return local;
    }

    //设置任期到期时间,重新选举计时
    local.resetLeaderDue();

    //作为跟随者
    local.state = RaftPeer.State.FOLLOWER;
    // 投请求者的票
    local.voteFor = remote.ip;
    // 任期设置成请求者的
    local.term.set(remote.term.get());
    
    Loggers.RAFT.info("vote {} as leader, term: {}", remote.ip, remote.term);

    // 返回自己,告诉请求者我投你
    return local;
}

取出来自己,如果对方的任期term比自己的还小,那么就投自己。否则设置成跟随者FOLLOWER,把voteFor设置成对方,最后返回。

回去看发起者收到投票后如何处理的peers.decideLeader(peer):

public RaftPeer decideLeader(RaftPeer candidate) {
    // 先更新进去
    peers.put(candidate.ip, candidate);
    
    SortedBag ips = new TreeBag();
    int maxApproveCount = 0;
    String maxApprovePeer = null;
    for (RaftPeer peer : peers.values()) {
        if (StringUtils.isEmpty(peer.voteFor)) {
            continue;
        }
        // 添加投票目标
        ips.add(peer.voteFor);
        // 更新投票最大者
        if (ips.getCount(peer.voteFor) > maxApproveCount) {
            maxApproveCount = ips.getCount(peer.voteFor);
            maxApprovePeer = peer.voteFor;
        }
    }

    // 如果投票最大者大于总数一半
    if (maxApproveCount >= majorityCount()) {
        // 对应的节点设置为leader
        RaftPeer peer = peers.get(maxApprovePeer);
        peer.state = RaftPeer.State.LEADER;

        //如果leader有改变的话就通知
        if (!Objects.equals(leader, peer)) {
            leader = peer;
            // 投票完成事件
            ApplicationUtils.publishEvent(new LeaderElectFinishedEvent(this, leader, local()));
            Loggers.RAFT.info("{} has become the LEADER", leader.ip);
        }
    }
    
    return leader;
}

首先计算得票最大的票数和对应的ip是哪个,然后看看此时最大票数是否已经大于半数了,如果超过半数了,说明投票已经成功了,那么设置对应的peer为leader。最后比较一下之前的leader和现在的leader是否是同一个,如果不是就是leader在这次投票后有变更,那么进行LeaderElectFinishedEvent事件通知。

RaftListener监听这个事件:

@Override
public void onApplicationEvent(ApplicationEvent event) {
    if (event instanceof BaseRaftEvent) {
        BaseRaftEvent raftEvent = (BaseRaftEvent) event;
        RaftPeer local = raftEvent.getLocal();
        String json = JacksonUtils.toJson(local);
        Map map = JacksonUtils.toObj(json, HashMap.class);
        Member self = memberManager.getSelf();
        self.setExtendVal(GROUP, map);
        memberManager.update(self);
    }
}

就是更新serverList中对应自己那个节点的扩展属性extendInfo。

我们想一个问题,如果此时被选举出来的leader不是self自己(想想什么时候会有这种情况),那会怎么样呢?实际上这次投票就结束了,后面会把状态更新回来。而由于此时整个集群范围内还是没有leader,其他的节点同样也会因为leaderDueMs超时而再次发起选举投票,直到某个节点的选举结果leader是它self本身,那么他的心跳任务就会触发通知到别的节点上去。

HeartBeat 心跳任务

首先我们要知道的是心跳任务必须是leader才会发送。

@Override
public void run() {
    try {
        
        if (!peers.isReady()) {
            return;
        }
        
        RaftPeer local = peers.local();
        // -500毫秒
        local.heartbeatDueMs -= GlobalExecutor.TICK_PERIOD_MS;
        // 没有到期,退出
        if (local.heartbeatDueMs > 0) {
            return;
        }

        //重置心跳任期
        local.resetHeartbeatDue();
        
        sendBeat();
    } catch (Exception e) {
        Loggers.RAFT.warn("[RAFT] error while sending beat {}", e);
    }
    
}

该心跳了就发:

private void sendBeat() throws IOException, InterruptedException {
    RaftPeer local = peers.local();
    // 如果是单机或者本机不是leader,则退出
    if (ApplicationUtils.getStandaloneMode() || local.state != RaftPeer.State.LEADER) {
        return;
    }
    if (Loggers.RAFT.isDebugEnabled()) {
        Loggers.RAFT.debug("[RAFT] send beat with {} keys.", datums.size());
    }

    //重置任期
    local.resetLeaderDue();
    
    // build data
    ObjectNode packet = JacksonUtils.createEmptyJsonNode();
    //放入leader结点信息
    packet.replace("peer", JacksonUtils.transferToJsonNode(local));
    
    ArrayNode array = JacksonUtils.createEmptyArrayNode();
    
    if (switchDomain.isSendBeatOnly()) {
        Loggers.RAFT.info("[SEND-BEAT-ONLY] {}", String.valueOf(switchDomain.isSendBeatOnly()));
    }
    
    if (!switchDomain.isSendBeatOnly()) {
        //还要带数据key
        for (Datum datum : datums.values()) {
            
            ObjectNode element = JacksonUtils.createEmptyJsonNode();

            //加入不同类型的key
            if (KeyBuilder.matchServiceMetaKey(datum.key)) {
                element.put("key", KeyBuilder.briefServiceMetaKey(datum.key));
            } else if (KeyBuilder.matchInstanceListKey(datum.key)) {
                element.put("key", KeyBuilder.briefInstanceListkey(datum.key));
            }
            element.put("timestamp", datum.timestamp.get());
            
            array.add(element);
        }
    }
    //带上数据的key
    packet.replace("datums", array);
    // broadcast
    Map<String, String> params = new HashMap<String, String>(1);
    params.put("beat", JacksonUtils.toJson(packet));
    
    String content = JacksonUtils.toJson(params);
    
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    GZIPOutputStream gzip = new GZIPOutputStream(out);
    gzip.write(content.getBytes(StandardCharsets.UTF_8));
    gzip.close();
    
    byte[] compressedBytes = out.toByteArray();
    String compressedContent = new String(compressedBytes, StandardCharsets.UTF_8);
    
    if (Loggers.RAFT.isDebugEnabled()) {
        Loggers.RAFT.debug("raw beat data size: {}, size of compressed data: {}", content.length(),
                compressedContent.length());
    }

    //发送给除了自己以外的
    for (final String server : peers.allServersWithoutMySelf()) {
        try {
            // /v1/ns/raft/beat
            final String url = buildUrl(server, API_BEAT);
            if (Loggers.RAFT.isDebugEnabled()) {
                Loggers.RAFT.debug("send beat to server " + server);
            }
            HttpClient.asyncHttpPostLarge(url, null, compressedBytes, new AsyncCompletionHandler<Integer>() {
                @Override
                public Integer onCompleted(Response response) throws Exception {
                    if (response.getStatusCode() != HttpURLConnection.HTTP_OK) {
                        Loggers.RAFT.error("NACOS-RAFT beat failed: {}, peer: {}", response.getResponseBody(),
                                server);
                        MetricsMonitor.getLeaderSendBeatFailedException().increment();
                        return 1;
                    }

                    //收到响应就添加RaftPeer,更新节点信息
                    peers.update(JacksonUtils.toObj(response.getResponseBody(), RaftPeer.class));
                    if (Loggers.RAFT.isDebugEnabled()) {
                        Loggers.RAFT.debug("receive beat response from: {}", url);
                    }
                    return 0;
                }
                
                @Override
                public void onThrowable(Throwable t) {
                    Loggers.RAFT.error("NACOS-RAFT error while sending heart-beat to peer: {} {}", server, t);
                    MetricsMonitor.getLeaderSendBeatFailedException().increment();
                }
            });
        } catch (Exception e) {
            Loggers.RAFT.error("error while sending heart-beat to peer: {} {}", server, e);
            MetricsMonitor.getLeaderSendBeatFailedException().increment();
        }
    }
    
}

首先必须是leader才会发送心跳,发送的数据包中放入了当前这个leader的信息。另外,如果SendBeatOnly=false,则会带上数据包一起发送。

然后向除了自己以外的其他节点发送/v1/ns/raft/beat消息,收到响应后就更新对应follower的peer信息:peers.update(JacksonUtils.toObj(response.getResponseBody(), RaftPeer.class))

public RaftPeer update(RaftPeer peer) {
    peers.put(peer.ip, peer);
    return peer;
}

就是把新的放进去。

然后我们看一下/v1/ns/raft/beat消息的接收端,RaftController的beat方法:

@PostMapping("/beat")
public JsonNode beat(HttpServletRequest request, HttpServletResponse response) throws Exception {
    
    String entity = new String(IoUtils.tryDecompress(request.getInputStream()), StandardCharsets.UTF_8);
    String value = URLDecoder.decode(entity, "UTF-8");
    value = URLDecoder.decode(value, "UTF-8");
    
    JsonNode json = JacksonUtils.toObj(value);
    
    RaftPeer peer = raftCore.receivedBeat(JacksonUtils.toObj(json.get("beat").asText()));
    
    return JacksonUtils.transferToJsonNode(peer);
}

raftCore.receivedBeat处理心跳,这个方法比较长,我们分两部分分析:

  1. 更新leader
public RaftPeer receivedBeat(JsonNode beat) throws Exception {
    // 本地的
    final RaftPeer local = peers.local();
    // 远程的
    final RaftPeer remote = new RaftPeer();
    JsonNode peer = beat.get("peer");
    remote.ip = peer.get("ip").asText();
    remote.state = RaftPeer.State.valueOf(peer.get("state").asText());
    remote.term.set(peer.get("term").asLong());
    remote.heartbeatDueMs = peer.get("heartbeatDueMs").asLong();
    remote.leaderDueMs = peer.get("leaderDueMs").asLong();
    remote.voteFor = peer.get("voteFor").asText();

    //如果远程不是leader,无效
    if (remote.state != RaftPeer.State.LEADER) {
        Loggers.RAFT.info("[RAFT] invalid state from master, state: {}, remote peer: {}", remote.state,
                JacksonUtils.toJson(remote));
        throw new IllegalArgumentException("invalid state from master, state: " + remote.state);
    }

    // 如果本地大于远程的,心跳信息过期了
    if (local.term.get() > remote.term.get()) {
        Loggers.RAFT
                .info("[RAFT] out of date beat, beat-from-term: {}, beat-to-term: {}, remote peer: {}, and leaderDueMs: {}",
                        remote.term.get(), local.term.get(), JacksonUtils.toJson(remote), local.leaderDueMs);
        throw new IllegalArgumentException(
                "out of date beat, beat-from-term: " + remote.term.get() + ", beat-to-term: " + local.term.get());
    }

    // 本地不是跟随者,可能是候选人,所以将自己变为跟随者,选远程的leader
    if (local.state != RaftPeer.State.FOLLOWER) {
        
        Loggers.RAFT.info("[RAFT] make remote as leader, remote peer: {}", JacksonUtils.toJson(remote));
        // mk follower
        local.state = RaftPeer.State.FOLLOWER;
        local.voteFor = remote.ip;
    }

    //数据key集合
    final JsonNode beatDatums = beat.get("datums");
    // 本地更新任期
    local.resetLeaderDue();
    local.resetHeartbeatDue();

    //设置leader
    peers.makeLeader(remote);
    ...
    return local;
}

主要是local的数据更新,重置LeaderDue和HeartbeatDue,最后makeLeader设置leader:

public RaftPeer makeLeader(RaftPeer candidate) {
    //leader改变了
    if (!Objects.equals(leader, candidate)) {
        //设置leader
        leader = candidate;
        //通知
        ApplicationUtils.publishEvent(new MakeLeaderEvent(this, leader, local()));
        Loggers.RAFT
                .info("{} has become the LEADER, local: {}, leader: {}", leader.ip, JacksonUtils.toJson(local()),
                        JacksonUtils.toJson(leader));
    }
    
    for (final RaftPeer peer : peers.values()) {
        Map<String, String> params = new HashMap<>(1);
        //如果存在其他以前的leader
        if (!Objects.equals(peer, candidate) && peer.state == RaftPeer.State.LEADER) {
            try {
                //请求以前过期的leader的信息
                // /v1/ns/raft/peer
                String url = RaftCore.buildUrl(peer.ip, RaftCore.API_GET_PEER);
                HttpClient.asyncHttpGet(url, null, params, new AsyncCompletionHandler<Integer>() {
                    @Override
                    public Integer onCompleted(Response response) throws Exception {
                        if (response.getStatusCode() != HttpURLConnection.HTTP_OK) {
                            Loggers.RAFT
                                    .error("[NACOS-RAFT] get peer failed: {}, peer: {}", response.getResponseBody(),
                                            peer.ip);
                            // 更新为follower
                            peer.state = RaftPeer.State.FOLLOWER;
                            return 1;
                        }

                        //更新
                        update(JacksonUtils.toObj(response.getResponseBody(), RaftPeer.class));
                        
                        return 0;
                    }
                });
            } catch (Exception e) {
                peer.state = RaftPeer.State.FOLLOWER;
                Loggers.RAFT.error("[NACOS-RAFT] error while getting peer from peer: {}", peer.ip);
            }
        }
    }
    //更新
    return update(candidate);
}

如果leader变了,发起MakeLeaderEvent事件通知。

然后检查peers中是否有和这个待选的leader不一样的leader,如果有,则发起/v1/ns/raft/peer请求给它,然后把它的状态改成follower,再更新peers中它的数据。 – 前面提到的那种场景,投票后结果不是自己的情况,这里会把相关的状态调回去。

最后把待选的leader的信息页更新一下,至此这个新的leader在这个follower这边已经同步过来了。

我们再看一下/v1/ns/raft/peer接收方RaftController的getPeer方法:

@GetMapping("/peer")
public JsonNode getPeer(HttpServletRequest request, HttpServletResponse response) {
    List<RaftPeer> peers = raftCore.getPeers();
    RaftPeer peer = null;
    
    for (RaftPeer peer1 : peers) {
        if (StringUtils.equals(peer1.ip, NetUtils.localServer())) {
            peer = peer1;
        }
    }
    
    if (peer == null) {
        peer = new RaftPeer();
        peer.ip = NetUtils.localServer();
    }
    
    return JacksonUtils.transferToJsonNode(peer);
}

就是把这个节点当前的最新状态peer数据拿回来。

  1. leader的数据包同步
if (!switchDomain.isSendBeatOnly()) {
    
    Map<String, Integer> receivedKeysMap = new HashMap<>(datums.size());

    //放入所有的要更新服务信息的key
    for (Map.Entry<String, Datum> entry : datums.entrySet()) {
        receivedKeysMap.put(entry.getKey(), 0);
    }
    
    // now check datums
    List<String> batch = new ArrayList<>();
    
    int processedCount = 0;
    if (Loggers.RAFT.isDebugEnabled()) {
        Loggers.RAFT
                .debug("[RAFT] received beat with {} keys, RaftCore.datums' size is {}, remote server: {}, term: {}, local term: {}",
                        beatDatums.size(), datums.size(), remote.ip, remote.term, local.term);
    }
    //遍历所有带来的数据的key
    for (Object object : beatDatums) {
        processedCount = processedCount + 1;
        
        JsonNode entry = (JsonNode) object;
        String key = entry.get("key").asText();
        final String datumKey;
        
        if (KeyBuilder.matchServiceMetaKey(key)) {
            datumKey = KeyBuilder.detailServiceMetaKey(key);
        } else if (KeyBuilder.matchInstanceListKey(key)) {
            datumKey = KeyBuilder.detailInstanceListkey(key);
        } else {
            // ignore corrupted key:
            continue;
        }
        
        long timestamp = entry.get("timestamp").asLong();
        
        receivedKeysMap.put(datumKey, 1);
        
        try {
            if (datums.containsKey(datumKey) && datums.get(datumKey).timestamp.get() >= timestamp
                    && processedCount < beatDatums.size()) {
                continue;
            }
            
            if (!(datums.containsKey(datumKey) && datums.get(datumKey).timestamp.get() >= timestamp)) {
                batch.add(datumKey);
            }
            
            if (batch.size() < 50 && processedCount < beatDatums.size()) {
                continue;
            }
            
            String keys = StringUtils.join(batch, ",");
            
            if (batch.size() <= 0) {
                continue;
            }
            
            Loggers.RAFT.info("get datums from leader: {}, batch size is {}, processedCount is {}"
                            + ", datums' size is {}, RaftCore.datums' size is {}", getLeader().ip, batch.size(),
                    processedCount, beatDatums.size(), datums.size());
            
            // update datum entry
            String url = buildUrl(remote.ip, API_GET) + "?keys=" + URLEncoder.encode(keys, "UTF-8");
            HttpClient.asyncHttpGet(url, null, null, new AsyncCompletionHandler<Integer>() {
                @Override
                public Integer onCompleted(Response response) throws Exception {
                    if (response.getStatusCode() != HttpURLConnection.HTTP_OK) {
                        return 1;
                    }
                    
                    List<JsonNode> datumList = JacksonUtils
                            .toObj(response.getResponseBody(), new TypeReference<List<JsonNode>>() {
                            });
                    
                    for (JsonNode datumJson : datumList) {
                        Datum newDatum = null;
                        OPERATE_LOCK.lock();
                        try {
                            
                            Datum oldDatum = getDatum(datumJson.get("key").asText());
                            
                            if (oldDatum != null && datumJson.get("timestamp").asLong() <= oldDatum.timestamp
                                    .get()) {
                                Loggers.RAFT
                                        .info("[NACOS-RAFT] timestamp is smaller than that of mine, key: {}, remote: {}, local: {}",
                                                datumJson.get("key").asText(),
                                                datumJson.get("timestamp").asLong(), oldDatum.timestamp);
                                continue;
                            }
                            
                            if (KeyBuilder.matchServiceMetaKey(datumJson.get("key").asText())) {
                                Datum<Service> serviceDatum = new Datum<>();
                                serviceDatum.key = datumJson.get("key").asText();
                                serviceDatum.timestamp.set(datumJson.get("timestamp").asLong());
                                serviceDatum.value = JacksonUtils
                                        .toObj(datumJson.get("value").toString(), Service.class);
                                newDatum = serviceDatum;
                            }
                            
                            if (KeyBuilder.matchInstanceListKey(datumJson.get("key").asText())) {
                                Datum<Instances> instancesDatum = new Datum<>();
                                instancesDatum.key = datumJson.get("key").asText();
                                instancesDatum.timestamp.set(datumJson.get("timestamp").asLong());
                                instancesDatum.value = JacksonUtils
                                        .toObj(datumJson.get("value").toString(), Instances.class);
                                newDatum = instancesDatum;
                            }
                            
                            if (newDatum == null || newDatum.value == null) {
                                Loggers.RAFT.error("receive null datum: {}", datumJson);
                                continue;
                            }
                            
                            raftStore.write(newDatum);
                            
                            datums.put(newDatum.key, newDatum);
                            notifier.addTask(newDatum.key, ApplyAction.CHANGE);
                            
                            local.resetLeaderDue();
                            
                            if (local.term.get() + 100 > remote.term.get()) {
                                getLeader().term.set(remote.term.get());
                                local.term.set(getLeader().term.get());
                            } else {
                                local.term.addAndGet(100);
                            }
                            
                            raftStore.updateTerm(local.term.get());
                            
                            Loggers.RAFT.info("data updated, key: {}, timestamp: {}, from {}, local term: {}",
                                    newDatum.key, newDatum.timestamp, JacksonUtils.toJson(remote), local.term);
                            
                        } catch (Throwable e) {
                            Loggers.RAFT
                                    .error("[RAFT-BEAT] failed to sync datum from leader, datum: {}", newDatum,
                                            e);
                        } finally {
                            OPERATE_LOCK.unlock();
                        }
                    }
                    TimeUnit.MILLISECONDS.sleep(200);
                    return 0;
                }
            });
            
            batch.clear();
            
        } catch (Exception e) {
            Loggers.RAFT.error("[NACOS-RAFT] failed to handle beat entry, key: {}", datumKey);
        }
        
    }
    
    List<String> deadKeys = new ArrayList<>();
    for (Map.Entry<String, Integer> entry : receivedKeysMap.entrySet()) {
        if (entry.getValue() == 0) {
            deadKeys.add(entry.getKey());
        }
    }
    
    for (String deadKey : deadKeys) {
        try {
            deleteDatum(deadKey);
        } catch (Exception e) {
            Loggers.RAFT.error("[NACOS-RAFT] failed to remove entry, key={} {}", deadKey, e);
        }
    }
    
}

蛮长的一段,其实就是根据leader带来的数据key,进行比对,要更新的就批量更新,收到后区分好Service还是InstanceList的进行更新,通知,并且更新任期状态。

其中有一个远程接口调用/v1/ns//raft/datum,实际上就是去这个新的leader去拿具体的数据,我们看一下它的实现RaftController的get方法:

@GetMapping("/datum")
public String get(HttpServletRequest request, HttpServletResponse response) throws Exception {
    
    response.setHeader("Content-Type", "application/json; charset=" + getAcceptEncoding(request));
    response.setHeader("Cache-Control", "no-cache");
    response.setHeader("Content-Encode", "gzip");
    String keysString = WebUtils.required(request, "keys");
    keysString = URLDecoder.decode(keysString, "UTF-8");
    String[] keys = keysString.split(",");
    List<Datum> datums = new ArrayList<Datum>();

    // keys中所有的数据信息传回去,重新选举过程中有不一致的数据情况下
    for (String key : keys) {
        Datum datum = raftCore.getDatum(key);
        datums.add(datum);
    }
    
    return JacksonUtils.toJson(datums);
}

总结

至此整个Raft选举过程我们分析完了。中间还有很多细节,大家可以对照着一点点看。这里简单画个流程图:

相关文章