文章24 | 阅读 13871 | 点赞0
前面分析了NacosNamingService的基本工作内容,下面我们结合dubbo看一下服务发现部分内容。
先看一下dubbo中RegistryProtocol的refer方法:
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
...
return doRefer(cluster, registry, type, url);
}
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
directory.setRegistry(registry);
directory.setProtocol(protocol);
// all attributes of REFER_KEY
Map<String, String> parameters = new HashMap<String, String>(directory.getConsumerUrl().getParameters());
URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);
if (directory.isShouldRegister()) {
directory.setRegisteredConsumerUrl(subscribeUrl);
registry.register(directory.getRegisteredConsumerUrl());
}
directory.buildRouterChain(subscribeUrl);
// refer过程主要是这里的服务订阅
directory.subscribe(toSubscribeUrl(subscribeUrl));
Invoker<T> invoker = cluster.join(directory);
List<RegistryProtocolListener> listeners = findRegistryProtocolListeners(url);
if (CollectionUtils.isEmpty(listeners)) {
return invoker;
}
RegistryInvokerWrapper<T> registryInvokerWrapper = new RegistryInvokerWrapper<>(directory, cluster, invoker, subscribeUrl);
for (RegistryProtocolListener listener : listeners) {
listener.onRefer(this, registryInvokerWrapper);
}
return registryInvokerWrapper;
}
主要是服务目录的订阅directory.subscribe:
public void subscribe(URL url) {
setConsumerUrl(url);
CONSUMER_CONFIGURATION_LISTENER.addNotifyListener(this);
serviceConfigurationListener = new ReferenceConfigurationListener(this, url);
// 这里是nacosRegistry
registry.subscribe(url, this);
}
之前讨论服务注册时已经了解到,这里的registry是NacosRegistry,我们看一下它的doSubscribe方法:
@Override
public void doSubscribe(final URL url, final NotifyListener listener) {
Set<String> serviceNames = getServiceNames(url, listener);
doSubscribe(url, listener, serviceNames);
}
根据url获取服务名称,然后继续:
private void doSubscribe(final URL url, final NotifyListener listener, final Set<String> serviceNames) {
execute(namingService -> {
List<Instance> instances = new LinkedList();
for (String serviceName : serviceNames) {
// 获取服务实例
instances.addAll(namingService.getAllInstances(serviceName));
subscribeEventListener(serviceName, url, listener);
}
notifySubscriber(url, listener, instances);
});
}
namingService.getAllInstances(serviceName)获取和当前服务名称相关的服务实例:
@Override
public List<Instance> getAllInstances(String serviceName, String groupName, List<String> clusters, boolean subscribe) throws NacosException {
ServiceInfo serviceInfo;
if (subscribe) {
serviceInfo = hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","));
} else {
serviceInfo = hostReactor.getServiceInfoDirectlyFromServer(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","));
}
List<Instance> list;
if (serviceInfo == null || CollectionUtils.isEmpty(list = serviceInfo.getHosts())) {
return new ArrayList<Instance>();
}
return list;
}
这里subscribe是true,我们看hostReactor.getServiceInfo:
public ServiceInfo getServiceInfo(final String serviceName, final String clusters) {
NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch());
String key = ServiceInfo.getKey(serviceName, clusters);
if (failoverReactor.isFailoverSwitch()) {
return failoverReactor.getService(key);
}
// 先从缓存中获取
ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters);
if (null == serviceObj) {
serviceObj = new ServiceInfo(serviceName, clusters);
serviceInfoMap.put(serviceObj.getKey(), serviceObj);
updatingMap.put(serviceName, new Object());
// 没取到直接远程调用
updateServiceNow(serviceName, clusters);
updatingMap.remove(serviceName);
} else if (updatingMap.containsKey(serviceName)) {
if (UPDATE_HOLD_INTERVAL > 0) {
// hold a moment waiting for update finish
synchronized (serviceObj) {
try {
// 等一会
serviceObj.wait(UPDATE_HOLD_INTERVAL);
} catch (InterruptedException e) {
NAMING_LOGGER.error("[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, e);
}
}
}
}
// 添加UpdateTask任务
scheduleUpdateIfAbsent(serviceName, clusters);
return serviceInfoMap.get(serviceObj.getKey());
}
先从缓存中获取服务信息,如果没有直接远程调用,调用完成后会更新缓存,同时调用eventDispatcher.serviceChanged发起事件通知。后面有注册监听器来监听这个事件。
最后添加了一个UpdateTask任务:
public void scheduleUpdateIfAbsent(String serviceName, String clusters) {
if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {
return;
}
synchronized (futureMap) {
if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {
return;
}
// 添加服务更新任务
ScheduledFuture<?> future = addTask(new UpdateTask(serviceName, clusters));
futureMap.put(ServiceInfo.getKey(serviceName, clusters), future);
}
}
这个任务之前分析过,会定时请求得到最新的服务信息,如果有变更则发起eventDispatcher.serviceChanged通知。
然后回来再看一下事件监听的订阅NacosRegistry的subscribeEventListener方法:
private void subscribeEventListener(String serviceName, final URL url, final NotifyListener listener)
throws NacosException {
EventListener eventListener = event -> {
if (event instanceof NamingEvent) {
NamingEvent e = (NamingEvent) event;
notifySubscriber(url, listener, e.getInstances());
}
};
// nacos的名字服务订阅。 参数是服务名称和事件监听器
namingService.subscribe(serviceName, eventListener);
}
构造EventListener,然后namingService.subscribe订阅:
@Override
public void subscribe(String serviceName, String groupName, List<String> clusters, EventListener listener) throws NacosException {
// 添加监听
eventDispatcher.addListener(hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName),
StringUtils.join(clusters, ",")), StringUtils.join(clusters, ","), listener);
}
往eventDispatcher中添加监听器:
public void addListener(ServiceInfo serviceInfo, String clusters, EventListener listener) {
NAMING_LOGGER.info("[LISTENER] adding " + serviceInfo.getName() + " with " + clusters + " to listener map");
List<EventListener> observers = Collections.synchronizedList(new ArrayList<EventListener>());
observers.add(listener);
observers = observerMap.putIfAbsent(ServiceInfo.getKey(serviceInfo.getName(), clusters), observers);
if (observers != null) {
observers.add(listener);
}
serviceChanged(serviceInfo);
}
实际上就是observerMap把这个监听器加进去。然后Notifier这个无限循环的任务会根据服务信息从observerMap中取出来对应的监听器,然后发出NamingEvent事件。事件发出后就回到了
EventListener eventListener = event -> {
if (event instanceof NamingEvent) {
NamingEvent e = (NamingEvent) event;
// 触发服务目录的notify通知
notifySubscriber(url, listener, e.getInstances());
}
};
这段代码。再往里就是这个listener的notify了,这就回到了dubbo服务目录的更新通知。
这里有dubbo的监听,也有nacos的事件监听,搞清楚这几个的关系就可以搞清楚流程了。代码没有全部贴出来,可对照源码详细查看。
至此nacos客户端相关的内容已经差不多了。后面开始学习服务端部分的一些内容。
版权说明 : 本文为转载文章, 版权归原作者所有 版权申明
原文链接 : https://blog.csdn.net/qq_19414183/article/details/112345151
内容来源于网络,如有侵权,请联系作者删除!