Nacos源码分析十六、服务端配置新增修改和删除

x33g5p2x  于2021-12-20 转载在 其他  
字(11.0k)|赞(0)|评价(0)|浏览(522)

Nacos服务端对于配置的新增修改和删除这部分,相对比较简单。

代码还是在ConfigController里,publishConfig方法提供新增和修改,deleteConfig方法提供删除操作

publicConfig

@PostMapping
@Secured(action = ActionTypes.WRITE, parser = ConfigResourceParser.class)
public Boolean publishConfig(HttpServletRequest request, HttpServletResponse response,
        @RequestParam(value = "dataId") String dataId, @RequestParam(value = "group") String group,
        @RequestParam(value = "tenant", required = false, defaultValue = StringUtils.EMPTY) String tenant,
        @RequestParam(value = "content") String content, @RequestParam(value = "tag", required = false) String tag,
        @RequestParam(value = "appName", required = false) String appName,
        @RequestParam(value = "src_user", required = false) String srcUser,
        @RequestParam(value = "config_tags", required = false) String configTags,
        @RequestParam(value = "desc", required = false) String desc,
        @RequestParam(value = "use", required = false) String use,
        @RequestParam(value = "effect", required = false) String effect,
        @RequestParam(value = "type", required = false) String type,
        @RequestParam(value = "schema", required = false) String schema) throws NacosException {
    
    final String srcIp = RequestUtil.getRemoteIp(request);
    final String requestIpApp = RequestUtil.getAppName(request);
    srcUser = RequestUtil.getSrcUserName(request);
    // check tenant
    ParamUtils.checkTenant(tenant);
    ParamUtils.checkParam(dataId, group, "datumId", content);
    ParamUtils.checkParam(tag);
    Map<String, Object> configAdvanceInfo = new HashMap<String, Object>(10);
    MapUtils.putIfValNoNull(configAdvanceInfo, "config_tags", configTags);
    MapUtils.putIfValNoNull(configAdvanceInfo, "desc", desc);
    MapUtils.putIfValNoNull(configAdvanceInfo, "use", use);
    MapUtils.putIfValNoNull(configAdvanceInfo, "effect", effect);
    MapUtils.putIfValNoNull(configAdvanceInfo, "type", type);
    MapUtils.putIfValNoNull(configAdvanceInfo, "schema", schema);
    ParamUtils.checkParam(configAdvanceInfo);
    
    if (AggrWhitelist.isAggrDataId(dataId)) {
        LOGGER.warn("[aggr-conflict] {} attemp to publish single data, {}, {}", RequestUtil.getRemoteIp(request),
                dataId, group);
        throw new NacosException(NacosException.NO_RIGHT, "dataId:" + dataId + " is aggr");
    }
    
    final Timestamp time = TimeUtils.getCurrentTime();
    String betaIps = request.getHeader("betaIps");
    ConfigInfo configInfo = new ConfigInfo(dataId, group, tenant, appName, content);
    configInfo.setType(type);
    if (StringUtils.isBlank(betaIps)) {
        if (StringUtils.isBlank(tag)) {
            persistService.insertOrUpdate(srcIp, srcUser, configInfo, time, configAdvanceInfo, true);
            ConfigChangePublisher
                    .notifyConfigChange(new ConfigDataChangeEvent(false, dataId, group, tenant, time.getTime()));
        } else {
            persistService.insertOrUpdateTag(configInfo, tag, srcIp, srcUser, time, true);
            ConfigChangePublisher.notifyConfigChange(
                    new ConfigDataChangeEvent(false, dataId, group, tenant, tag, time.getTime()));
        }
    } else {
        // beta publish
        persistService.insertOrUpdateBeta(configInfo, betaIps, srcIp, srcUser, time, true);
        ConfigChangePublisher
                .notifyConfigChange(new ConfigDataChangeEvent(true, dataId, group, tenant, time.getTime()));
    }
    ConfigTraceService
            .logPersistenceEvent(dataId, group, tenant, requestIpApp, time.getTime(), InetUtils.getSelfIp(),
                    ConfigTraceService.PERSISTENCE_EVENT_PUB, content);
    return true;
}
  1. 参数检查
  2. 构造ConfigInfo对象
  3. 存储数据
  4. 发布ConfigDataChangeEvent事件

我们看一下ConfigChangePublisher.notifyConfigChange方法:

public static void notifyConfigChange(ConfigDataChangeEvent event) {
    if (PropertyUtil.isEmbeddedStorage() && !ApplicationUtils.getStandaloneMode()) {
        return;
    }
    NotifyCenter.publishEvent(event);
}

如果开启了本地存储并且不是单机模式,则啥也不做。 也就是说这种情况下各节点各做各的。

否则通知中心发布ConfigDataChangeEvent事件。这个事件的发布主要是为了集群环境下各节点更新对应的配置MD5,以及发布LocalDataChangeEvent事件。

我们看一下ConfigDataChangeEvent事件是哪儿监听的:

