Nacos源码分析十五、服务端处理配置监听

x33g5p2x  于2021-12-20 转载在 其他  
字(10.8k)|赞(0)|评价(0)|浏览(469)

还记得NacosConfigService里的那个长轮询监听配置变更么?ClientWorker类里的LongPollingRunnable内部类。我们看一下它的run方法里的这句:

List<String> changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList);

这个方法就是调用服务端的/v1/cs/configs/listener接口,以监听可能出现变更的配置数据。往里跟一下代码:

List<String> checkUpdateConfigStr(String probeUpdateString, boolean isInitializingCacheList) throws Exception {
    
    Map<String, String> params = new HashMap<String, String>(2);
    params.put(Constants.PROBE_MODIFY_REQUEST, probeUpdateString);
    Map<String, String> headers = new HashMap<String, String>(2);
    headers.put("Long-Pulling-Timeout", "" + timeout);
    
    // told server do not hang me up if new initializing cacheData added in
    //是初始化的会设置一个请求头标记
    // 初始化时不挂起
    if (isInitializingCacheList) {
        headers.put("Long-Pulling-Timeout-No-Hangup", "true");
    }
    
    if (StringUtils.isBlank(probeUpdateString)) {
        return Collections.emptyList();
    }
    ...

这里关注往header里写了两个标记:

  1. Long-Pulling-Timeout 挂起的最大时长
  2. Long-Pulling-Timeout-No-Hangup 如果是初始化的请求,则不挂起。

猜一下干嘛用的?我们简单分析一下:

首先我们知道客户端的长轮询任务执行完成后立马又丢回了线程池

if (taskIdSet.contains(taskId)) {
    executorService.execute(this);
}

也就是说服务端会不停的收到监听请求。而实际上大部分时间上并没有配置变更,如果不停的发送消息,大部分的请求是无效的。而如果我们降低客户端发送请求的频率,这样就会导致配置变更时客户端会有一定的延迟。nacos的做法是,服务端通过异步servlet技术,挂起了没有配置变更的请求,当有配置变更时会通知这些挂起任务立即返回给客户端,而始终没有变更的话则按设置的挂起超时时间返回。这样既保证了客户端能够实时收到配置变更响应,又能够有效的降低无效请求。

接下来我们看看服务端是怎么做的。

先看一下ConfigController的listener接口:

@PostMapping("/listener")
@Secured(action = ActionTypes.READ, parser = ConfigResourceParser.class)
public void listener(HttpServletRequest request, HttpServletResponse response)
        throws ServletException, IOException {
    ...
    // do long-polling
    inner.doPollingConfig(request, response, clientMd5Map, probeModify.length());
}

忽略不重要的代码,就是调用ConfigServletInner的doPollingConfig方法:

public String doPollingConfig(HttpServletRequest request, HttpServletResponse response,
        Map<String, String> clientMd5Map, int probeRequestSize) throws IOException {
    
    // Long polling.
    // 长轮询
    if (LongPollingService.isSupportLongPolling(request)) {
        longPollingService.addLongPollingClient(request, response, clientMd5Map, probeRequestSize);
        // 返回成功
        return HttpServletResponse.SC_OK + "";
    }
    
    // Compatible with short polling logic.
    List<String> changedGroups = MD5Util.compareMd5(request, response, clientMd5Map);
    
    // Compatible with short polling result.
    String oldResult = MD5Util.compareMd5OldResult(changedGroups);
    String newResult = MD5Util.compareMd5ResultString(changedGroups);
    
    String version = request.getHeader(Constants.CLIENT_VERSION_HEADER);
    if (version == null) {
        version = "2.0.0";
    }
    int versionNum = Protocol.getVersionNumber(version);
    
    // Befor 2.0.4 version, return value is put into header.
    if (versionNum < START_LONG_POLLING_VERSION_NUM) {
        response.addHeader(Constants.PROBE_MODIFY_RESPONSE, oldResult);
        response.addHeader(Constants.PROBE_MODIFY_RESPONSE_NEW, newResult);
    } else {
        request.setAttribute("content", newResult);
    }
    
    Loggers.AUTH.info("new content:" + newResult);
    
    // Disable cache.
    response.setHeader("Pragma", "no-cache");
    response.setDateHeader("Expires", 0);
    response.setHeader("Cache-Control", "no-cache,no-store");
    response.setStatus(HttpServletResponse.SC_OK);
    return HttpServletResponse.SC_OK + "";
}

LongPollingService.isSupportLongPolling(request) 就是判断header中是否包含Long-Pulling-Timeout,如果没有,说明不需要挂起,走下半段代码,直接检查配置变更后返回,就是同步返回。

如果header中有Long-Pulling-Timeout,进入

longPollingService.addLongPollingClient(request, response, clientMd5Map, probeRequestSize);

这行代码,进行异步挂起处理:

public void addLongPollingClient(HttpServletRequest req, HttpServletResponse rsp, Map<String, String> clientMd5Map,
        int probeRequestSize) {
    
    String str = req.getHeader(LongPollingService.LONG_POLLING_HEADER);
    // 首先会看是否有不挂起的标志,也是客户端传来的,缓存配置初始化的时候就不挂起,否则会挂起。
    String noHangUpFlag = req.getHeader(LongPollingService.LONG_POLLING_NO_HANG_UP_HEADER);
    String appName = req.getHeader(RequestUtil.CLIENT_APPNAME_HEADER);
    String tag = req.getHeader("Vipserver-Tag");
    int delayTime = SwitchService.getSwitchInteger(SwitchService.FIXED_DELAY_TIME, 500);
    
    // Add delay time for LoadBalance, and one response is returned 500 ms in advance to avoid client timeout.
    // 默认挂起时间29.5秒:
    long timeout = Math.max(10000, Long.parseLong(str) - delayTime);
    // 如果支持固定轮询的话
    if (isFixedPolling()) {
        timeout = Math.max(10000, getFixedPollingInterval());
        // Do nothing but set fix polling timeout.
    } else {
        long start = System.currentTimeMillis();
        // 比对客户端发来的MD5,是否有改变,有改变的话就立即生成响应,
        // 否则就判断是否有不挂起标记,有的话就直接返回,因为没有改变,也不挂起,就返回了
        List<String> changedGroups = MD5Util.compareMd5(req, rsp, clientMd5Map);
        if (changedGroups.size() > 0) {
            generateResponse(req, rsp, changedGroups);
            LogUtil.CLIENT_LOG.info("{}|{}|{}|{}|{}|{}|{}", System.currentTimeMillis() - start, "instant",
                    RequestUtil.getRemoteIp(req), "polling", clientMd5Map.size(), probeRequestSize,
                    changedGroups.size());
            return;
        } else if (noHangUpFlag != null && noHangUpFlag.equalsIgnoreCase(TRUE_STR)) {
            // 如果设置了不挂起,则直接返回。
            LogUtil.CLIENT_LOG.info("{}|{}|{}|{}|{}|{}|{}", System.currentTimeMillis() - start, "nohangup",
                    RequestUtil.getRemoteIp(req), "polling", clientMd5Map.size(), probeRequestSize,
                    changedGroups.size());
            return;
        }
    }
    // 如果没有改变,另外没有设置不挂起,则进行异步任务

    String ip = RequestUtil.getRemoteIp(req);

    // 创建一个异步的上下文,然后创建ClientLongPolling任务,将上下文,超时等信息封装进去,然后调度ClientLongPolling任务
    // Must be called by http thread, or send response.
    // servlet3.0的异步消息
    final AsyncContext asyncContext = req.startAsync();
    
    // AsyncContext.setTimeout() is incorrect, Control by oneself
    asyncContext.setTimeout(0L);

    ConfigExecutor.executeLongPolling(
            new ClientLongPolling(asyncContext, clientMd5Map, ip, probeRequestSize, timeout, appName, tag));
}

我们画个简单的流程图:

也就是说通过客户端给过来的标识和服务端自身配置结合判断是否需要创建长轮询任务,如果不需要则直接返回用户。如果此时已经发现有配置变更了也不需要挂起而直接返回。

下面我们看看ClientLongPolling任务:

@Override
public void run() {
    // 又起了一个异步任务,延迟时间是传过来的timeout,即挂起时间
    // 延迟时间过后执行
    asyncTimeoutFuture = ConfigExecutor.scheduleLongPolling(new Runnable() {
        @Override
        public void run() {
            ...
        }
        
    }, timeoutTime, TimeUnit.MILLISECONDS);

    // 放入监听队列里
    allSubs.add(this);
}

这里比较有意思的,又起了一个任务,延迟时间就是之前设置的超时时间。实际上这个任务能执行到的情况就是这段时间没有配置变更到最后的兜底操作。而这段时间如果有配置变更,会通过调用asyncTimeoutFuture.cancel来取消这个任务。

另外一个allSubs是一个队列,这里把自身添加进队列。

我们看一下超时后的任务操作:

@Override
public void run() {
    try {
        getRetainIps().put(ClientLongPolling.this.ip, System.currentTimeMillis());
        
        // Delete subsciber's relations.
        // 移除订阅
        allSubs.remove(ClientLongPolling.this);
        
        if (isFixedPolling()) {
            LogUtil.CLIENT_LOG
                    .info("{}|{}|{}|{}|{}|{}", (System.currentTimeMillis() - createTime), "fix",
                            RequestUtil.getRemoteIp((HttpServletRequest) asyncContext.getRequest()),
                            "polling", clientMd5Map.size(), probeRequestSize);
            // 如果是固定的,生成响应的时候会去比对MD5
            List<String> changedGroups = MD5Util
                    .compareMd5((HttpServletRequest) asyncContext.getRequest(),
                            (HttpServletResponse) asyncContext.getResponse(), clientMd5Map);
            if (changedGroups.size() > 0) {
                sendResponse(changedGroups);
            } else {
                sendResponse(null);
            }
        } else {
            // 不是固定轮询,到点就移除。asyncContext.complete发送http响应
            // 如果不是固定轮询的,就直接返回了,因为在这个任务之前已经判断过没有改变,才会挂起,为了有改变的时候直接响应
            // 挂起到点说明这段时间没有改变,则直接发送http响应。 如果这段时间有改变,则由监听器调用sendResponse直接返回消息。调用时会删除当前任务。
            LogUtil.CLIENT_LOG
                    .info("{}|{}|{}|{}|{}|{}", (System.currentTimeMillis() - createTime), "timeout",
                            RequestUtil.getRemoteIp((HttpServletRequest) asyncContext.getRequest()),
                            "polling", clientMd5Map.size(), probeRequestSize);
            sendResponse(null);
        }
    } catch (Throwable t) {
        LogUtil.DEFAULT_LOG.error("long polling error:" + t.getMessage(), t.getCause());
    }
    
}

首先既然已经超时了,也就不需要订阅变更事件了,直接移除。

如果是固定轮询事件的,也就是说之前即使有变更事件也不会处理,这里做最终处理。 就是看看有没有变更的groups,然后发送响应消息。

如果不是固定轮询的,说明到最后都没有变更事件,那么直接返回空。

看一下sendResponse方法:

void sendResponse(List<String> changedGroups) {
    
    // Cancel time out task.
    // 移除轮询任务
    if (null != asyncTimeoutFuture) {
        asyncTimeoutFuture.cancel(false);
    }
    generateResponse(changedGroups);
}

既然要返回了,那么如果还有future就取消掉,然后是generateResponse:

void generateResponse(List<String> changedGroups) {
    if (null == changedGroups) {
        
        // Tell web container to send http response.
        asyncContext.complete();
        return;
    }
    
    HttpServletResponse response = (HttpServletResponse) asyncContext.getResponse();
    
    try {
        final String respString = MD5Util.compareMd5ResultString(changedGroups);
        
        // Disable cache.
        response.setHeader("Pragma", "no-cache");
        response.setDateHeader("Expires", 0);
        response.setHeader("Cache-Control", "no-cache,no-store");
        response.setStatus(HttpServletResponse.SC_OK);
        response.getWriter().println(respString);
        asyncContext.complete();
    } catch (Exception ex) {
        PULL_LOG.error(ex.toString(), ex);
        asyncContext.complete();
    }
}

就是异步上下文的complete。

那么如果在超时时间到达之前有配置变更会怎么样呢?我们首先看一下这个LongPollingService的初始化方法:

public LongPollingService() {
    allSubs = new ConcurrentLinkedQueue<ClientLongPolling>();
    
    ConfigExecutor.scheduleLongPolling(new StatTask(), 0L, 10L, TimeUnit.SECONDS);
    
    // Register LocalDataChangeEvent to NotifyCenter.
    NotifyCenter.registerToPublisher(LocalDataChangeEvent.class, NotifyCenter.ringBufferSize);
    
    // Register A Subscriber to subscribe LocalDataChangeEvent.

    // 这里 发布-订阅模式
    NotifyCenter.registerSubscriber(new Subscriber() {

        // 这里监听配置信息改变事件
        @Override
        public void onEvent(Event event) {
            if (isFixedPolling()) {
                // Ignore.
            } else {
                if (event instanceof LocalDataChangeEvent) {
                    LocalDataChangeEvent evt = (LocalDataChangeEvent) event;
                    ConfigExecutor.executeLongPolling(new DataChangeTask(evt.groupKey, evt.isBeta, evt.betaIps));
                }
            }
        }
        
        @Override
        public Class<? extends Event> subscribeType() {
            return LocalDataChangeEvent.class;
        }
    });
    
}

首先allSubs队列的初始化,这是一个LinkedQueue。

然后往通知中心注册了一个订阅LocalDataChangeEvent的监听器,当有配置变更时,通知中心会发出LocalDataChangeEvent事件,这里接收事件。

如果是固定轮询模式,则啥也不做,交由兜底任务asyncTimeoutFuture处理。

如果不是,则提交一个DataChangeTask任务:

@Override
public void run() {
    try {
        ConfigCacheService.getContentBetaMd5(groupKey);
        //遍历所有的订阅ClientLongPolling
        for (Iterator<ClientLongPolling> iter = allSubs.iterator(); iter.hasNext(); ) {
            ClientLongPolling clientSub = iter.next();
            // 确定是相同group的客户端
            if (clientSub.clientMd5Map.containsKey(groupKey)) {
                // If published tag is not in the beta list, then it skipped.
                // 如果beta发布且不在beta列表直接跳过
                if (isBeta && !CollectionUtils.contains(betaIps, clientSub.ip)) {
                    continue;
                }
                
                // If published tag is not in the tag list, then it skipped.
                // 如果tag发布且不在tag列表直接跳过
                if (StringUtils.isNotBlank(tag) && !tag.equals(clientSub.tag)) {
                    continue;
                }
                
                getRetainIps().put(clientSub.ip, System.currentTimeMillis());
                // 删除订阅关系
                iter.remove(); // Delete subscribers' relationships.
                LogUtil.CLIENT_LOG
                        .info("{}|{}|{}|{}|{}|{}|{}", (System.currentTimeMillis() - changeTime), "in-advance",
                                RequestUtil
                                        .getRemoteIp((HttpServletRequest) clientSub.asyncContext.getRequest()),
                                "polling", clientSub.clientMd5Map.size(), clientSub.probeRequestSize, groupKey);
                // 发送响应消息,立即返回响应
                clientSub.sendResponse(Arrays.asList(groupKey));
            }
        }
    } catch (Throwable t) {
        LogUtil.DEFAULT_LOG.error("data change error: {}", ExceptionUtil.getStackTrace(t));
    }
}

遍历队列,判断是否是相同group的客户端,如果是,从队列中移除,然后发送响应消息。

sendResponse时会取消掉对应的兜底任务asyncTimeoutFuture。

画个图:

总结一下

  1. 首先根据客户端给的header和自身的配置来确定是否需要进行挂起
  2. 所谓挂起就是起了个ClientLongPolling任务。 这个任务又起了个asyncTimeoutFuture来做超时兜底。然后将自身加入到监听队列中来监听过程变化。
  3. 服务端启动时注册了一个监听LocalDataChangeEvent事件的监听器,如果有配置变更则会启动DataChangeTask任务
  4. DataChangeTask任务遍历监听队列,从中找到对应的客户端,取消对应的asyncTimeoutFuture任务同时直接返回response响应。

相关文章