Nacos Source Code Analysis, Part 13: Client-Side Service Discovery

x33g5p2x, reposted on 2021-12-20 under "Other"

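The previous parts covered the basic workings of NacosNamingService. Here we look at service discovery, using Dubbo as the consumer.

Start with the refer method of RegistryProtocol in Dubbo: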

public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
    ...
    return doRefer(cluster, registry, type, url);
}

private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
    RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
    directory.setRegistry(registry);
    directory.setProtocol(protocol);
    // all attributes of REFER_KEY
    Map<String, String> parameters = new HashMap<String, String>(directory.getConsumerUrl().getParameters());
    URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);
    if (directory.isShouldRegister()) {
        directory.setRegisteredConsumerUrl(subscribeUrl);
        registry.register(directory.getRegisteredConsumerUrl());
    }
    directory.buildRouterChain(subscribeUrl);
    // the refer process mainly comes down to this service subscription
    directory.subscribe(toSubscribeUrl(subscribeUrl));

    Invoker<T> invoker = cluster.join(directory);
    List<RegistryProtocolListener> listeners = findRegistryProtocolListeners(url);
    if (CollectionUtils.isEmpty(listeners)) {
        return invoker;
    }

    RegistryInvokerWrapper<T> registryInvokerWrapper = new RegistryInvokerWrapper<>(directory, cluster, invoker, subscribeUrl);
    for (RegistryProtocolListener listener : listeners) {
        listener.onRefer(this, registryInvokerWrapper);
    }
    return registryInvokerWrapper;
}

The key step is the service directory subscription, directory.subscribe:

public void subscribe(URL url) {
    setConsumerUrl(url);
    CONSUMER_CONFIGURATION_LISTENER.addNotifyListener(this);
    serviceConfigurationListener = new ReferenceConfigurationListener(this, url);
    // the registry here is NacosRegistry
    registry.subscribe(url, this);
}

As we saw when discussing service registration, the registry here is NacosRegistry. Let's look at its doSubscribe method:

@Override
public void doSubscribe(final URL url, final NotifyListener listener) {
    Set<String> serviceNames = getServiceNames(url, listener);
    doSubscribe(url, listener, serviceNames);
}

It resolves the service names from the url, then continues:

private void doSubscribe(final URL url, final NotifyListener listener, final Set<String> serviceNames) {
    execute(namingService -> {
        List<Instance> instances = new LinkedList();
        for (String serviceName : serviceNames) {
            // fetch the service instances
            instances.addAll(namingService.getAllInstances(serviceName));
            subscribeEventListener(serviceName, url, listener);
        }
        notifySubscriber(url, listener, instances);
    });
}

namingService.getAllInstances(serviceName) returns the service instances for the given service name. The call ends up in this overload:

@Override
public List<Instance> getAllInstances(String serviceName, String groupName, List<String> clusters, boolean subscribe) throws NacosException {

    ServiceInfo serviceInfo;
    if (subscribe) {
        serviceInfo = hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","));
    } else {
        serviceInfo = hostReactor.getServiceInfoDirectlyFromServer(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","));
    }
    List<Instance> list;
    if (serviceInfo == null || CollectionUtils.isEmpty(list = serviceInfo.getHosts())) {
        return new ArrayList<Instance>();
    }
    return list;
}

Here subscribe is true, so we look at hostReactor.getServiceInfo:

public ServiceInfo getServiceInfo(final String serviceName, final String clusters) {

    NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch());
    String key = ServiceInfo.getKey(serviceName, clusters);
    if (failoverReactor.isFailoverSwitch()) {
        return failoverReactor.getService(key);
    }
    
    // try the cache first
    ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters);

    if (null == serviceObj) {
        serviceObj = new ServiceInfo(serviceName, clusters);

        serviceInfoMap.put(serviceObj.getKey(), serviceObj);

        updatingMap.put(serviceName, new Object());
        // cache miss: call the server directly
        updateServiceNow(serviceName, clusters);
        updatingMap.remove(serviceName);

    } else if (updatingMap.containsKey(serviceName)) {

        if (UPDATE_HOLD_INTERVAL > 0) {
            // hold a moment waiting for update finish
            synchronized (serviceObj) {
                try {
                    // wait a moment
                    serviceObj.wait(UPDATE_HOLD_INTERVAL);
                } catch (InterruptedException e) {
                    NAMING_LOGGER.error("[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, e);
                }
            }
        }
    }

    // add an UpdateTask
    scheduleUpdateIfAbsent(serviceName, clusters);

    return serviceInfoMap.get(serviceObj.getKey());
}

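The service info is first looked up in the cache. On a miss, the client calls the server directly; when the call completes, the cache is updated and eventDispatcher.serviceChanged is invoked to publish a change event. A listener for this event is registered later in the flow.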
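The cache-first lookup described above can be sketched in isolation. This is a minimal illustration, not the HostReactor implementation: the class CacheFirstLookup, its remoteFetch function, and the onChanged callback are hypothetical stand-ins for the remote call and for eventDispatcher.serviceChanged, and instances are reduced to plain strings.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical sketch: cache-first lookup with a remote fallback and a change notification.
class CacheFirstLookup {
    private final Map<String, List<String>> cache = new ConcurrentHashMap<>();
    // Stand-in for the remote query to the server (an assumption, not a Nacos API).
    private final Function<String, List<String>> remoteFetch;
    // Stand-in for eventDispatcher.serviceChanged (an assumption, not a Nacos API).
    private final Consumer<String> onChanged;
    // Counts remote calls so the caching behaviour can be observed.
    public final AtomicInteger remoteCalls = new AtomicInteger();

    public CacheFirstLookup(Function<String, List<String>> remoteFetch, Consumer<String> onChanged) {
        this.remoteFetch = remoteFetch;
        this.onChanged = onChanged;
    }

    public List<String> getInstances(String key) {
        List<String> cached = cache.get(key);
        if (cached == null) {
            // Cache miss: query the server, store the result, then publish a change event.
            remoteCalls.incrementAndGet();
            List<String> fetched = remoteFetch.apply(key);
            cached = (fetched == null) ? List.of() : fetched;
            cache.put(key, cached);
            onChanged.accept(key);
        }
        return cached;
    }
}
```

A second call with the same key is served from the cache, so the remote function runs only once per key in this sketch. The real method additionally handles failover mode and callers that arrive while an update is in flight.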

Finally, an UpdateTask is added:

public void scheduleUpdateIfAbsent(String serviceName, String clusters) {
    if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {
        return;
    }
    
    synchronized (futureMap) {
        if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {
            return;
        }
        // add the service update task
        ScheduledFuture<?> future = addTask(new UpdateTask(serviceName, clusters));
        futureMap.put(ServiceInfo.getKey(serviceName, clusters), future);
    }
}

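This task was analyzed earlier: it periodically requests the latest service info and, if anything changed, calls eventDispatcher.serviceChanged to publish a notification.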
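The "schedule only if absent" idea can be sketched with the standard library alone. The class UpdateScheduler and its method names are hypothetical, and the sketch makes two simplifications of its own: it uses ConcurrentHashMap.computeIfAbsent in place of the double-checked synchronized block, and fixed-delay scheduling in place of whatever rescheduling UpdateTask does internally.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: at most one periodic refresh task per service key.
class UpdateScheduler {
    // Daemon thread so a forgotten scheduler does not keep the JVM alive.
    private final ScheduledExecutorService executor =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "update-task");
                t.setDaemon(true);
                return t;
            });
    private final Map<String, ScheduledFuture<?>> futureMap = new ConcurrentHashMap<>();

    public ScheduledFuture<?> scheduleUpdateIfAbsent(String key, Runnable refresh, long periodMillis) {
        // computeIfAbsent runs the mapping function at most once per key,
        // so concurrent callers all receive the same future.
        return futureMap.computeIfAbsent(key, k ->
                executor.scheduleWithFixedDelay(refresh, periodMillis, periodMillis, TimeUnit.MILLISECONDS));
    }

    public int scheduledCount() {
        return futureMap.size();
    }

    public void shutdown() {
        executor.shutdownNow();
    }
}
```

Calling scheduleUpdateIfAbsent twice with the same key returns the same future and leaves a single entry in the map, which is the property the original method guards with its two null checks.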

Now back to the event listener subscription, the subscribeEventListener method of NacosRegistry:

private void subscribeEventListener(String serviceName, final URL url, final NotifyListener listener)
        throws NacosException {
    EventListener eventListener = event -> {
        if (event instanceof NamingEvent) {
            NamingEvent e = (NamingEvent) event;
            notifySubscriber(url, listener, e.getInstances());
        }
    };
    // Nacos naming service subscription; the arguments are the service name and the event listener
    namingService.subscribe(serviceName, eventListener);
}

It builds an EventListener and then subscribes through namingService.subscribe:

@Override
public void subscribe(String serviceName, String groupName, List<String> clusters, EventListener listener) throws NacosException {
    // add the listener
    eventDispatcher.addListener(hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName),
        StringUtils.join(clusters, ",")), StringUtils.join(clusters, ","), listener);
}

The listener is added to eventDispatcher:

public void addListener(ServiceInfo serviceInfo, String clusters, EventListener listener) {

    NAMING_LOGGER.info("[LISTENER] adding " + serviceInfo.getName() + " with " + clusters + " to listener map");
    List<EventListener> observers = Collections.synchronizedList(new ArrayList<EventListener>());
    observers.add(listener);

    observers = observerMap.putIfAbsent(ServiceInfo.getKey(serviceInfo.getName(), clusters), observers);
    if (observers != null) {
        observers.add(listener);
    }

    serviceChanged(serviceInfo);
}

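In effect, the listener is simply added to observerMap. The Notifier task, which runs in an infinite loop, then looks up the listeners for the changed service in observerMap and emits a NamingEvent. Once the event is emitted, control returns to this code:

EventListener eventListener = event -> {
    if (event instanceof NamingEvent) {
        NamingEvent e = (NamingEvent) event;
        // triggers the notify call on the service directory
        notifySubscriber(url, listener, e.getInstances());
    }
};

One level further in is the listener's notify call, which brings us back to the update notification of Dubbo's service directory.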
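The observer map plus notifier arrangement can be sketched as follows. MiniEventDispatcher and all of its members are hypothetical names; listeners are plain Consumer callbacks and instances are strings. Instead of a thread looping forever, the sketch exposes dispatchPending, which drains the queue once, so the behaviour is deterministic.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

// Hypothetical sketch: listeners keyed by service, changes queued and dispatched later.
class MiniEventDispatcher {
    private final Map<String, List<Consumer<List<String>>>> observerMap = new ConcurrentHashMap<>();
    private final BlockingQueue<Map.Entry<String, List<String>>> changed = new LinkedBlockingQueue<>();

    public void addListener(String key, Consumer<List<String>> listener) {
        // Create the listener list on first use, then append to it.
        observerMap.computeIfAbsent(key, k -> new CopyOnWriteArrayList<>()).add(listener);
    }

    public void serviceChanged(String key, List<String> instances) {
        // Only enqueue here; delivery happens on the dispatching side.
        changed.add(Map.entry(key, instances));
    }

    // Drains all queued changes and returns how many were processed.
    // A real notifier would do this in a loop on its own thread.
    public int dispatchPending() {
        int processed = 0;
        Map.Entry<String, List<String>> event;
        while ((event = changed.poll()) != null) {
            for (Consumer<List<String>> l : observerMap.getOrDefault(event.getKey(), List.of())) {
                l.accept(event.getValue());
            }
            processed++;
        }
        return processed;
    }
}
```

Changes for a service without listeners are consumed and dropped, while changes for a subscribed service reach every registered callback. Decoupling serviceChanged from delivery through a queue keeps slow listeners from blocking the code that detects the change.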

Summary

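Both Dubbo's listener and Nacos's event listener are involved here; once the relationship between them is clear, the whole flow is clear. Not all of the code is shown, so follow along in the source for the details.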
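The relationship between the two listeners is essentially an adapter: a registry-side listener receives instances and forwards them to a directory-side listener in the form it expects. The sketch below uses hypothetical interfaces (InstanceListener standing in for the Nacos EventListener, DirectoryListener standing in for Dubbo's NotifyListener), and the URL string it builds is an illustrative assumption, not the format Dubbo actually produces.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical sketch: adapting a registry-side listener to a directory-side listener.
class ListenerBridgeSketch {
    // Stand-in for a registry instance; only ip and port are modelled.
    public static final class Instance {
        public final String ip;
        public final int port;

        public Instance(String ip, int port) {
            this.ip = ip;
            this.port = port;
        }
    }

    // Stand-in for the registry-side event listener.
    public interface InstanceListener {
        void onChange(List<Instance> instances);
    }

    // Stand-in for the consumer-side directory listener.
    public interface DirectoryListener {
        void notifyUrls(List<String> urls);
    }

    // Wraps a directory listener so it can be registered as a registry-side listener.
    public static InstanceListener bridge(String serviceName, DirectoryListener directory) {
        return instances -> directory.notifyUrls(
                instances.stream()
                        // Illustrative URL shape only (an assumption).
                        .map(i -> "dubbo://" + i.ip + ":" + i.port + "/" + serviceName)
                        .collect(Collectors.toList()));
    }
}
```

Read this way, subscribeEventListener plays the role of bridge: it closes over the Dubbo listener and hands the resulting callback to the naming service.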

That roughly wraps up the Nacos client. The following parts move on to the server side.