@Autowired
public AsyncNotifyService(ServerMemberManager memberManager) {
    this.memberManager = memberManager;
    
    // Register ConfigDataChangeEvent to NotifyCenter.
    NotifyCenter.registerToPublisher(ConfigDataChangeEvent.class, NotifyCenter.ringBufferSize);
    
    // Register A Subscriber to subscribe ConfigDataChangeEvent.
    NotifyCenter.registerSubscriber(new Subscriber() {
        
        @Override
        public void onEvent(Event event) {
            // Generate ConfigDataChangeEvent concurrently
            if (event instanceof ConfigDataChangeEvent) {
                ConfigDataChangeEvent evt = (ConfigDataChangeEvent) event;
                long dumpTs = evt.lastModifiedTs;
                String dataId = evt.dataId;
                String group = evt.group;
                String tenant = evt.tenant;
                String tag = evt.tag;
                // 集群所有服务
                Collection<Member> ipList = memberManager.allMembers();
                
                // In fact, any type of queue here can be
                Queue<NotifySingleTask> queue = new LinkedList<NotifySingleTask>();
                // 遍历添加一遍
                for (Member member : ipList) {
                    queue.add(new NotifySingleTask(dataId, group, tenant, tag, dumpTs, member.getAddress(),
                            evt.isBeta));
                }
                // 每个节点都发一下异步通知
                ConfigExecutor.executeAsyncNotify(new AsyncTask(nacosAsyncRestTemplate, queue));
            }
        }
        
        @Override
        public Class<? extends Event> subscribeType() {
            return ConfigDataChangeEvent.class;
        }
    });
}

可以看到这里维护了一个列表,里面存储NotifySingleTask任务,然后包装成AsyncTask丢到线程池中,这是一个异步任务框架,用于通知各种异步任务。里面实际上就是restTemplate.get调用。NotifySingleTask初始化时指定了调用地址

if (StringUtils.isBlank(tenant)) {
    this.url = MessageFormat.format(URL_PATTERN, target, ApplicationUtils.getContextPath(), dataId, group);
} else {
    this.url = MessageFormat
            .format(URL_PATTERN_TENANT, target, ApplicationUtils.getContextPath(), dataId, group, tenant);
}

/v1/cs/communication/dataChange

我们看一下CommunicationController的notifyConfigInfo方法:

@GetMapping("/dataChange")
public Boolean notifyConfigInfo(HttpServletRequest request, @RequestParam("dataId") String dataId,
        @RequestParam("group") String group,
        @RequestParam(value = "tenant", required = false, defaultValue = StringUtils.EMPTY) String tenant,
        @RequestParam(value = "tag", required = false) String tag) {
    dataId = dataId.trim();
    group = group.trim();
    String lastModified = request.getHeader(NotifyService.NOTIFY_HEADER_LAST_MODIFIED);
    long lastModifiedTs = StringUtils.isEmpty(lastModified) ? -1 : Long.parseLong(lastModified);
    String handleIp = request.getHeader(NotifyService.NOTIFY_HEADER_OP_HANDLE_IP);
    String isBetaStr = request.getHeader("isBeta");
    if (StringUtils.isNotBlank(isBetaStr) && trueStr.equals(isBetaStr)) {
        dumpService.dump(dataId, group, tenant, lastModifiedTs, handleIp, true);
    } else {
        dumpService.dump(dataId, group, tenant, tag, lastModifiedTs, handleIp);
    }
    return true;
}

然后是dumpService的dump方法:

public void dump(String dataId, String group, String tenant, long lastModified, String handleIp, boolean isBeta) {
    String groupKey = GroupKey2.getKey(dataId, group, tenant);
    dumpTaskMgr.addTask(groupKey, new DumpTask(groupKey, lastModified, handleIp, isBeta));
}

dumpTaskMgr添加了一个DumpTask任务

@Override
public void addTask(Object key, AbstractDelayTask newTask) {
    super.addTask(key, newTask);
    MetricsMonitor.getDumpTaskMonitor().set(tasks.size());
}

然后是父类的addTask:

@Override
public void addTask(Object key, AbstractDelayTask newTask) {
    lock.lock();
    try {
        AbstractDelayTask existTask = tasks.get(key);
        if (null != existTask) {
            newTask.merge(existTask);
        }
        tasks.put(key, newTask);
    } finally {
        lock.unlock();
    }
}

首先是老任务和新任务的合并,然后放到map缓存中。

然后是任务的执行:

@Override
protected void processTasks() {
    Collection<Object> keys = getAllTaskKeys();
    for (Object taskKey : keys) {
        AbstractDelayTask task = removeTask(taskKey);
        if (null == task) {
            continue;
        }
        NacosTaskProcessor processor = getProcessor(taskKey);
        if (null == processor) {
            getEngineLog().error("processor not found for task, so discarded. " + task);
            continue;
        }
        try {
            // ReAdd task if process failed
            if (!processor.process(task)) {
                retryFailedTask(taskKey, task);
            }
        } catch (Throwable e) {
            getEngineLog().error("Nacos task execute error : " + e.toString(), e);
            retryFailedTask(taskKey, task);
        }
    }
}

