Nacos Source Code Analysis, Part 12: NacosNamingService

Reposted by x33g5p2x on 2021-12-20 under Other

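The previous article explained that Nacos supports service registration and discovery through the NamingService interface, whose implementation class is NacosNamingService. This article examines that class in detail.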

Let's start with a structure diagram to get a rough idea of how this class is organized:

Here is the initialization code:

private void init(Properties properties) throws NacosException {
    ValidatorUtils.checkInitParam(properties);
    this.namespace = InitUtils.initNamespaceForNaming(properties);
    InitUtils.initSerialization();
    initServerAddr(properties);
    InitUtils.initWebRootContext();
    initCacheDir();
    initLogName(properties);

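    // event dispatching
    this.eventDispatcher = new EventDispatcher();
    // server proxy (wraps the HTTP calls to the Nacos server)
    this.serverProxy = new NamingProxy(this.namespace, this.endpoint, this.serverList, properties);
    // heartbeat sending
    this.beatReactor = new BeatReactor(this.serverProxy, initClientBeatThreadCount(properties));
    // host reactor: service info cache, updates and push handling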
    this.hostReactor = new HostReactor(this.eventDispatcher, this.serverProxy, beatReactor, this.cacheDir,
            isLoadCacheAtStart(properties), initPollingThreadCount(properties));
}

Its main job is to initialize each of the components shown in the diagram above. Let's go through them one by one.

EventDispatcher

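EventDispatcher is mainly responsible for publishing events. It maintains a blocking queue called changedServices. When it is constructed, it starts a dedicated daemon thread that loops until the dispatcher is closed: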

public EventDispatcher() {

    // Notifier task
    this.executor = Executors.newSingleThreadExecutor(new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread thread = new Thread(r, "com.alibaba.nacos.naming.client.listener");
            thread.setDaemon(true);
            
            return thread;
        }
    });
    
    this.executor.execute(new Notifier());
}

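The Notifier thread keeps taking entries from the blocking queue changedServices. For each entry it receives, it sends a NamingEvent notification to the listeners registered for that service: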

private class Notifier implements Runnable {
    
    @Override
    public void run() {
        while (!closed) {
            
            ServiceInfo serviceInfo = null;
            try {
                // blocks for up to 5 minutes waiting for a changed service
                serviceInfo = changedServices.poll(5, TimeUnit.MINUTES);
            } catch (Exception ignore) {
            }
            
            if (serviceInfo == null) {
                continue;
            }
            
            try {
                // listeners registered for this service
                List<EventListener> listeners = observerMap.get(serviceInfo.getKey());
                
                if (!CollectionUtils.isEmpty(listeners)) {
                    for (EventListener listener : listeners) {
                        List<Instance> hosts = Collections.unmodifiableList(serviceInfo.getHosts());
                        // deliver the NamingEvent to the listener
                        listener.onEvent(new NamingEvent(serviceInfo.getName(), serviceInfo.getGroupName(),
                                serviceInfo.getClusters(), hosts));
                    }
                }
                
            } catch (Exception e) {
                NAMING_LOGGER.error("[NA] notify error for service: " + serviceInfo.getName() + ", clusters: "
                        + serviceInfo.getClusters(), e);
            }
        }
    }
}

The serviceChanged method adds elements to the queue:

public void serviceChanged(ServiceInfo serviceInfo) {
    if (serviceInfo == null) {
        return;
    }
    
    changedServices.add(serviceInfo);
}

In other words, whenever serviceChanged is called, a NamingEvent is emitted, and the listeners we registered for that service receive it.
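The producer/consumer structure above can be reduced to a small self-contained sketch. It is a hypothetical model, not the Nacos class. MiniEventDispatcher, addListener and shutdown are invented names, and a plain String key stands in for ServiceInfo. A short poll timeout replaces the 5-minute one so the closed flag is checked often.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical, simplified model of the EventDispatcher pattern (not the Nacos class):
// a single daemon thread drains a queue of changed service keys and calls the
// listeners registered for each key.
class MiniEventDispatcher {
    private final BlockingQueue<String> changedServices = new LinkedBlockingQueue<>();
    private final Map<String, List<Consumer<String>>> observerMap = new ConcurrentHashMap<>();
    private volatile boolean closed = false;
    private final ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
        Thread thread = new Thread(r, "mini-dispatcher-notifier");
        thread.setDaemon(true);
        return thread;
    });

    MiniEventDispatcher() {
        executor.execute(this::notifyLoop);
    }

    void addListener(String serviceKey, Consumer<String> listener) {
        observerMap.computeIfAbsent(serviceKey, k -> new CopyOnWriteArrayList<>()).add(listener);
    }

    // Producer side, like serviceChanged(): enqueue and return immediately.
    void serviceChanged(String serviceKey) {
        if (serviceKey != null) {
            changedServices.add(serviceKey);
        }
    }

    private void notifyLoop() {
        while (!closed) {
            String key;
            try {
                // Short timeout so the loop re-checks the closed flag regularly.
                key = changedServices.poll(100, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            if (key == null) {
                continue;
            }
            List<Consumer<String>> listeners = observerMap.getOrDefault(key, Collections.emptyList());
            for (Consumer<String> listener : listeners) {
                try {
                    listener.accept(key);
                } catch (RuntimeException e) {
                    // Isolate failures so one bad listener does not stop the others.
                    System.err.println("listener failed for " + key + ": " + e);
                }
            }
        }
    }

    void shutdown() {
        closed = true;
        executor.shutdownNow();
    }
}
```

The original Notifier wraps the whole listener loop in a single try block, so an exception from one listener skips the remaining listeners for that event. The sketch catches per listener instead. This is a design choice of the sketch, not Nacos behavior.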

NamingProxy

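It runs two scheduled tasks:

  1. refreshSrvIfNeed: refreshes the Nacos server list every 30 seconds.
  2. securityProxy.login: login authentication, every 5 seconds. It also handles token renewal. While the current time is still inside the token's validity window, no real login call is made. We already saw this part when analyzing the configuration center.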

In addition, NamingProxy wraps the actual HTTP calls, such as creating, deleting, updating and querying instances and services.
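The "only log in when the token is about to expire" check can be sketched as a tiny state holder. This is a hypothetical illustration. The class and method names are invented, and so is the rule that the refresh window is 10% of the TTL; none of them is confirmed SecurityProxy code.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of a token-window check; names and the 10% refresh window
// are assumptions made for illustration, not the Nacos SecurityProxy API.
class TokenWindow {
    private long lastRefreshTime = 0L;      // epoch millis of the last successful login
    private long tokenTtlSeconds = 0L;      // token lifetime reported at login
    private long refreshWindowSeconds = 0L; // re-login this long before expiry

    // Called by the 5-second scheduler; true means a real login request is needed.
    boolean needsLogin(long nowMillis) {
        long freshMillis = TimeUnit.SECONDS.toMillis(tokenTtlSeconds - refreshWindowSeconds);
        return nowMillis - lastRefreshTime >= freshMillis;
    }

    void onLoginSuccess(long nowMillis, long ttlSeconds) {
        lastRefreshTime = nowMillis;
        tokenTtlSeconds = ttlSeconds;
        refreshWindowSeconds = ttlSeconds / 10; // assumed: renew during the last 10% of the TTL
    }
}
```

Before the first login, the TTL is 0, so the very first tick always triggers a login. After that, most 5-second ticks return immediately without any network call.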

BeatReactor

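It creates a scheduled thread pool whose core size is, by default, half the number of CPU cores. Each heartbeat task is submitted to this pool when it is added: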

public BeatReactor(NamingProxy serverProxy, int threadCount) {
    this.serverProxy = serverProxy;
    this.executorService = new ScheduledThreadPoolExecutor(threadCount, new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread thread = new Thread(r);
            thread.setDaemon(true);
            thread.setName("com.alibaba.nacos.naming.beat.sender");
            return thread;
        }
    });
}
public void addBeatInfo(String serviceName, BeatInfo beatInfo) {
    NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo);
    String key = buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort());
    BeatInfo existBeat = null;
    //fix #1733
    if ((existBeat = dom2Beat.remove(key)) != null) {
        existBeat.setStopped(true);
    }
    dom2Beat.put(key, beatInfo);
    // /nacos/v1/ns/instance/beat
    executorService.schedule(new BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS);
    MetricsMonitor.getDom2BeatSizeMonitor().set(dom2Beat.size());
}
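The "fix #1733" lines in addBeatInfo keep one heartbeat per service, IP and port. When an instance is added again, the old BeatInfo is marked as stopped, so its self-rescheduling task ends instead of running alongside the new one. Below is a hypothetical, trimmed-down sketch of that pattern. MiniBeat, MiniBeatReactor and the "#" key separator are assumptions. The sketch also uses a single put() that returns the previous value instead of the original remove() followed by put().

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for BeatInfo: only the fields this sketch needs.
class MiniBeat {
    final String ip;
    final int port;
    final long periodMillis;
    volatile boolean stopped = false;

    MiniBeat(String ip, int port, long periodMillis) {
        this.ip = ip;
        this.port = port;
        this.periodMillis = periodMillis;
    }
}

// Hypothetical sketch of the addBeatInfo pattern, not the Nacos BeatReactor.
class MiniBeatReactor {
    private final Map<String, MiniBeat> dom2Beat = new ConcurrentHashMap<>();
    private final ScheduledExecutorService executor;

    MiniBeatReactor(int threadCount) {
        this.executor = Executors.newScheduledThreadPool(threadCount, r -> {
            Thread thread = new Thread(r, "mini-beat-sender");
            thread.setDaemon(true);
            return thread;
        });
    }

    // The "#" separator is an assumption of this sketch.
    static String buildKey(String serviceName, String ip, int port) {
        return serviceName + "#" + ip + "#" + port;
    }

    void addBeatInfo(String serviceName, MiniBeat beat) {
        String key = buildKey(serviceName, beat.ip, beat.port);
        MiniBeat previous = dom2Beat.put(key, beat);
        if (previous != null) {
            previous.stopped = true; // its pending task returns on its next run
        }
        schedule(beat);
    }

    private void schedule(MiniBeat beat) {
        executor.schedule(() -> runBeat(beat), beat.periodMillis, TimeUnit.MILLISECONDS);
    }

    private void runBeat(MiniBeat beat) {
        if (beat.stopped) {
            return; // a stopped beat simply stops rescheduling itself
        }
        // A real client would send the heartbeat request to the server here.
        schedule(beat);
    }

    int size() {
        return dom2Beat.size();
    }

    void shutdown() {
        executor.shutdownNow();
    }
}
```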

We saw during service registration that addBeatInfo is called to add a heartbeat task. Here is what the task does:

class BeatTask implements Runnable {
    
    BeatInfo beatInfo;
    
    public BeatTask(BeatInfo beatInfo) {
        this.beatInfo = beatInfo;
    }
    
    @Override
    public void run() {
        if (beatInfo.isStopped()) {
            return;
        }
        long nextTime = beatInfo.getPeriod();
        try {
            // lightBeatEnabled: whether to send a lightweight beat. By default the first beat carries a body; once it returns, lightBeatEnabled is updated to true, so later beats carry no body
            JsonNode result = serverProxy.sendBeat(beatInfo, BeatReactor.this.lightBeatEnabled);
            long interval = result.get("clientBeatInterval").asLong();
            boolean lightBeatEnabled = false;
            if (result.has(CommonParams.LIGHT_BEAT_ENABLED)) {
                lightBeatEnabled = result.get(CommonParams.LIGHT_BEAT_ENABLED).asBoolean();
            }
            BeatReactor.this.lightBeatEnabled = lightBeatEnabled;
            // if the response contains an interval, the server decides when the next heartbeat is sent
            if (interval > 0) {
                nextTime = interval;
            }
            int code = NamingResponseCode.OK;
            if (result.has(CommonParams.CODE)) {
                code = result.get(CommonParams.CODE).asInt();
            }
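            // if the server reports that the resource does not exist, register again (covers instances deleted after registration: the heartbeat re-registers them automatically)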
            if (code == NamingResponseCode.RESOURCE_NOT_FOUND) {
                Instance instance = new Instance();
                instance.setPort(beatInfo.getPort());
                instance.setIp(beatInfo.getIp());
                instance.setWeight(beatInfo.getWeight());
                instance.setMetadata(beatInfo.getMetadata());
                instance.setClusterName(beatInfo.getCluster());
                instance.setServiceName(beatInfo.getServiceName());
                instance.setInstanceId(instance.getInstanceId());
                instance.setEphemeral(true);
                try {
                    serverProxy.registerService(beatInfo.getServiceName(),
                            NamingUtils.getGroupName(beatInfo.getServiceName()), instance);
                } catch (Exception ignore) {
                }
            }
        } catch (NacosException ex) {
            NAMING_LOGGER.error("[CLIENT-BEAT] failed to send beat: {}, code: {}, msg: {}",
                    JacksonUtils.toJson(beatInfo), ex.getErrCode(), ex.getErrMsg());
            
        }
        // reschedule the next heartbeat once this one has finished
        executorService.schedule(new BeatTask(beatInfo), nextTime, TimeUnit.MILLISECONDS);
    }
}

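The first heartbeat carries a body, and later ones do not. If the server responds that the resource does not exist, the instance is registered again. Also note that a heartbeat interval returned by the server takes precedence over the one configured on the client.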
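The three decisions BeatTask derives from the server's reply (next delay, light beat, re-register) can be isolated in a pure function. This sketch is hypothetical. BeatDecision and BeatResponseHandler are invented. The JSON fields are passed in as nullable values so no JSON library is needed, and the RESOURCE_NOT_FOUND value 20404 is assumed to match NamingResponseCode in the 1.x client.

```java
// Hypothetical result holder for the decisions BeatTask makes from one reply.
final class BeatDecision {
    final long nextDelayMillis;
    final boolean lightBeat;
    final boolean reRegister;

    BeatDecision(long nextDelayMillis, boolean lightBeat, boolean reRegister) {
        this.nextDelayMillis = nextDelayMillis;
        this.lightBeat = lightBeat;
        this.reRegister = reRegister;
    }
}

// Hypothetical model of BeatTask's reply handling, not Nacos code.
final class BeatResponseHandler {
    // Assumed to match NamingResponseCode.RESOURCE_NOT_FOUND in the Nacos 1.x client.
    static final int RESOURCE_NOT_FOUND = 20404;

    static BeatDecision handle(long clientPeriodMillis, Long clientBeatInterval,
                               Boolean lightBeatEnabled, Integer code) {
        // A positive interval from the server overrides the client's own period.
        long next = (clientBeatInterval != null && clientBeatInterval > 0)
                ? clientBeatInterval : clientPeriodMillis;
        // A missing lightBeatEnabled field means "keep sending full beats".
        boolean light = lightBeatEnabled != null && lightBeatEnabled;
        // RESOURCE_NOT_FOUND means the server no longer knows the instance: register it again.
        boolean reRegister = code != null && code == RESOURCE_NOT_FOUND;
        return new BeatDecision(next, light, reRegister);
    }
}
```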

HostReactor

HostReactor handles service and host (instance) information. Its main responsibilities are:

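  1. Maintain the in-memory service cache serviceInfoMap and publish service change notifications.
  2. Maintain the local on-disk cache of services.
  3. Provide a UDP receiver that processes the UDP messages pushed by the Nacos server.

FailoverReactor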

It starts three tasks:

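  1. SwitchRefresher: runs every 5 seconds and checks whether a local failover switch file exists. If the file exists, has changed since the last check, and contains the line 1, failover mode is switched on and the locally cached service files are loaded: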
@Override
public void run() {
    try {
        // does the switch file exist?
        File switchFile = new File(failoverDir + UtilAndComs.FAILOVER_SWITCH);
        if (!switchFile.exists()) {
            switchParams.put("failover-mode", "false");
            NAMING_LOGGER.debug("failover switch is not found, " + switchFile.getName());
            return;
        }

        // last modification time of the switch file
        long modified = switchFile.lastModified();
        
        if (lastModifiedMillis < modified) {
            lastModifiedMillis = modified;
            // read the file content
            String failover = ConcurrentDiskUtil.getFileContent(failoverDir + UtilAndComs.FAILOVER_SWITCH,
                    Charset.defaultCharset().toString());
            if (!StringUtils.isEmpty(failover)) {
                String[] lines = failover.split(DiskCache.getLineSeparator());

                // switch failover mode on or off according to the content
                for (String line : lines) {
                    String line1 = line.trim();
                    if ("1".equals(line1)) {
                        switchParams.put("failover-mode", "true");
                        NAMING_LOGGER.info("failover-mode is on");
                        new FailoverFileReader().run();
                    } else if ("0".equals(line1)) {
                        switchParams.put("failover-mode", "false");
                        NAMING_LOGGER.info("failover-mode is off");
                    }
                }
            } else {
                switchParams.put("failover-mode", "false");
            }
        }
        
    } catch (Throwable e) {
        NAMING_LOGGER.error("[NA] failed to read failover switch.", e);
    }
}
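
FailoverFileReader, which SwitchRefresher runs when failover mode is switched on, loads every cached service file in the failover directory into serviceMap:

class FailoverFileReader implements Runnable {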
    
    @Override
    public void run() {
        Map<String, ServiceInfo> domMap = new HashMap<String, ServiceInfo>(16);
        
        BufferedReader reader = null;
        try {
            
            File cacheDir = new File(failoverDir);
            if (!cacheDir.exists() && !cacheDir.mkdirs()) {
                throw new IllegalStateException("failed to create cache dir: " + failoverDir);
            }
            
            File[] files = cacheDir.listFiles();
            if (files == null) {
                return;
            }
            
            for (File file : files) {
                if (!file.isFile()) {
                    continue;
                }
                
                if (file.getName().equals(UtilAndComs.FAILOVER_SWITCH)) {
                    continue;
                }
                
                ServiceInfo dom = new ServiceInfo(file.getName());
                
                try {
                    String dataString = ConcurrentDiskUtil
                            .getFileContent(file, Charset.defaultCharset().toString());
                    reader = new BufferedReader(new StringReader(dataString));
                    
                    String json;
                    if ((json = reader.readLine()) != null) {
                        try {
                            dom = JacksonUtils.toObj(json, ServiceInfo.class);
                        } catch (Exception e) {
                            NAMING_LOGGER.error("[NA] error while parsing cached dom : " + json, e);
                        }
                    }
                    
                } catch (Exception e) {
                    NAMING_LOGGER.error("[NA] failed to read cache for dom: " + file.getName(), e);
                } finally {
                    try {
                        if (reader != null) {
                            reader.close();
                        }
                    } catch (Exception e) {
                        //ignore
                    }
                }
                if (!CollectionUtils.isEmpty(dom.getHosts())) {
                    domMap.put(dom.getKey(), dom);
                }
            }
        } catch (Exception e) {
            NAMING_LOGGER.error("[NA] failed to read cache file", e);
        }
        
        if (domMap.size() > 0) {
            serviceMap = domMap;
        }
    }
}

  2. DiskFileWriter: writes to the local disk, mainly to back up service information. It runs once a day:
class DiskFileWriter extends TimerTask {
    
    @Override
    public void run() {
        Map<String, ServiceInfo> map = hostReactor.getServiceInfoMap();
        for (Map.Entry<String, ServiceInfo> entry : map.entrySet()) {
            ServiceInfo serviceInfo = entry.getValue();
            if (StringUtils.equals(serviceInfo.getKey(), UtilAndComs.ALL_IPS) || StringUtils
                    .equals(serviceInfo.getName(), UtilAndComs.ENV_LIST_KEY) || StringUtils
                    .equals(serviceInfo.getName(), "00-00---000-ENV_CONFIGS-000---00-00") || StringUtils
                    .equals(serviceInfo.getName(), "vipclient.properties") || StringUtils
                    .equals(serviceInfo.getName(), "00-00---000-ALL_HOSTS-000---00-00")) {
                continue;
            }
            
            DiskCache.write(serviceInfo, failoverDir);
        }
    }
}

  3. A one-off task, started after a 10-second delay, checks whether a backup already exists. If not, it writes one immediately.

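The switch-file rule from SwitchRefresher can be checked in isolation. A line equal to 1 turns failover on and a line equal to 0 turns it off, so the last such line wins. FailoverSwitch is a hypothetical helper. It assumes the mode starts off and splits on any line break, whereas the original splits on the platform line separator. It ignores the lastModified check.

```java
// Hypothetical helper mirroring SwitchRefresher's parsing of the failover switch file.
final class FailoverSwitch {
    static boolean parse(String content) {
        // Empty or missing content means failover mode is off.
        if (content == null || content.isEmpty()) {
            return false;
        }
        boolean failover = false; // assumed starting state
        // "\\R" matches any line break (\n, \r\n, ...); the original uses the platform separator.
        for (String line : content.split("\\R")) {
            String trimmed = line.trim();
            if ("1".equals(trimmed)) {
                failover = true;
            } else if ("0".equals(trimmed)) {
                failover = false;
            }
        }
        return failover;
    }
}
```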
PushReceiver

It starts a task that receives UDP packets:

@Override
public void run() {
    while (!closed) {
        try {
            
            // byte[] is initialized with 0 full filled by default
            byte[] buffer = new byte[UDP_MSS];
            DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
            
            udpSocket.receive(packet);
            
            String json = new String(IoUtils.tryDecompress(packet.getData()), UTF_8).trim();
            NAMING_LOGGER.info("received push data: " + json + " from " + packet.getAddress().toString());
            
            PushPacket pushPacket = JacksonUtils.toObj(json, PushPacket.class);
            String ack;
            // respond according to pushPacket.type
            if ("dom".equals(pushPacket.type) || "service".equals(pushPacket.type)) {
                hostReactor.processServiceJson(pushPacket.data);
                
                // send ack to server
                ack = "{\"type\": \"push-ack\"" + ", \"lastRefTime\":\"" + pushPacket.lastRefTime + "\", \"data\":"
                        + "\"\"}";
            } else if ("dump".equals(pushPacket.type)) {
                // dump data to server
                ack = "{\"type\": \"dump-ack\"" + ", \"lastRefTime\": \"" + pushPacket.lastRefTime + "\", \"data\":"
                        + "\"" + StringUtils.escapeJavaScript(JacksonUtils.toJson(hostReactor.getServiceInfoMap()))
                        + "\"}";
            } else {
                // do nothing send ack only
                ack = "{\"type\": \"unknown-ack\"" + ", \"lastRefTime\":\"" + pushPacket.lastRefTime
                        + "\", \"data\":" + "\"\"}";
            }
            
            udpSocket.send(new DatagramPacket(ack.getBytes(UTF_8), ack.getBytes(UTF_8).length,
                    packet.getSocketAddress()));
        } catch (Exception e) {
            NAMING_LOGGER.error("[NA] error while receiving push data", e);
        }
    }
}
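Every received packet is acknowledged. Only the ack type and payload depend on pushPacket.type. The sketch below isolates that mapping. PushAck is a hypothetical helper that builds the same three fields as the loop above with normalized whitespace. It expects the caller to pass data that is already escaped, as the original does for the dump case with StringUtils.escapeJavaScript.

```java
// Hypothetical helper reproducing the ack mapping of the receive loop above.
final class PushAck {
    // dom/service -> push-ack, dump -> dump-ack (carrying the local cache), else unknown-ack
    static String ackType(String pushType) {
        if ("dom".equals(pushType) || "service".equals(pushType)) {
            return "push-ack";
        }
        if ("dump".equals(pushType)) {
            return "dump-ack";
        }
        return "unknown-ack";
    }

    // escapedData must already be escaped for use inside a JSON string.
    static String build(String pushType, long lastRefTime, String escapedData) {
        return "{\"type\": \"" + ackType(pushType) + "\", \"lastRefTime\": \"" + lastRefTime
                + "\", \"data\": \"" + escapedData + "\"}";
    }
}
```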

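A packet of type dom or service means that a service has changed. The client then updates its service information through processServiceJson: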

public ServiceInfo processServiceJson(String json) {
    // parse the pushed data and compare it with the cached service
    ServiceInfo serviceInfo = JacksonUtils.toObj(json, ServiceInfo.class);
    ServiceInfo oldService = serviceInfoMap.get(serviceInfo.getKey());
    if (serviceInfo.getHosts() == null || !serviceInfo.validate()) {
        //empty or error push, just ignore
        return oldService;
    }
    
    boolean changed = false;
    
    if (oldService != null) {
        
        if (oldService.getLastRefTime() > serviceInfo.getLastRefTime()) {
            NAMING_LOGGER.warn("out of date data received, old-t: " + oldService.getLastRefTime() + ", new-t: "
                    + serviceInfo.getLastRefTime());
        }
        
        serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);
        
        Map<String, Instance> oldHostMap = new HashMap<String, Instance>(oldService.getHosts().size());
        for (Instance host : oldService.getHosts()) {
            oldHostMap.put(host.toInetAddr(), host);
        }
        
        Map<String, Instance> newHostMap = new HashMap<String, Instance>(serviceInfo.getHosts().size());
        for (Instance host : serviceInfo.getHosts()) {
            newHostMap.put(host.toInetAddr(), host);
        }
        
        Set<Instance> modHosts = new HashSet<Instance>();
        Set<Instance> newHosts = new HashSet<Instance>();
        Set<Instance> remvHosts = new HashSet<Instance>();
        
        List<Map.Entry<String, Instance>> newServiceHosts = new ArrayList<Map.Entry<String, Instance>>(
                newHostMap.entrySet());
        for (Map.Entry<String, Instance> entry : newServiceHosts) {
            Instance host = entry.getValue();
            String key = entry.getKey();
            if (oldHostMap.containsKey(key) && !StringUtils
                    .equals(host.toString(), oldHostMap.get(key).toString())) {
                modHosts.add(host);
                continue;
            }
            
            if (!oldHostMap.containsKey(key)) {
                newHosts.add(host);
            }
        }
        
        for (Map.Entry<String, Instance> entry : oldHostMap.entrySet()) {
            Instance host = entry.getValue();
            String key = entry.getKey();
            if (newHostMap.containsKey(key)) {
                continue;
            }
            
            if (!newHostMap.containsKey(key)) {
                remvHosts.add(host);
            }
            
        }
        
        if (newHosts.size() > 0) {
            changed = true;
            NAMING_LOGGER.info("new ips(" + newHosts.size() + ") service: " + serviceInfo.getKey() + " -> "
                    + JacksonUtils.toJson(newHosts));
        }
        
        if (remvHosts.size() > 0) {
            changed = true;
            NAMING_LOGGER.info("removed ips(" + remvHosts.size() + ") service: " + serviceInfo.getKey() + " -> "
                    + JacksonUtils.toJson(remvHosts));
        }
        
        if (modHosts.size() > 0) {
            changed = true;
            updateBeatInfo(modHosts);
            NAMING_LOGGER.info("modified ips(" + modHosts.size() + ") service: " + serviceInfo.getKey() + " -> "
                    + JacksonUtils.toJson(modHosts));
        }
        
        serviceInfo.setJsonFromServer(json);
        
        if (newHosts.size() > 0 || remvHosts.size() > 0 || modHosts.size() > 0) {
            // event notification
            eventDispatcher.serviceChanged(serviceInfo);
            // write to the disk cache
            DiskCache.write(serviceInfo, cacheDir);
        }
        
    } else {
        changed = true;
        NAMING_LOGGER.info("init new ips(" + serviceInfo.ipCount() + ") service: " + serviceInfo.getKey() + " -> "
                + JacksonUtils.toJson(serviceInfo.getHosts()));
        serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);
        eventDispatcher.serviceChanged(serviceInfo);
        serviceInfo.setJsonFromServer(json);
        DiskCache.write(serviceInfo, cacheDir);
    }
    
    MetricsMonitor.getServiceInfoMapSizeMonitor().set(serviceInfoMap.size());
    
    if (changed) {
        NAMING_LOGGER.info("current ips:(" + serviceInfo.ipCount() + ") service: " + serviceInfo.getKey() + " -> "
                + JacksonUtils.toJson(serviceInfo.getHosts()));
    }
    
    return serviceInfo;
}
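The core of processServiceJson is a three-way diff of the old and new host lists. Both lists are keyed by ip:port, which is what toInetAddr() is used for. The sketch is hypothetical and simplified. HostDiff is an invented name, and each instance is represented by its string form, just as the original compares host.toString() values. It also drops the redundant second containsKey check from the original loop.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified version of the host diff in processServiceJson.
final class HostDiff {
    final Set<String> added = new HashSet<>();
    final Set<String> removed = new HashSet<>();
    final Set<String> modified = new HashSet<>();

    // Both maps: "ip:port" -> string form of the instance (stand-in for Instance.toString()).
    static HostDiff compute(Map<String, String> oldHosts, Map<String, String> newHosts) {
        HostDiff diff = new HostDiff();
        for (Map.Entry<String, String> entry : newHosts.entrySet()) {
            String key = entry.getKey();
            if (!oldHosts.containsKey(key)) {
                diff.added.add(key);            // only in the new list
            } else if (!oldHosts.get(key).equals(entry.getValue())) {
                diff.modified.add(key);         // same address, different attributes
            }
        }
        for (String key : oldHosts.keySet()) {
            if (!newHosts.containsKey(key)) {
                diff.removed.add(key);          // only in the old list
            }
        }
        return diff;
    }

    // Mirrors the condition that triggers serviceChanged() and DiskCache.write().
    boolean changed() {
        return !added.isEmpty() || !removed.isEmpty() || !modified.isEmpty();
    }
}
```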

UpdateTask

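This task is started the first time a given service is fetched. After a successful run, it is rescheduled after the service's cacheMillis. Each consecutive failure doubles the delay (delayTime << failCount), capped at DEFAULT_DELAY * 60, which is 60 seconds when DEFAULT_DELAY is 1000 ms.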

@Override
public void run() {
    long delayTime = DEFAULT_DELAY;
    
    try {
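        // look up the cached service info for this service name and cluster list
        ServiceInfo serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));

        // not cached yet: fetch it from the server
        if (serviceObj == null) {
            // update directly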
            updateService(serviceName, clusters);
            return;
        }

        // nothing newer has arrived since the last run (e.g. no push): pull from the server and re-read the cache
        if (serviceObj.getLastRefTime() <= lastRefTime) {
            updateService(serviceName, clusters);
            serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));
        } else {
            // already updated (e.g. by a push): only send a refresh query to the server, without overwriting the local data
            // if serviceName already updated by push, we should not override it
            // since the push data may be different from pull through force push
            refreshOnly(serviceName, clusters);
        }
        
        lastRefTime = serviceObj.getLastRefTime();
        
        if (!eventDispatcher.isSubscribed(serviceName, clusters) && !futureMap
                .containsKey(ServiceInfo.getKey(serviceName, clusters))) {
            // abort the update task
            NAMING_LOGGER.info("update task is stopped, service:" + serviceName + ", clusters:" + clusters);
            return;
        }
        if (CollectionUtils.isEmpty(serviceObj.getHosts())) {
            incFailCount();
            return;
        }
        delayTime = serviceObj.getCacheMillis();
        resetFailCount();
    } catch (Throwable e) {
        incFailCount();
        NAMING_LOGGER.warn("[NA] failed to update serviceName: " + serviceName, e);
    } finally {
        executor.schedule(this, Math.min(delayTime << failCount, DEFAULT_DELAY * 60), TimeUnit.MILLISECONDS);
    }
}
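The scheduling expression in the finally block is a capped exponential backoff. Here is a hypothetical sketch of it. UpdateBackoff is an invented name. DEFAULT_DELAY = 1000 ms and the limit of 6 on failCount are assumptions based on the Nacos 1.x client and may differ between versions.

```java
// Hypothetical sketch of UpdateTask's delay computation, not Nacos code.
final class UpdateBackoff {
    static final long DEFAULT_DELAY = 1000L; // assumed, as in the Nacos 1.x HostReactor
    static final int FAIL_COUNT_LIMIT = 6;   // assumed upper bound for failCount
    private int failCount = 0;

    void incFailCount() {
        if (failCount < FAIL_COUNT_LIMIT) {
            failCount++;
        }
    }

    void resetFailCount() {
        failCount = 0;
    }

    // Base delay doubled once per consecutive failure, never above DEFAULT_DELAY * 60.
    long nextDelay(long delayTime) {
        return Math.min(delayTime << failCount, DEFAULT_DELAY * 60);
    }
}
```

With a 10-second cacheMillis, successive failures give 10 s, 20 s, 40 s and then 60 s. A successful run resets the delay to cacheMillis.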

That covers the main parts of NacosNamingService. In the next article we will look at service discovery together with Dubbo.
