文章24 | 阅读 13634 | 点赞0
前面我们看到了MasterElection用于选举的任务,我们再看一下run方法:
@Override
public void run() {
try {
// ready状态
if (!peers.isReady()) {
return;
}
//获取本地结点
RaftPeer local = peers.local();
// 任期-500毫秒
local.leaderDueMs -= GlobalExecutor.TICK_PERIOD_MS;
//任期超时没到
if (local.leaderDueMs > 0) {
return;
}
// 重置任期超时时间和心跳时间,准备开始拉票
// reset timeout
local.resetLeaderDue();
local.resetHeartbeatDue();
// 发起投票
sendVote();
} catch (Exception e) {
Loggers.RAFT.warn("[RAFT] error while master election {}", e);
}
}
每隔500毫秒的任务周期,leaderDueMs就会-500毫秒,然后判断是否已经过期了。 当有leader的时候,会定时的发送心跳任务,每次follower收到心跳任务都会重置这个leaderDueMs时间。只有当没有leader了,此时leaderDueMs会一直减小直到小于0,某一个节点的leaderDueMs小于0了,它就会发起投票。
我们看一下sendVote方法:
private void sendVote() {
RaftPeer local = peers.get(NetUtils.localServer());
Loggers.RAFT.info("leader timeout, start voting,leader: {}, term: {}", JacksonUtils.toJson(getLeader()),
local.term);
// 所有的peer的voteFor设置成null
peers.reset();
// 任期+1
local.term.incrementAndGet();
// 为自己投票
local.voteFor = local.ip;
// 设置为后选者
local.state = RaftPeer.State.CANDIDATE;
Map<String, String> params = new HashMap<>(1);
params.put("vote", JacksonUtils.toJson(local));
// 发送投票请求,异步等待响应 除自己外的节点
for (final String server : peers.allServersWithoutMySelf()) {
// /v1/ns/raft/vote
final String url = buildUrl(server, API_VOTE);
try {
HttpClient.asyncHttpPost(url, null, params, new AsyncCompletionHandler<Integer>() {
@Override
public Integer onCompleted(Response response) throws Exception {
if (response.getStatusCode() != HttpURLConnection.HTTP_OK) {
Loggers.RAFT
.error("NACOS-RAFT vote failed: {}, url: {}", response.getResponseBody(), url);
return 1;
}
//解析其他结点的信息
RaftPeer peer = JacksonUtils.toObj(response.getResponseBody(), RaftPeer.class);
Loggers.RAFT.info("received approve from peer: {}", JacksonUtils.toJson(peer));
// 决定谁是leader
peers.decideLeader(peer);
return 0;
}
});
} catch (Exception e) {
Loggers.RAFT.warn("error while sending vote to server: {}", server);
}
}
}
既然这个节点发起投票了,那么就先投自己一票,设置自己为候选者CANDIDATE,然后向集群中除了自己节点外的其他节点发起投票请求,当收到响应后来决定谁是leader。
我们看一下/v1/ns/raft/vote接口,RaftController的vote方法接收:
@PostMapping("/vote")
public JsonNode vote(HttpServletRequest request, HttpServletResponse response) throws Exception {
RaftPeer peer = raftCore.receivedVote(JacksonUtils.toObj(WebUtils.required(request, "vote"), RaftPeer.class));
return JacksonUtils.transferToJsonNode(peer);
}
返回一个这个节点自己的peer,里面包含它投给谁的票。raftCore.receivedVote:
public synchronized RaftPeer receivedVote(RaftPeer remote) {
// 找不到
if (!peers.contains(remote)) {
throw new IllegalStateException("can not find peer: " + remote.ip);
}
// 自己
RaftPeer local = peers.get(NetUtils.localServer());
// 如果待投的节点任期比自己还小,那就投自己。
if (remote.term.get() <= local.term.get()) {
String msg = "received illegitimate vote" + ", voter-term:" + remote.term + ", votee-term:" + local.term;
Loggers.RAFT.info(msg);
if (StringUtils.isEmpty(local.voteFor)) {
local.voteFor = local.ip;
}
return local;
}
//设置任期到期时间,重新选举计时
local.resetLeaderDue();
//作为跟随者
local.state = RaftPeer.State.FOLLOWER;
// 投请求者的票
local.voteFor = remote.ip;
// 任期设置成请求者的
local.term.set(remote.term.get());
Loggers.RAFT.info("vote {} as leader, term: {}", remote.ip, remote.term);
// 返回自己,告诉请求者我投你
return local;
}
取出来自己,如果对方的任期term比自己的还小,那么就投自己。否则设置成跟随者FOLLOWER,把voteFor设置成对方,最后返回。
回去看发起者收到投票后如何处理的peers.decideLeader(peer):
public RaftPeer decideLeader(RaftPeer candidate) {
// 先更新进去
peers.put(candidate.ip, candidate);
SortedBag ips = new TreeBag();
int maxApproveCount = 0;
String maxApprovePeer = null;
for (RaftPeer peer : peers.values()) {
if (StringUtils.isEmpty(peer.voteFor)) {
continue;
}
// 添加投票目标
ips.add(peer.voteFor);
// 更新投票最大者
if (ips.getCount(peer.voteFor) > maxApproveCount) {
maxApproveCount = ips.getCount(peer.voteFor);
maxApprovePeer = peer.voteFor;
}
}
// 如果投票最大者大于总数一半
if (maxApproveCount >= majorityCount()) {
// 对应的节点设置为leader
RaftPeer peer = peers.get(maxApprovePeer);
peer.state = RaftPeer.State.LEADER;
//如果leader有改变的话就通知
if (!Objects.equals(leader, peer)) {
leader = peer;
// 投票完成事件
ApplicationUtils.publishEvent(new LeaderElectFinishedEvent(this, leader, local()));
Loggers.RAFT.info("{} has become the LEADER", leader.ip);
}
}
return leader;
}
首先计算得票最大的票数和对应的ip是哪个,然后看看此时最大票数是否已经大于半数了,如果超过半数了,说明投票已经成功了,那么设置对应的peer为leader。最后比较一下之前的leader和现在的leader是否是同一个,如果不是就是leader在这次投票后有变更,那么进行LeaderElectFinishedEvent事件通知。
RaftListener监听这个事件:
@Override
public void onApplicationEvent(ApplicationEvent event) {
if (event instanceof BaseRaftEvent) {
BaseRaftEvent raftEvent = (BaseRaftEvent) event;
RaftPeer local = raftEvent.getLocal();
String json = JacksonUtils.toJson(local);
Map map = JacksonUtils.toObj(json, HashMap.class);
Member self = memberManager.getSelf();
self.setExtendVal(GROUP, map);
memberManager.update(self);
}
}
就是更新serverList中对应自己那个节点的扩展属性extendInfo。
我们想一个问题,如果此时被选举出来的leader不是self自己(想想什么时候会有这种情况),那会怎么样呢?实际上这次投票就结束了,后面会把状态更新回来。而由于此时整个集群范围内还是没有leader,其他的节点同样也会因为leaderDueMs超时而再次发起选举投票,直到某个节点的选举结果leader是它self本身,那么他的心跳任务就会触发通知到别的节点上去。
首先我们要知道的是心跳任务必须是leader才会发送。
@Override
public void run() {
try {
if (!peers.isReady()) {
return;
}
RaftPeer local = peers.local();
// -500毫秒
local.heartbeatDueMs -= GlobalExecutor.TICK_PERIOD_MS;
// 没有到期,退出
if (local.heartbeatDueMs > 0) {
return;
}
//重置心跳任期
local.resetHeartbeatDue();
sendBeat();
} catch (Exception e) {
Loggers.RAFT.warn("[RAFT] error while sending beat {}", e);
}
}
该心跳了就发:
private void sendBeat() throws IOException, InterruptedException {
RaftPeer local = peers.local();
// 如果是单机或者本机不是leader,则退出
if (ApplicationUtils.getStandaloneMode() || local.state != RaftPeer.State.LEADER) {
return;
}
if (Loggers.RAFT.isDebugEnabled()) {
Loggers.RAFT.debug("[RAFT] send beat with {} keys.", datums.size());
}
//重置任期
local.resetLeaderDue();
// build data
ObjectNode packet = JacksonUtils.createEmptyJsonNode();
//放入leader结点信息
packet.replace("peer", JacksonUtils.transferToJsonNode(local));
ArrayNode array = JacksonUtils.createEmptyArrayNode();
if (switchDomain.isSendBeatOnly()) {
Loggers.RAFT.info("[SEND-BEAT-ONLY] {}", String.valueOf(switchDomain.isSendBeatOnly()));
}
if (!switchDomain.isSendBeatOnly()) {
//还要带数据key
for (Datum datum : datums.values()) {
ObjectNode element = JacksonUtils.createEmptyJsonNode();
//加入不同类型的key
if (KeyBuilder.matchServiceMetaKey(datum.key)) {
element.put("key", KeyBuilder.briefServiceMetaKey(datum.key));
} else if (KeyBuilder.matchInstanceListKey(datum.key)) {
element.put("key", KeyBuilder.briefInstanceListkey(datum.key));
}
element.put("timestamp", datum.timestamp.get());
array.add(element);
}
}
//带上数据的key
packet.replace("datums", array);
// broadcast
Map<String, String> params = new HashMap<String, String>(1);
params.put("beat", JacksonUtils.toJson(packet));
String content = JacksonUtils.toJson(params);
ByteArrayOutputStream out = new ByteArrayOutputStream();
GZIPOutputStream gzip = new GZIPOutputStream(out);
gzip.write(content.getBytes(StandardCharsets.UTF_8));
gzip.close();
byte[] compressedBytes = out.toByteArray();
String compressedContent = new String(compressedBytes, StandardCharsets.UTF_8);
if (Loggers.RAFT.isDebugEnabled()) {
Loggers.RAFT.debug("raw beat data size: {}, size of compressed data: {}", content.length(),
compressedContent.length());
}
//发送给除了自己以外的
for (final String server : peers.allServersWithoutMySelf()) {
try {
// /v1/ns/raft/beat
final String url = buildUrl(server, API_BEAT);
if (Loggers.RAFT.isDebugEnabled()) {
Loggers.RAFT.debug("send beat to server " + server);
}
HttpClient.asyncHttpPostLarge(url, null, compressedBytes, new AsyncCompletionHandler<Integer>() {
@Override
public Integer onCompleted(Response response) throws Exception {
if (response.getStatusCode() != HttpURLConnection.HTTP_OK) {
Loggers.RAFT.error("NACOS-RAFT beat failed: {}, peer: {}", response.getResponseBody(),
server);
MetricsMonitor.getLeaderSendBeatFailedException().increment();
return 1;
}
//收到响应就添加RaftPeer,更新节点信息
peers.update(JacksonUtils.toObj(response.getResponseBody(), RaftPeer.class));
if (Loggers.RAFT.isDebugEnabled()) {
Loggers.RAFT.debug("receive beat response from: {}", url);
}
return 0;
}
@Override
public void onThrowable(Throwable t) {
Loggers.RAFT.error("NACOS-RAFT error while sending heart-beat to peer: {} {}", server, t);
MetricsMonitor.getLeaderSendBeatFailedException().increment();
}
});
} catch (Exception e) {
Loggers.RAFT.error("error while sending heart-beat to peer: {} {}", server, e);
MetricsMonitor.getLeaderSendBeatFailedException().increment();
}
}
}
首先必须是leader才会发送心跳,发送的数据包中放入了当前这个leader的信息。另外,如果SendBeatOnly=false,则会带上数据包一起发送。
然后向除了自己以外的其他节点发送/v1/ns/raft/beat消息,收到响应后就更新对应follower的peer信息:peers.update(JacksonUtils.toObj(response.getResponseBody(), RaftPeer.class))
public RaftPeer update(RaftPeer peer) {
peers.put(peer.ip, peer);
return peer;
}
就是把新的放进去。
然后我们看一下/v1/ns/raft/beat消息的接收端,RaftController的beat方法:
@PostMapping("/beat")
public JsonNode beat(HttpServletRequest request, HttpServletResponse response) throws Exception {
String entity = new String(IoUtils.tryDecompress(request.getInputStream()), StandardCharsets.UTF_8);
String value = URLDecoder.decode(entity, "UTF-8");
value = URLDecoder.decode(value, "UTF-8");
JsonNode json = JacksonUtils.toObj(value);
RaftPeer peer = raftCore.receivedBeat(JacksonUtils.toObj(json.get("beat").asText()));
return JacksonUtils.transferToJsonNode(peer);
}
raftCore.receivedBeat处理心跳,这个方法比较长,我们分两部分分析:
public RaftPeer receivedBeat(JsonNode beat) throws Exception {
// 本地的
final RaftPeer local = peers.local();
// 远程的
final RaftPeer remote = new RaftPeer();
JsonNode peer = beat.get("peer");
remote.ip = peer.get("ip").asText();
remote.state = RaftPeer.State.valueOf(peer.get("state").asText());
remote.term.set(peer.get("term").asLong());
remote.heartbeatDueMs = peer.get("heartbeatDueMs").asLong();
remote.leaderDueMs = peer.get("leaderDueMs").asLong();
remote.voteFor = peer.get("voteFor").asText();
//如果远程不是leader,无效
if (remote.state != RaftPeer.State.LEADER) {
Loggers.RAFT.info("[RAFT] invalid state from master, state: {}, remote peer: {}", remote.state,
JacksonUtils.toJson(remote));
throw new IllegalArgumentException("invalid state from master, state: " + remote.state);
}
// 如果本地大于远程的,心跳信息过期了
if (local.term.get() > remote.term.get()) {
Loggers.RAFT
.info("[RAFT] out of date beat, beat-from-term: {}, beat-to-term: {}, remote peer: {}, and leaderDueMs: {}",
remote.term.get(), local.term.get(), JacksonUtils.toJson(remote), local.leaderDueMs);
throw new IllegalArgumentException(
"out of date beat, beat-from-term: " + remote.term.get() + ", beat-to-term: " + local.term.get());
}
// 本地不是跟随者,可能是候选人,所以将自己变为跟随者,选远程的leader
if (local.state != RaftPeer.State.FOLLOWER) {
Loggers.RAFT.info("[RAFT] make remote as leader, remote peer: {}", JacksonUtils.toJson(remote));
// mk follower
local.state = RaftPeer.State.FOLLOWER;
local.voteFor = remote.ip;
}
//数据key集合
final JsonNode beatDatums = beat.get("datums");
// 本地更新任期
local.resetLeaderDue();
local.resetHeartbeatDue();
//设置leader
peers.makeLeader(remote);
...
return local;
}
主要是local的数据更新,重置LeaderDue和HeartbeatDue,最后makeLeader设置leader:
public RaftPeer makeLeader(RaftPeer candidate) {
//leader改变了
if (!Objects.equals(leader, candidate)) {
//设置leader
leader = candidate;
//通知
ApplicationUtils.publishEvent(new MakeLeaderEvent(this, leader, local()));
Loggers.RAFT
.info("{} has become the LEADER, local: {}, leader: {}", leader.ip, JacksonUtils.toJson(local()),
JacksonUtils.toJson(leader));
}
for (final RaftPeer peer : peers.values()) {
Map<String, String> params = new HashMap<>(1);
//如果存在其他以前的leader
if (!Objects.equals(peer, candidate) && peer.state == RaftPeer.State.LEADER) {
try {
//请求以前过期的leader的信息
// /v1/ns/raft/peer
String url = RaftCore.buildUrl(peer.ip, RaftCore.API_GET_PEER);
HttpClient.asyncHttpGet(url, null, params, new AsyncCompletionHandler<Integer>() {
@Override
public Integer onCompleted(Response response) throws Exception {
if (response.getStatusCode() != HttpURLConnection.HTTP_OK) {
Loggers.RAFT
.error("[NACOS-RAFT] get peer failed: {}, peer: {}", response.getResponseBody(),
peer.ip);
// 更新为follower
peer.state = RaftPeer.State.FOLLOWER;
return 1;
}
//更新
update(JacksonUtils.toObj(response.getResponseBody(), RaftPeer.class));
return 0;
}
});
} catch (Exception e) {
peer.state = RaftPeer.State.FOLLOWER;
Loggers.RAFT.error("[NACOS-RAFT] error while getting peer from peer: {}", peer.ip);
}
}
}
//更新
return update(candidate);
}
如果leader变了,发起MakeLeaderEvent事件通知。
然后检查peers中是否有和这个待选的leader不一样的leader,如果有,则发起/v1/ns/raft/peer请求给它,然后把它的状态改成follower,再更新peers中它的数据。 – 前面提到的那种场景,投票后结果不是自己的情况,这里会把相关的状态调回去。
最后把待选的leader的信息页更新一下,至此这个新的leader在这个follower这边已经同步过来了。
我们再看一下/v1/ns/raft/peer接收方RaftController的getPeer方法:
@GetMapping("/peer")
public JsonNode getPeer(HttpServletRequest request, HttpServletResponse response) {
List<RaftPeer> peers = raftCore.getPeers();
RaftPeer peer = null;
for (RaftPeer peer1 : peers) {
if (StringUtils.equals(peer1.ip, NetUtils.localServer())) {
peer = peer1;
}
}
if (peer == null) {
peer = new RaftPeer();
peer.ip = NetUtils.localServer();
}
return JacksonUtils.transferToJsonNode(peer);
}
就是把这个节点当前的最新状态peer数据拿回来。
if (!switchDomain.isSendBeatOnly()) {
Map<String, Integer> receivedKeysMap = new HashMap<>(datums.size());
//放入所有的要更新服务信息的key
for (Map.Entry<String, Datum> entry : datums.entrySet()) {
receivedKeysMap.put(entry.getKey(), 0);
}
// now check datums
List<String> batch = new ArrayList<>();
int processedCount = 0;
if (Loggers.RAFT.isDebugEnabled()) {
Loggers.RAFT
.debug("[RAFT] received beat with {} keys, RaftCore.datums' size is {}, remote server: {}, term: {}, local term: {}",
beatDatums.size(), datums.size(), remote.ip, remote.term, local.term);
}
//遍历所有带来的数据的key
for (Object object : beatDatums) {
processedCount = processedCount + 1;
JsonNode entry = (JsonNode) object;
String key = entry.get("key").asText();
final String datumKey;
if (KeyBuilder.matchServiceMetaKey(key)) {
datumKey = KeyBuilder.detailServiceMetaKey(key);
} else if (KeyBuilder.matchInstanceListKey(key)) {
datumKey = KeyBuilder.detailInstanceListkey(key);
} else {
// ignore corrupted key:
continue;
}
long timestamp = entry.get("timestamp").asLong();
receivedKeysMap.put(datumKey, 1);
try {
if (datums.containsKey(datumKey) && datums.get(datumKey).timestamp.get() >= timestamp
&& processedCount < beatDatums.size()) {
continue;
}
if (!(datums.containsKey(datumKey) && datums.get(datumKey).timestamp.get() >= timestamp)) {
batch.add(datumKey);
}
if (batch.size() < 50 && processedCount < beatDatums.size()) {
continue;
}
String keys = StringUtils.join(batch, ",");
if (batch.size() <= 0) {
continue;
}
Loggers.RAFT.info("get datums from leader: {}, batch size is {}, processedCount is {}"
+ ", datums' size is {}, RaftCore.datums' size is {}", getLeader().ip, batch.size(),
processedCount, beatDatums.size(), datums.size());
// update datum entry
String url = buildUrl(remote.ip, API_GET) + "?keys=" + URLEncoder.encode(keys, "UTF-8");
HttpClient.asyncHttpGet(url, null, null, new AsyncCompletionHandler<Integer>() {
@Override
public Integer onCompleted(Response response) throws Exception {
if (response.getStatusCode() != HttpURLConnection.HTTP_OK) {
return 1;
}
List<JsonNode> datumList = JacksonUtils
.toObj(response.getResponseBody(), new TypeReference<List<JsonNode>>() {
});
for (JsonNode datumJson : datumList) {
Datum newDatum = null;
OPERATE_LOCK.lock();
try {
Datum oldDatum = getDatum(datumJson.get("key").asText());
if (oldDatum != null && datumJson.get("timestamp").asLong() <= oldDatum.timestamp
.get()) {
Loggers.RAFT
.info("[NACOS-RAFT] timestamp is smaller than that of mine, key: {}, remote: {}, local: {}",
datumJson.get("key").asText(),
datumJson.get("timestamp").asLong(), oldDatum.timestamp);
continue;
}
if (KeyBuilder.matchServiceMetaKey(datumJson.get("key").asText())) {
Datum<Service> serviceDatum = new Datum<>();
serviceDatum.key = datumJson.get("key").asText();
serviceDatum.timestamp.set(datumJson.get("timestamp").asLong());
serviceDatum.value = JacksonUtils
.toObj(datumJson.get("value").toString(), Service.class);
newDatum = serviceDatum;
}
if (KeyBuilder.matchInstanceListKey(datumJson.get("key").asText())) {
Datum<Instances> instancesDatum = new Datum<>();
instancesDatum.key = datumJson.get("key").asText();
instancesDatum.timestamp.set(datumJson.get("timestamp").asLong());
instancesDatum.value = JacksonUtils
.toObj(datumJson.get("value").toString(), Instances.class);
newDatum = instancesDatum;
}
if (newDatum == null || newDatum.value == null) {
Loggers.RAFT.error("receive null datum: {}", datumJson);
continue;
}
raftStore.write(newDatum);
datums.put(newDatum.key, newDatum);
notifier.addTask(newDatum.key, ApplyAction.CHANGE);
local.resetLeaderDue();
if (local.term.get() + 100 > remote.term.get()) {
getLeader().term.set(remote.term.get());
local.term.set(getLeader().term.get());
} else {
local.term.addAndGet(100);
}
raftStore.updateTerm(local.term.get());
Loggers.RAFT.info("data updated, key: {}, timestamp: {}, from {}, local term: {}",
newDatum.key, newDatum.timestamp, JacksonUtils.toJson(remote), local.term);
} catch (Throwable e) {
Loggers.RAFT
.error("[RAFT-BEAT] failed to sync datum from leader, datum: {}", newDatum,
e);
} finally {
OPERATE_LOCK.unlock();
}
}
TimeUnit.MILLISECONDS.sleep(200);
return 0;
}
});
batch.clear();
} catch (Exception e) {
Loggers.RAFT.error("[NACOS-RAFT] failed to handle beat entry, key: {}", datumKey);
}
}
List<String> deadKeys = new ArrayList<>();
for (Map.Entry<String, Integer> entry : receivedKeysMap.entrySet()) {
if (entry.getValue() == 0) {
deadKeys.add(entry.getKey());
}
}
for (String deadKey : deadKeys) {
try {
deleteDatum(deadKey);
} catch (Exception e) {
Loggers.RAFT.error("[NACOS-RAFT] failed to remove entry, key={} {}", deadKey, e);
}
}
}
蛮长的一段,其实就是根据leader
带来的数据key
,进行比对,要更新的就批量更新,收到后区分好Service
还是InstanceList
的进行更新,通知,并且更新任期状态。
其中有一个远程接口调用/v1/ns//raft/datum,实际上就是去这个新的leader去拿具体的数据,我们看一下它的实现RaftController的get方法:
@GetMapping("/datum")
public String get(HttpServletRequest request, HttpServletResponse response) throws Exception {
response.setHeader("Content-Type", "application/json; charset=" + getAcceptEncoding(request));
response.setHeader("Cache-Control", "no-cache");
response.setHeader("Content-Encode", "gzip");
String keysString = WebUtils.required(request, "keys");
keysString = URLDecoder.decode(keysString, "UTF-8");
String[] keys = keysString.split(",");
List<Datum> datums = new ArrayList<Datum>();
// keys中所有的数据信息传回去,重新选举过程中有不一致的数据情况下
for (String key : keys) {
Datum datum = raftCore.getDatum(key);
datums.add(datum);
}
return JacksonUtils.toJson(datums);
}
至此整个Raft选举过程我们分析完了。中间还有很多细节,大家可以对照着一点点看。这里简单画个流程图:
版权说明 : 本文为转载文章, 版权归原作者所有 版权申明
原文链接 : https://blog.csdn.net/qq_19414183/article/details/112534817
内容来源于网络,如有侵权,请联系作者删除!