这里根据taskKey获取到执行器,然后执行,如果执行失败丢回去再次执行。

此时的执行器是DumpChangeProcessor,我们看它的process方法:

@Override
public boolean process(AbstractDelayTask task) {
    List<ConfigInfoWrapper> updateMd5List = persistService.listAllGroupKeyMd5();
    for (ConfigInfoWrapper config : updateMd5List) {
        final String groupKey = GroupKey2.getKey(config.getDataId(), config.getGroup());
        ConfigCacheService.updateMd5(groupKey, config.getMd5(), config.getLastModified());
    }
    List<ConfigInfo> configDeleted = persistService.findDeletedConfig(startTime, endTime);
    for (ConfigInfo configInfo : configDeleted) {
        if (persistService.findConfigInfo(configInfo.getDataId(), configInfo.getGroup(), configInfo.getTenant())
                == null) {
            ConfigCacheService.remove(configInfo.getDataId(), configInfo.getGroup(), configInfo.getTenant());
        }
    }
    List<ConfigInfoWrapper> changeConfigs = persistService.findChangeConfig(startTime, endTime);
    for (ConfigInfoWrapper cf : changeConfigs) {
        boolean result = ConfigCacheService
                .dumpChange(cf.getDataId(), cf.getGroup(), cf.getTenant(), cf.getContent(), cf.getLastModified());
        final String content = cf.getContent();
        final String md5 = MD5Utils.md5Hex(content, Constants.ENCODE);
    }
    ConfigCacheService.reloadConfig();
    return true;
}

这里我把日志的代码删掉了,看着清晰一些。

首先从存储中取出所有的ConfigInfo,此时的ConfigInfo在通知前就已经更新了。遍历这些config,更新本机节点缓存的md5,ConfigCacheService.updateMd5:

public static void updateMd5(String groupKey, String md5, long lastModifiedTs) {
    CacheItem cache = makeSure(groupKey);
    if (cache.md5 == null || !cache.md5.equals(md5)) {
        cache.md5 = md5;
        cache.lastModifiedTs = lastModifiedTs;
        NotifyCenter.publishEvent(new LocalDataChangeEvent(groupKey));
    }
}

可以看到,这里通知中心发布了LocalDataChangeEvent事件,然后LongPollingService监听到该事件通知客户端。

然后检查有没有删除的,有删除的执行 ConfigCacheService.remove:

public static boolean remove(String dataId, String group, String tenant) {
    final String groupKey = GroupKey2.getKey(dataId, group, tenant);
    final int lockResult = tryWriteLock(groupKey);
    
    // If data is non-existent.
    if (0 == lockResult) {
        DUMP_LOG.info("[remove-ok] {} not exist.", groupKey);
        return true;
    }
    
    // try to lock failed
    if (lockResult < 0) {
        DUMP_LOG.warn("[remove-error] write lock failed. {}", groupKey);
        return false;
    }
    
    try {
        if (!PropertyUtil.isDirectRead()) {
            DiskUtil.removeConfigInfo(dataId, group, tenant);
        }
        CACHE.remove(groupKey);
        NotifyCenter.publishEvent(new LocalDataChangeEvent(groupKey));
        
        return true;
    } finally {
        releaseWriteLock(groupKey);
    }
}

就是取写锁,本地文件删除,缓存删除,LocalDataChangeEvent通知。
再往下是检查修改的,最后ConfigCacheService.reloadConfig();重新加载配置数据。

至此各个节点同步了数据,同时各节点都会发起LocalDataChangeEvent事件。

deleteConfig

@DeleteMapping
@Secured(action = ActionTypes.WRITE, parser = ConfigResourceParser.class)
public Boolean deleteConfig(HttpServletRequest request, HttpServletResponse response,
        @RequestParam("dataId") String dataId, //
        @RequestParam("group") String group, //
        @RequestParam(value = "tenant", required = false, defaultValue = StringUtils.EMPTY) String tenant,
        @RequestParam(value = "tag", required = false) String tag) throws NacosException {
    // check tenant
    ParamUtils.checkTenant(tenant);
    ParamUtils.checkParam(dataId, group, "datumId", "rm");
    ParamUtils.checkParam(tag);
    String clientIp = RequestUtil.getRemoteIp(request);
    String srcUser = RequestUtil.getSrcUserName(request);
    if (StringUtils.isBlank(tag)) {
        persistService.removeConfigInfo(dataId, group, tenant, clientIp, srcUser);
    } else {
        persistService.removeConfigInfoTag(dataId, group, tenant, tag, clientIp, srcUser);
    }
    final Timestamp time = TimeUtils.getCurrentTime();
    ConfigTraceService.logPersistenceEvent(dataId, group, tenant, null, time.getTime(), clientIp,
            ConfigTraceService.PERSISTENCE_EVENT_REMOVE, null);
    ConfigChangePublisher
            .notifyConfigChange(new ConfigDataChangeEvent(false, dataId, group, tenant, tag, time.getTime()));
    return true;
}

和新增修改差不多,检查参数、删除存储中的数据、发布ConfigDataChangeEvent事件。

相关文章