Nacos Source Code Analysis, Part 3: Config Center (2)

x33g5p2x, reposted on 2021-12-20 under "Other"

Starting from this line in the NacosConfigService constructor:

this.worker = new ClientWorker(this.agent, this.configFilterChainManager, properties);

let's see what ClientWorker does during initialization:

public ClientWorker(final HttpAgent agent, final ConfigFilterChainManager configFilterChainManager,
        final Properties properties) {
    // HTTP agent
    this.agent = agent;
    // config filter chain
    this.configFilterChainManager = configFilterChainManager;
    
    // Initialize the timeout parameter and the other settings read from properties
    init(properties);

    this.executor = Executors.newScheduledThreadPool(1, new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r);
            t.setName("com.alibaba.nacos.client.Worker." + agent.getName());
            t.setDaemon(true);
            return t;
        }
    });

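    // Pool with one thread per CPU core, used for long polling. On every config check, if the number of
    // cached configs exceeds what the running LongPollingRunnable tasks can cover (3000 each by default),
    // a new task is started to check the extra configs.
    // Runtime.getRuntime().availableProcessors() returns the number of CPU cores.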
    this.executorService = Executors
            .newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    Thread t = new Thread(r);
                    t.setName("com.alibaba.nacos.client.Worker.longPolling." + agent.getName());
                    t.setDaemon(true);
                    return t;
                }
            });

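    // Runs checkConfigInfo() with a 10 ms delay between runs (1 ms initial delay);
    // checkConfigInfo() starts LongPollingRunnable tasks as needed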
    this.executor.scheduleWithFixedDelay(new Runnable() {
        @Override
        public void run() {
            try {
                checkConfigInfo();
            } catch (Throwable e) {
                LOGGER.error("[" + agent.getName() + "] [sub-check] rotate check error", e);
            }
        }
    }, 1L, 10L, TimeUnit.MILLISECONDS);
}

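Two thread pools are defined here. executorService is only created at this point and has no tasks yet; it is used for long polling and its core pool size equals the number of CPU cores. executor runs checkConfigInfo repeatedly, waiting 10 ms after each run finishes before starting the next.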
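The scheduling pattern can be tried in isolation. The following is a minimal sketch, not the Nacos code: the class name, the `daemonFactory` helper and the counter task are made up for illustration. It shows a single-threaded scheduled pool with named daemon threads running a task via scheduleWithFixedDelay.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class WorkerPoolsSketch {

    // Hypothetical helper: names each thread and marks it as a daemon,
    // so the pool never keeps the JVM alive on its own.
    static ThreadFactory daemonFactory(final String name) {
        return r -> {
            Thread t = new Thread(r);
            t.setName(name);
            t.setDaemon(true);
            return t;
        };
    }

    // Schedules a counter with a fixed delay and reports how often it ran within waitMs.
    static int countRuns(long delayMs, long waitMs) {
        ScheduledExecutorService executor =
                Executors.newScheduledThreadPool(1, daemonFactory("sketch.Worker"));
        AtomicInteger runs = new AtomicInteger();
        try {
            // Initial delay 1 ms, then delayMs between the end of one run and the start of the next.
            executor.scheduleWithFixedDelay(runs::incrementAndGet, 1L, delayMs, TimeUnit.MILLISECONDS);
            Thread.sleep(waitMs);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            executor.shutdownNow();
        }
        return runs.get();
    }

    public static void main(String[] args) {
        System.out.println("runs within 200 ms: " + countRuns(10L, 200L));
    }
}
```

Because the delay is measured from the end of the previous run, a slow check never overlaps with the next one.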

Let's look at the checkConfigInfo method:

public void checkConfigInfo() {
    // Dispatch tasks.
    // Number of configs being listened to
    int listenerSize = cacheMap.get().size();
    // Round up the longingTaskCount.
    // listener count / 3000, rounded up
    int longingTaskCount = (int) Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize());
    if (longingTaskCount > currentLongingTaskCount) {
        for (int i = (int) currentLongingTaskCount; i < longingTaskCount; i++) {
            taskIdSet.add(i);
            // For every additional block of 3000 listeners, create a new
            // LongPollingRunnable (long-polling listener task).
            // By default each LongPollingRunnable polls for up to 3000 listeners.
            executorService.execute(new LongPollingRunnable(i));
        }
    } else if (longingTaskCount < currentLongingTaskCount) {
        for (int i = longingTaskCount; i < (int) currentLongingTaskCount; i++) {
            taskIdSet.remove(i);
        }
    }
    currentLongingTaskCount = longingTaskCount;
}

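ParamUtil.getPerTaskConfigSize() defaults to 3000.

The logic is: long-polling tasks are created according to the number of listeners, one task per 3000 listeners, and each task is submitted to executorService. If the count drops, the surplus task ids are removed from taskIdSet.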
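As a quick check of the rounding, here is a small sketch of the same arithmetic. The constant is an assumed stand-in for ParamUtil.getPerTaskConfigSize(); it is declared as a double so that the division is not truncated before Math.ceil is applied.

```java
final class TaskCountSketch {

    // Assumed default, mirroring the 3000 mentioned above. A double on purpose:
    // with an int divisor, 3001 / 3000 would already be 1 before Math.ceil sees it.
    static final double PER_TASK_CONFIG_SIZE = 3000D;

    // Number of long-polling tasks needed for the given number of listened configs.
    static int longPollingTaskCount(int listenerSize) {
        return (int) Math.ceil(listenerSize / PER_TASK_CONFIG_SIZE);
    }

    public static void main(String[] args) {
        for (int size : new int[] {0, 1, 3000, 3001, 6500}) {
            System.out.println(size + " listeners -> " + longPollingTaskCount(size) + " task(s)");
        }
    }
}
```

So 3000 listeners still fit in one task, while the 3001st listener causes a second task to be started.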

The listeners live in cacheMap; we will see the code that writes to this cache when we get to addListener.

Now let's look at LongPollingRunnable:

@Override
public void run() {
    
    List<CacheData> cacheDatas = new ArrayList<CacheData>();

    List<String> inInitializingCacheList = new ArrayList<String>();
    try {
        // check failover config
        for (CacheData cacheData : cacheMap.get().values()) {
            // belongs to the current long-polling task
            if (cacheData.getTaskId() == taskId) {
                cacheDatas.add(cacheData);
                try {
                    checkLocalConfig(cacheData);
                    // the local (failover) config is in use
                    if (cacheData.isUseLocalConfigInfo()) {
                        // notify the listeners if the content changed
                        cacheData.checkListenerMd5();
                    }
                } catch (Exception e) {
                    LOGGER.error("get local config info error", e);
                }
            }
        }

        // Ask the server which configs changed (dataId + group); the request goes to the /listener URL
        // check server config
        List<String> changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList);
        if (!CollectionUtils.isEmpty(changedGroupKeys)) {
            LOGGER.info("get changedGroupKeys:" + changedGroupKeys);
        }

        // iterate over the changed configs and fetch their content
        for (String groupKey : changedGroupKeys) {
            String[] key = GroupKey.parseKey(groupKey);
            String dataId = key[0];
            String group = key[1];
            String tenant = null;
            if (key.length == 3) {
                tenant = key[2];
            }
            try {
                // fetch the config once for every changed key
                String[] ct = getServerConfig(dataId, group, tenant, 3000L);
                CacheData cache = cacheMap.get().get(GroupKey.getKeyTenant(dataId, group, tenant));
                // set the config content
                cache.setContent(ct[0]);
                if (null != ct[1]) {
                    // set the config type
                    cache.setType(ct[1]);
                }
                LOGGER.info("[{}] [data-received] dataId={}, group={}, tenant={}, md5={}, content={}, type={}",
                        agent.getName(), dataId, group, tenant, cache.getMd5(),
                        ContentUtils.truncateContent(ct[0]), ct[1]);
            } catch (NacosException ioe) {
                String message = String
                        .format("[%s] [get-update] get changed config exception. dataId=%s, group=%s, tenant=%s",
                                agent.getName(), dataId, group, tenant);
                LOGGER.error(message, ioe);
            }
        }
        // entries that are not initializing, or that were in this round's initializing list
        for (CacheData cacheData : cacheDatas) {
            if (!cacheData.isInitializing() || inInitializingCacheList
                    .contains(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant))) {
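                // check for changes and notify the listeners if there are any
                cacheData.checkListenerMd5();
                // After the first request, mark the entry as no longer initializing. Later requests can then be
                // held by the server, which returns immediately when a config changes. This gives dynamic
                // updates without wasting many empty polls.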
                cacheData.setInitializing(false);
            }
        }
        inInitializingCacheList.clear();
        
        if (taskIdSet.contains(taskId)) {
            executorService.execute(this);
        }
        
    } catch (Throwable e) {
        
        // If the long-polling task fails, its next execution is delayed as a penalty
        LOGGER.error("longPolling error : ", e);
        executorService.schedule(this, taskPenaltyTime, TimeUnit.MILLISECONDS);
    }
}
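
The flow is:

  1. Collect all cacheData entries that belong to the current taskId.
  2. Check the local config; if needed, send listener notifications via checkListenerMd5.
  3. checkUpdateDataIds makes a remote call to ask the server whether anything changed.
  4. If so, getServerConfig fetches the new config; if needed, listener notifications are sent via checkListenerMd5.
  5. Resubmit the current task to the thread pool with executorService.execute(this), provided its taskId is still in taskIdSet.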

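Put simply, "long polling" here means that the LongPollingRunnable task keeps running: in every round it asks the server for the list of changed configs belonging to its taskId, and if anything changed it fetches the new content and updates the cache.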
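The "task resubmits itself" loop from step 5 can be sketched on its own. This is a simplified illustration under assumptions, not the Nacos implementation: `TASK_IDS` stands in for taskIdSet, and the `maxRounds` stop condition is invented so the demo terminates.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class ResubmitLoopSketch {

    // Ids of the tasks that are still wanted; stands in for ClientWorker's taskIdSet.
    static final Set<Integer> TASK_IDS = ConcurrentHashMap.newKeySet();

    static final class PollTask implements Runnable {
        private final int taskId;
        private final ExecutorService pool;
        private final int maxRounds;
        private final AtomicInteger rounds = new AtomicInteger();
        private final CountDownLatch stopped = new CountDownLatch(1);

        PollTask(int taskId, ExecutorService pool, int maxRounds) {
            this.taskId = taskId;
            this.pool = pool;
            this.maxRounds = maxRounds;
        }

        @Override
        public void run() {
            // One polling round; the real client performs its blocking HTTP call here.
            if (rounds.incrementAndGet() >= maxRounds) {
                TASK_IDS.remove(taskId); // hypothetical stop condition for the demo
            }
            if (TASK_IDS.contains(taskId)) {
                pool.execute(this);      // hand the same task back to the pool
            } else {
                stopped.countDown();     // task id was retired, so the loop ends
            }
        }
    }

    // Runs one task until it retires itself and returns the number of rounds it executed.
    static int runRounds(int maxRounds) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            TASK_IDS.add(0);
            PollTask task = new PollTask(0, pool, maxRounds);
            pool.execute(task);
            task.stopped.await(5, TimeUnit.SECONDS);
            return task.rounds.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1;
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("rounds executed: " + runRounds(5));
    }
}
```

Resubmitting instead of looping inside run() means no pool thread is pinned to one task, and removing an id from the set is enough to let that task die out after its current round.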

Let's look at the checkUpdateDataIds method:

List<String> checkUpdateDataIds(List<CacheData> cacheDatas, List<String> inInitializingCacheList) throws Exception {
    // concatenate all config entries so they go out in a single request
    StringBuilder sb = new StringBuilder();
    for (CacheData cacheData : cacheDatas) {
        // skip entries that use the local config
        if (!cacheData.isUseLocalConfigInfo()) {
            sb.append(cacheData.dataId).append(WORD_SEPARATOR);
            sb.append(cacheData.group).append(WORD_SEPARATOR);
            if (StringUtils.isBlank(cacheData.tenant)) {
                sb.append(cacheData.getMd5()).append(LINE_SEPARATOR);
            } else {
                sb.append(cacheData.getMd5()).append(WORD_SEPARATOR);
                sb.append(cacheData.getTenant()).append(LINE_SEPARATOR);
            }
            if (cacheData.isInitializing()) {
                // It updates when cacheData occurs in cacheMap for the first time,
                // i.e. this is its first update check.
                inInitializingCacheList
                        .add(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant));
            }
        }
    }
    // flag: does this request contain entries that are still initializing?
    boolean isInitializingCacheList = !inInitializingCacheList.isEmpty();
    return checkUpdateConfigStr(sb.toString(), isInitializingCacheList);
}
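To make the request body easier to picture, here is a small sketch that builds one probe entry the same way. The separator values are an assumption (the client's constants are believed to be the control characters 2 and 1), and the `printable` helper is invented purely so the result can be displayed.

```java
final class ProbeStringSketch {

    // Assumed values of the client's WORD_SEPARATOR and LINE_SEPARATOR constants.
    static final String WORD_SEPARATOR = Character.toString((char) 2);
    static final String LINE_SEPARATOR = Character.toString((char) 1);

    // Builds "dataId^group^md5[^tenant]" followed by the line separator.
    static String entry(String dataId, String group, String md5, String tenant) {
        StringBuilder sb = new StringBuilder();
        sb.append(dataId).append(WORD_SEPARATOR);
        sb.append(group).append(WORD_SEPARATOR);
        if (tenant == null || tenant.trim().isEmpty()) {
            sb.append(md5).append(LINE_SEPARATOR);
        } else {
            sb.append(md5).append(WORD_SEPARATOR);
            sb.append(tenant).append(LINE_SEPARATOR);
        }
        return sb.toString();
    }

    // Hypothetical helper: swaps the control characters for visible ones.
    static String printable(String probe) {
        return probe.replace(WORD_SEPARATOR, "|").replace(LINE_SEPARATOR, ";");
    }

    public static void main(String[] args) {
        String probe = entry("app.yaml", "DEFAULT_GROUP", "abc123", null)
                + entry("db.yaml", "DEFAULT_GROUP", "def456", "dev");
        System.out.println(printable(probe));
    }
}
```

Sending the client-side MD5 with every entry lets the server decide which configs changed without transferring any content.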

Next is the checkUpdateConfigStr method:

List<String> checkUpdateConfigStr(String probeUpdateString, boolean isInitializingCacheList) throws Exception {
    
    Map<String, String> params = new HashMap<String, String>(2);
    params.put(Constants.PROBE_MODIFY_REQUEST, probeUpdateString);
    Map<String, String> headers = new HashMap<String, String>(2);
    headers.put("Long-Pulling-Timeout", "" + timeout);
    
    // Tell the server not to hang this request if newly initializing cacheData was added:
    // such requests carry an extra header so that the first check returns immediately.
    if (isInitializingCacheList) {
        headers.put("Long-Pulling-Timeout-No-Hangup", "true");
    }
    
    if (StringUtils.isBlank(probeUpdateString)) {
        return Collections.emptyList();
    }
    
    try {
        // In order to prevent the server from handling the delay of the client's long task,
        // increase the client's read timeout to avoid this problem.
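        // The read timeout is raised to 1.5x the long-polling timeout so the client does not give up while
        // the server is still holding the request. The request is not held only when isInitializingCacheList
        // is true; presumably the server checks the Long-Pulling-Timeout-No-Hangup header for this.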
        long readTimeoutMs = timeout + (long) Math.round(timeout >> 1);
        HttpRestResult<String> result = agent
                .httpPost(Constants.CONFIG_CONTROLLER_PATH + "/listener", headers, params, agent.getEncode(),
                        readTimeoutMs);
        
        if (result.ok()) {
            setHealthServer(true);
            return parseUpdateDataIdResponse(result.getData());
        } else {
            setHealthServer(false);
            LOGGER.error("[{}] [check-update] get changed dataId error, code: {}", agent.getName(),
                    result.getCode());
        }
    } catch (Exception e) {
        setHealthServer(false);
        LOGGER.error("[" + agent.getName() + "] [check-update] get changed dataId exception", e);
        throw e;
    }
    return Collections.emptyList();
}

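As you can see, this calls the /v1/cs/configs/listener endpoint. The response is effectively a set of dataId and group entries (plus the tenant, when there is one), that is, the coordinates of the configs that changed.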
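The header and timeout handling is easy to verify by hand. The sketch below reproduces only that part; the 30000 ms value used in main is an assumed example timeout, and the class and method names are illustrative.

```java
import java.util.HashMap;
import java.util.Map;

final class ListenerRequestSketch {

    // Read timeout = long-polling timeout plus half of it, i.e. 1.5x.
    static long readTimeoutMs(long timeout) {
        return timeout + (timeout >> 1);
    }

    // Headers sent with the /listener request, as built in checkUpdateConfigStr.
    static Map<String, String> headers(long timeout, boolean initializing) {
        Map<String, String> headers = new HashMap<>(2);
        headers.put("Long-Pulling-Timeout", String.valueOf(timeout));
        if (initializing) {
            // Ask the server to answer right away instead of holding the request.
            headers.put("Long-Pulling-Timeout-No-Hangup", "true");
        }
        return headers;
    }

    public static void main(String[] args) {
        long timeout = 30000L; // assumed example value
        System.out.println("read timeout: " + readTimeoutMs(timeout) + " ms");
        System.out.println("first check:  " + headers(timeout, true));
        System.out.println("later checks: " + headers(timeout, false));
    }
}
```

The extra half of the timeout gives the server room to hold the request for the full long-polling period and still deliver its answer before the client's socket read times out.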

Back to the getServerConfig method:

public String[] getServerConfig(String dataId, String group, String tenant, long readTimeout)
        throws NacosException {
    String[] ct = new String[2];
    if (StringUtils.isBlank(group)) {
        group = Constants.DEFAULT_GROUP;
    }
    
    HttpRestResult<String> result = null;
    try {
        Map<String, String> params = new HashMap<String, String>(3);
        if (StringUtils.isBlank(tenant)) {
            params.put("dataId", dataId);
            params.put("group", group);
        } else {
            params.put("dataId", dataId);
            params.put("group", group);
            params.put("tenant", tenant);
        }
        result = agent.httpGet(Constants.CONFIG_CONTROLLER_PATH, null, params, agent.getEncode(), readTimeout);
    } catch (Exception ex) {
        String message = String
                .format("[%s] [sub-server] get server config exception, dataId=%s, group=%s, tenant=%s",
                        agent.getName(), dataId, group, tenant);
        LOGGER.error(message, ex);
        throw new NacosException(NacosException.SERVER_ERROR, ex);
    }
    
    switch (result.getCode()) {
        case HttpURLConnection.HTTP_OK:
            LocalConfigInfoProcessor.saveSnapshot(agent.getName(), dataId, group, tenant, result.getData());
            ct[0] = result.getData();
            if (result.getHeader().getValue(CONFIG_TYPE) != null) {
                ct[1] = result.getHeader().getValue(CONFIG_TYPE);
            } else {
                ct[1] = ConfigType.TEXT.getType();
            }
            return ct;
        case HttpURLConnection.HTTP_NOT_FOUND:
            LocalConfigInfoProcessor.saveSnapshot(agent.getName(), dataId, group, tenant, null);
            return ct;
        case HttpURLConnection.HTTP_CONFLICT: {
            LOGGER.error(
                    "[{}] [sub-server-error] get server config being modified concurrently, dataId={}, group={}, "
                            + "tenant={}", agent.getName(), dataId, group, tenant);
            throw new NacosException(NacosException.CONFLICT,
                    "data being modified, dataId=" + dataId + ",group=" + group + ",tenant=" + tenant);
        }
        case HttpURLConnection.HTTP_FORBIDDEN: {
            LOGGER.error("[{}] [sub-server-error] no right, dataId={}, group={}, tenant={}", agent.getName(),
                    dataId, group, tenant);
            throw new NacosException(result.getCode(), result.getMessage());
        }
        default: {
            LOGGER.error("[{}] [sub-server-error]  dataId={}, group={}, tenant={}, code={}", agent.getName(),
                    dataId, group, tenant, result.getCode());
            throw new NacosException(result.getCode(),
                    "http error, code=" + result.getCode() + ",dataId=" + dataId + ",group=" + group + ",tenant="
                            + tenant);
        }
    }
}

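This calls the /v1/cs/configs endpoint to fetch the latest content of the given config. On HTTP 200 the content is also saved as a local snapshot; on 404 the snapshot is cleared.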

Finally, the checkListenerMd5 method:

void checkListenerMd5() {
    for (ManagerListenerWrap wrap : listeners) {
        // notify the listener if the MD5 changed
        if (!md5.equals(wrap.lastCallMd5)) {
            safeNotifyListener(dataId, group, content, type, md5, wrap);
        }
    }
}
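The change detection above can be sketched in a self-contained form. This is an illustration under assumptions: `ListenerWrap` is a stripped-down stand-in for ManagerListenerWrap, and "notifying" is reduced to recording the content in a list.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.List;

final class Md5NotifySketch {

    // Hex-encoded MD5 of the content, computed with the JDK's MessageDigest.
    static String md5Hex(String content) {
        try {
            MessageDigest md = MessageDigest.getInstance("MD5");
            byte[] digest = md.digest(content.getBytes(StandardCharsets.UTF_8));
            StringBuilder sb = new StringBuilder();
            for (byte b : digest) {
                sb.append(String.format("%02x", b));
            }
            return sb.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    // Simplified stand-in for ManagerListenerWrap.
    static final class ListenerWrap {
        String lastCallMd5;
        final List<String> received = new ArrayList<>();
    }

    // Notifies only those listeners whose last seen MD5 differs from the current one.
    static void checkListenerMd5(String content, List<ListenerWrap> listeners) {
        String md5 = md5Hex(content);
        for (ListenerWrap wrap : listeners) {
            if (!md5.equals(wrap.lastCallMd5)) {
                wrap.received.add(content); // stands in for the listener callback
                wrap.lastCallMd5 = md5;
            }
        }
    }

    public static void main(String[] args) {
        List<ListenerWrap> listeners = new ArrayList<>();
        listeners.add(new ListenerWrap());
        checkListenerMd5("a=1", listeners);
        checkListenerMd5("a=1", listeners); // same content, no second notification
        checkListenerMd5("a=2", listeners);
        System.out.println("notifications: " + listeners.get(0).received);
    }
}
```

Tracking the MD5 per listener rather than per config means a listener registered late still gets its first notification, even if the content has not changed since.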

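This is an MD5 check: a listener is notified only when the current MD5 differs from the one it last saw. The notification goes through safeNotifyListener: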

private void safeNotifyListener(final String dataId, final String group, final String content, final String type,
        final String md5, final ManagerListenerWrap listenerWrap) {

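    /**
     * Build a job that wraps the data and calls the listener's receiveConfigInfo method to process it,
     * then records the new content and MD5.
     * The thread's context class loader is set to the listener's class loader so that both match,
     * probably because of SPI or reflective calls.
     */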
    final Listener listener = listenerWrap.listener;
    
    Runnable job = new Runnable() {
        @Override
        public void run() {
            ClassLoader myClassLoader = Thread.currentThread().getContextClassLoader();
            ClassLoader appClassLoader = listener.getClass().getClassLoader();
            try {
                if (listener instanceof AbstractSharedListener) {
                    AbstractSharedListener adapter = (AbstractSharedListener) listener;
                    adapter.fillContext(dataId, group);
                    LOGGER.info("[{}] [notify-context] dataId={}, group={}, md5={}", name, dataId, group, md5);
                }
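                // Before invoking the callback, set the thread's context class loader to that of the specific
                // webapp, so that SPI calls made inside the callback do not fail or load the wrong classes
                // (only an issue when several applications are deployed together).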
                Thread.currentThread().setContextClassLoader(appClassLoader);
                
                ConfigResponse cr = new ConfigResponse();
                cr.setDataId(dataId);
                cr.setGroup(group);
                cr.setContent(content);
                configFilterChainManager.doFilter(null, cr);
                String contentTmp = cr.getContent();
                listener.receiveConfigInfo(contentTmp);
                
                // compare lastContent and content
                if (listener instanceof AbstractConfigChangeListener) {
                    Map data = ConfigChangeHandler.getInstance()
                            .parseChangeData(listenerWrap.lastContent, content, type);
                    ConfigChangeEvent event = new ConfigChangeEvent(data);
                    ((AbstractConfigChangeListener) listener).receiveConfigChange(event);
                    listenerWrap.lastContent = content;
                }
                
                listenerWrap.lastCallMd5 = md5;
                LOGGER.info("[{}] [notify-ok] dataId={}, group={}, md5={}, listener={} ", name, dataId, group, md5,
                        listener);
            } catch (NacosException ex) {
                LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} errCode={} errMsg={}",
                        name, dataId, group, md5, listener, ex.getErrCode(), ex.getErrMsg());
            } catch (Throwable t) {
                LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} tx={}", name, dataId,
                        group, md5, listener, t.getCause());
            } finally {
                Thread.currentThread().setContextClassLoader(myClassLoader);
            }
        }
    };
    
    final long startNotify = System.currentTimeMillis();
    try {
        if (null != listener.getExecutor()) {
            listener.getExecutor().execute(job);
        } else {
            job.run();
        }
    } catch (Throwable t) {
        LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} throwable={}", name, dataId,
                group, md5, listener, t.getCause());
    }
    final long finishNotify = System.currentTimeMillis();
    LOGGER.info("[{}] [notify-listener] time cost={}ms in ClientWorker, dataId={}, group={}, md5={}, listener={} ",
            name, (finishNotify - startNotify), dataId, group, md5, listener);
}

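If the listener provides an executor, the notification job is submitted to it; otherwise job.run() is called directly on the current thread.

Inside the job, listener.receiveConfigInfo(contentTmp) calls the listener's receiveConfigInfo method, which is the implementation we supplied in addListener.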
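Both ideas, swapping the context class loader around the callback and dispatching either to an executor or inline, can be sketched in a few lines. The class and method names below are made up for illustration.

```java
import java.util.concurrent.Executor;

final class NotifySketch {

    // Runs the callback with the given context class loader and always restores the original one.
    static void runWithClassLoader(ClassLoader target, Runnable callback) {
        Thread current = Thread.currentThread();
        ClassLoader original = current.getContextClassLoader();
        try {
            current.setContextClassLoader(target);
            callback.run();
        } finally {
            current.setContextClassLoader(original);
        }
    }

    // Uses the listener's executor when there is one, otherwise runs on the calling thread.
    static void dispatch(Executor executorOrNull, Runnable job) {
        if (executorOrNull != null) {
            executorOrNull.execute(job);
        } else {
            job.run();
        }
    }

    public static void main(String[] args) {
        ClassLoader appLoader = NotifySketch.class.getClassLoader();
        dispatch(null, () -> runWithClassLoader(appLoader,
                () -> System.out.println("callback sees: "
                        + Thread.currentThread().getContextClassLoader())));
    }
}
```

One consequence of the inline branch is that a slow listener without its own executor delays the long-polling task that called it, so supplying an executor is worth considering for heavy callbacks.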

Back to the addListener method:

@Override
public void addListener(String dataId, String group, Listener listener) throws NacosException {
    worker.addTenantListeners(dataId, group, Arrays.asList(listener));
}

// In ClientWorker:
public void addTenantListeners(String dataId, String group, List<? extends Listener> listeners)
        throws NacosException {
    group = null2defaultGroup(group);
    String tenant = agent.getTenant();
    CacheData cache = addCacheDataIfAbsent(dataId, group, tenant);
    for (Listener listener : listeners) {
        cache.addListener(listener);
    }
}

public CacheData addCacheDataIfAbsent(String dataId, String group, String tenant) throws NacosException {
    CacheData cache = getCache(dataId, group, tenant);
    if (null != cache) {
        return cache;
    }
    String key = GroupKey.getKeyTenant(dataId, group, tenant);
    synchronized (cacheMap) {
        CacheData cacheFromMap = getCache(dataId, group, tenant);
        // multiple listeners on the same dataid+group and race condition,so
        // double check again
        // other listener thread beat me to set to cacheMap
        if (null != cacheFromMap) {
            cache = cacheFromMap;
            // reset so that server not hang this check
            cache.setInitializing(true);
        } else {
            cache = new CacheData(configFilterChainManager, agent.getName(), dataId, group, tenant);
            int taskId = cacheMap.get().size() / (int) ParamUtil.getPerTaskConfigSize();
            cache.setTaskId(taskId);
            // fix issue # 1317
            if (enableRemoteSyncConfig) {
                String[] ct = getServerConfig(dataId, group, tenant, 3000L);
                cache.setContent(ct[0]);
            }
        }
        
        Map<String, CacheData> copy = new HashMap<String, CacheData>(this.cacheMap.get());
        copy.put(key, cache);
        cacheMap.set(copy);
    }
    LOGGER.info("[{}] [subscribe] {}", agent.getName(), key);
    
    MetricsMonitor.getListenConfigCountMonitor().set(cacheMap.get().size());
    
    return cache;
}
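The update pattern used above (double-checked lookup, then copy the map, add the entry, and swap the reference) can be sketched as follows. It is a simplified illustration: the map stores only the assigned task id instead of a CacheData object, and the constant is an assumed stand-in for ParamUtil.getPerTaskConfigSize().

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

final class CacheMapSketch {

    // Assumed default number of configs handled by one long-polling task.
    static final int PER_TASK_CONFIG_SIZE = 3000;

    // Readers always see a complete, never-mutated map; writers replace it wholesale.
    private final AtomicReference<Map<String, Integer>> cacheMap =
            new AtomicReference<>(new HashMap<>());

    // Returns the task id for the key, assigning one on first use.
    int addIfAbsent(String key) {
        Integer existing = cacheMap.get().get(key);
        if (existing != null) {
            return existing;
        }
        synchronized (cacheMap) {
            // Double check: another thread may have added the key in the meantime.
            Integer again = cacheMap.get().get(key);
            if (again != null) {
                return again;
            }
            // Entries 0..2999 go to task 0, 3000..5999 to task 1, and so on.
            int taskId = cacheMap.get().size() / PER_TASK_CONFIG_SIZE;
            Map<String, Integer> copy = new HashMap<>(cacheMap.get());
            copy.put(key, taskId);
            cacheMap.set(copy);
            return taskId;
        }
    }

    int size() {
        return cacheMap.get().size();
    }

    public static void main(String[] args) {
        CacheMapSketch sketch = new CacheMapSketch();
        for (int i = 0; i < 3000; i++) {
            sketch.addIfAbsent("config-" + i);
        }
        System.out.println("task id of entry 3001: " + sketch.addIfAbsent("config-3000"));
    }
}
```

The task id computed here is what LongPollingRunnable later compares against its own taskId, which is how each entry ends up being polled by exactly one task.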

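That covers how entries are added to cacheMap. The cache.addListener(listener) call inside addTenantListeners ends up in CacheData.addListener: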

public void addListener(Listener listener) {
    if (null == listener) {
        throw new IllegalArgumentException("listener is null");
    }
    /**
     * Call a different ManagerListenerWrap constructor depending on the listener type.
     */
    ManagerListenerWrap wrap =
            (listener instanceof AbstractConfigChangeListener) ? new ManagerListenerWrap(listener, md5, content)
                    : new ManagerListenerWrap(listener, md5);

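    // listeners is a CopyOnWriteArrayList.
    // A write copies the underlying array and modifies the copy, so readers are never blocked and keep
    // reading the old array. They may briefly see stale data, which is fine here: usually only one element
    // is added, and all other elements are the same in the old and the new copy.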
    if (listeners.addIfAbsent(wrap)) {
        LOGGER.info("[{}] [add-listener] ok, tenant={}, dataId={}, group={}, cnt={}", name, tenant, dataId, group,
                listeners.size());
    }
}

Let's look at how listeners is declared:

private final CopyOnWriteArrayList<ManagerListenerWrap> listeners;

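It is a CopyOnWriteArrayList, which guarantees that adding a listener cannot disturb the long-polling task while it iterates over the list. A write to listeners works on a fresh copy, while readers keep using the original array.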
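The snapshot behavior is easy to demonstrate with the JDK class alone. In this minimal sketch (strings stand in for listener wrappers) a loop adds an element while iterating, which would throw ConcurrentModificationException with an ArrayList.

```java
import java.util.concurrent.CopyOnWriteArrayList;

final class CowListSketch {

    // Iterates over the list while adding to it and returns {visited elements, final size}.
    static int[] iterateWhileAdding() {
        CopyOnWriteArrayList<String> listeners = new CopyOnWriteArrayList<>();
        listeners.add("listener-a");
        listeners.add("listener-b");

        int visited = 0;
        for (String ignored : listeners) {
            // The iterator works on the array snapshot taken when the loop started,
            // so this write neither fails nor becomes visible to the running loop.
            listeners.addIfAbsent("listener-c");
            visited++;
        }
        return new int[] {visited, listeners.size()};
    }

    public static void main(String[] args) {
        int[] result = iterateWhileAdding();
        System.out.println("visited=" + result[0] + ", size afterwards=" + result[1]);
    }
}
```

The price is a full array copy on every write, which suits this case because listeners are added rarely and iterated on every polling round.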

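To summarize:

  1. How ClientWorker is created: its main job is to set up the long-polling tasks.
  2. How long-polling tasks are created: one task is started per group of 3000 listeners. When a task finishes a round, it is resubmitted to the thread pool and runs again.
  3. How config changes are delivered: the client first asks the server for the coordinates of the changed configs, then makes a second call to fetch their content.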

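Beyond that, ConfigService's create, read, update and delete operations simply call the corresponding server endpoints, so there is not much to analyze. One thing to keep in mind is the local cache: it takes priority over the server, and it is a file on disk.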

LocalConfigInfoProcessor.getFailover(agent.getName(), dataId, group, tenant);

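The next question is how a change notification reaches the application layer once the listener has received it. That is mostly Spring Cloud territory, which the following posts will analyze.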
