文章24 | 阅读 13661 | 点赞0
先讨论DistroConsistencyServiceImpl AP最终一致性的实现。
我们知道当添加实例时会调用到put方法:
@Override
public void put(String key, Record value) throws NacosException {
onPut(key, value);
// 一致性协议的同步数据。这里同步数据是异步任务执行的,也就是说先返回客户端put成功再同步,弱一致性。 AP模型
distroProtocol.sync(new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX), ApplyAction.CHANGE,
globalConfig.getTaskDispatchPeriod() / 2);
}
onPut是本机节点的Notifier通知change事件。 distroProtocol.sync是最终一致性的数据同步。
public void onPut(String key, Record value) {
if (KeyBuilder.matchEphemeralInstanceListKey(key)) {
//创建临时数据
Datum<Instances> datum = new Datum<>();
datum.value = (Instances) value;
datum.key = key;
datum.timestamp.incrementAndGet();
//放进一个map里
dataStore.put(key, datum);
}
//没有监听器就返回
if (!listeners.containsKey(key)) {
return;
}
//有监听立即通知服务有改变
notifier.addTask(key, ApplyAction.CHANGE);
}
主要就是最后一行notifier.addTask(key, ApplyAction.CHANGE); 就是往tasks队列中添加数据:
public void addTask(String datumKey, ApplyAction action) {
// 如果services包含key,并且类型是change,则直接返回
if (services.containsKey(datumKey) && action == ApplyAction.CHANGE) {
return;
}
if (action == ApplyAction.CHANGE) {
// 如果是change,services添加一个空数据
services.put(datumKey, StringUtils.EMPTY);
}
// 数据对 加入队列中
tasks.offer(Pair.with(datumKey, action));
}
我们之前分析到,notifier中有一个无限循环的任务,会从tasks中一直take数据,当取到新的任务时,则会发起通知:
@Override
public void run() {
Loggers.DISTRO.info("distro notifier started");
// 无限循环
for (; ; ) {
try {
// 消费队列
Pair<String, ApplyAction> pair = tasks.take();
handle(pair);
} catch (Throwable e) {
Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
}
}
}
private void handle(Pair<String, ApplyAction> pair) {
try {
String datumKey = pair.getValue0();
ApplyAction action = pair.getValue1();
// 消费后移除key
services.remove(datumKey);
int count = 0;
// 没有监听直接退出
if (!listeners.containsKey(datumKey)) {
return;
}
// 遍历监听器,调动对应的监听方法
for (RecordListener listener : listeners.get(datumKey)) {
count++;
try {
if (action == ApplyAction.CHANGE) {
listener.onChange(datumKey, dataStore.get(datumKey).value);
continue;
}
if (action == ApplyAction.DELETE) {
listener.onDelete(datumKey);
continue;
}
} catch (Throwable e) {
Loggers.DISTRO.error("[NACOS-DISTRO] error while notifying listener of key: {}", datumKey, e);
}
}
if (Loggers.DISTRO.isDebugEnabled()) {
Loggers.DISTRO
.debug("[NACOS-DISTRO] datum change notified, key: {}, listener count: {}, action: {}",
datumKey, count, action.name());
}
} catch (Throwable e) {
Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
}
}
listener的onChange或者onDelete。 这里的listener是Service类:
@Override
public void onChange(String key, Instances value) throws Exception {
Loggers.SRV_LOG.info("[NACOS-RAFT] datum is changed, key: {}, value: {}", key, value);
for (Instance instance : value.getInstanceList()) {
if (instance == null) {
// Reject this abnormal instance list:
throw new RuntimeException("got null instance " + key);
}
if (instance.getWeight() > 10000.0D) {
instance.setWeight(10000.0D);
}
if (instance.getWeight() < 0.01D && instance.getWeight() > 0.0D) {
instance.setWeight(0.01D);
}
}
updateIPs(value.getInstanceList(), KeyBuilder.matchEphemeralInstanceListKey(key));
recalculateChecksum();
}
然后是updateIPs方法:
public void updateIPs(Collection<Instance> instances, boolean ephemeral) {
Map<String, List<Instance>> ipMap = new HashMap<>(clusterMap.size());
for (String clusterName : clusterMap.keySet()) {
ipMap.put(clusterName, new ArrayList<>());
}
for (Instance instance : instances) {
try {
if (instance == null) {
Loggers.SRV_LOG.error("[NACOS-DOM] received malformed ip: null");
continue;
}
if (StringUtils.isEmpty(instance.getClusterName())) {
instance.setClusterName(UtilsAndCommons.DEFAULT_CLUSTER_NAME);
}
if (!clusterMap.containsKey(instance.getClusterName())) {
Loggers.SRV_LOG
.warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.",
instance.getClusterName(), instance.toJson());
Cluster cluster = new Cluster(instance.getClusterName(), this);
cluster.init();
getClusterMap().put(instance.getClusterName(), cluster);
}
List<Instance> clusterIPs = ipMap.get(instance.getClusterName());
if (clusterIPs == null) {
clusterIPs = new LinkedList<>();
ipMap.put(instance.getClusterName(), clusterIPs);
}
clusterIPs.add(instance);
} catch (Exception e) {
Loggers.SRV_LOG.error("[NACOS-DOM] failed to process ip: " + instance, e);
}
}
for (Map.Entry<String, List<Instance>> entry : ipMap.entrySet()) {
//make every ip mine
List<Instance> entryIPs = entry.getValue();
clusterMap.get(entry.getKey()).updateIps(entryIPs, ephemeral);
}
setLastModifiedMillis(System.currentTimeMillis());
// PushService的serviceChange,发布ServiceChangeEvent事件
getPushService().serviceChanged(this);
...
}
更新信息,然后发布ServiceChangeEvent事件:
PushService的onApplicationEvent方法监听该事件,内容之前分析过了,就是UDP消息发给客户端。
AP一致性同步
public void sync(DistroKey distroKey, ApplyAction action, long delay) {
for (Member each : memberManager.allMembersWithoutSelf()) {
DistroKey distroKeyWithTarget = new DistroKey(distroKey.getResourceKey(), distroKey.getResourceType(),
each.getAddress());
DistroDelayTask distroDelayTask = new DistroDelayTask(distroKeyWithTarget, action, delay);
// holder持有临时同步延迟执行器引擎,引擎中有NacosTaskProcessor,临时一致性情况下实际上持有的是DistroDelayTaskProcessor,添加任务后最终由processor执行
distroTaskEngineHolder.getDelayTaskExecuteEngine().addTask(distroKeyWithTarget, distroDelayTask);
if (Loggers.DISTRO.isDebugEnabled()) {
Loggers.DISTRO.debug("[DISTRO-SCHEDULE] {} to {}", distroKey, each.getAddress());
}
}
}
我们看一下这个DistroDelayTaskProcessor执行器的process方法:
@Override
public boolean process(AbstractDelayTask task) {
if (!(task instanceof DistroDelayTask)) {
return true;
}
DistroDelayTask distroDelayTask = (DistroDelayTask) task;
DistroKey distroKey = distroDelayTask.getDistroKey();
// 发起临时一致性同步任务
if (ApplyAction.CHANGE.equals(distroDelayTask.getAction())) {
DistroSyncChangeTask syncChangeTask = new DistroSyncChangeTask(distroKey, distroComponentHolder);
distroTaskEngineHolder.getExecuteWorkersManager().dispatch(distroKey, syncChangeTask);
return true;
}
return false;
}
提交了一个DistroSyncChangeTask任务
@Override
public void run() {
Loggers.DISTRO.info("[DISTRO-START] {}", toString());
try {
String type = getDistroKey().getResourceType();
DistroData distroData = distroComponentHolder.findDataStorage(type).getDistroData(getDistroKey());
distroData.setType(ApplyAction.CHANGE);
// syncData执行数据同步,交由 NamingProxy.syncData执行 /nacos/v1/ns/distro/datum
boolean result = distroComponentHolder.findTransportAgent(type).syncData(distroData, getDistroKey().getTargetServer());
// 同步失败重试,就是重新提交任务
if (!result) {
handleFailedTask();
}
Loggers.DISTRO.info("[DISTRO-END] {} result: {}", toString(), result);
} catch (Exception e) {
Loggers.DISTRO.warn("[DISTRO] Sync data change failed.", e);
handleFailedTask();
}
}
提交消息给集群中其他节点。
我们看一下接收方DistroController的onSyncDatum方法:
@PutMapping("/datum")
public ResponseEntity onSyncDatum(@RequestBody Map<String, Datum<Instances>> dataMap) throws Exception {
if (dataMap.isEmpty()) {
Loggers.DISTRO.error("[onSync] receive empty entity!");
throw new NacosException(NacosException.INVALID_PARAM, "receive empty entity!");
}
for (Map.Entry<String, Datum<Instances>> entry : dataMap.entrySet()) {
if (KeyBuilder.matchEphemeralInstanceListKey(entry.getKey())) {
String namespaceId = KeyBuilder.getNamespace(entry.getKey());
String serviceName = KeyBuilder.getServiceName(entry.getKey());
if (!serviceManager.containService(namespaceId, serviceName) && switchDomain
.isDefaultInstanceEphemeral()) {
serviceManager.createEmptyService(namespaceId, serviceName, true);
}
DistroHttpData distroHttpData = new DistroHttpData(createDistroKey(entry.getKey()), null,
entry.getValue());
// 有临时一致性协议处理器处理请求数据,最终是调用到DistroConsistencyServiceImpl的onPut方法放入到缓存中,然后udp通知客户端更新
distroProtocol.onReceive(distroHttpData);
}
}
return ResponseEntity.ok("ok");
}
然后是distroProtocol.onReceive:
public void onReceive(DistroData distroData) {
String resourceType = distroData.getDistroKey().getResourceType();
DistroDataProcessor dataProcessor = distroComponentHolder.findDataProcessor(resourceType);
if (null == dataProcessor) {
Loggers.DISTRO.warn("[DISTRO] Can't find data process for received data {}", resourceType);
return;
}
dataProcessor.processData(distroData);
}
此时这个处理器是DistroConsistencyServiceImpl实现的,即最终一致性服务实现类,我们看一下processData:
@Override
public void processData(DistroData distroData) {
DistroHttpData distroHttpData = (DistroHttpData) distroData;
Datum<Instances> datum = (Datum<Instances>) distroHttpData.getDeserializedContent();
onPut(datum.key, datum.value);
}
可以看到,执行的是onPut。走前面分析的onPut逻辑。
版权说明 : 本文为转载文章, 版权归原作者所有 版权申明
原文链接 : https://blog.csdn.net/qq_19414183/article/details/112469144
内容来源于网络,如有侵权,请联系作者删除!