文章24 | 阅读 13579 | 点赞0
Nacos服务端对于配置的新增修改和删除这部分,相对比较简单。
代码还是在ConfigController里,publishConfig方法提供新增和修改,deleteConfig方法提供删除操作
@PostMapping
@Secured(action = ActionTypes.WRITE, parser = ConfigResourceParser.class)
public Boolean publishConfig(HttpServletRequest request, HttpServletResponse response,
@RequestParam(value = "dataId") String dataId, @RequestParam(value = "group") String group,
@RequestParam(value = "tenant", required = false, defaultValue = StringUtils.EMPTY) String tenant,
@RequestParam(value = "content") String content, @RequestParam(value = "tag", required = false) String tag,
@RequestParam(value = "appName", required = false) String appName,
@RequestParam(value = "src_user", required = false) String srcUser,
@RequestParam(value = "config_tags", required = false) String configTags,
@RequestParam(value = "desc", required = false) String desc,
@RequestParam(value = "use", required = false) String use,
@RequestParam(value = "effect", required = false) String effect,
@RequestParam(value = "type", required = false) String type,
@RequestParam(value = "schema", required = false) String schema) throws NacosException {
final String srcIp = RequestUtil.getRemoteIp(request);
final String requestIpApp = RequestUtil.getAppName(request);
srcUser = RequestUtil.getSrcUserName(request);
// check tenant
ParamUtils.checkTenant(tenant);
ParamUtils.checkParam(dataId, group, "datumId", content);
ParamUtils.checkParam(tag);
Map<String, Object> configAdvanceInfo = new HashMap<String, Object>(10);
MapUtils.putIfValNoNull(configAdvanceInfo, "config_tags", configTags);
MapUtils.putIfValNoNull(configAdvanceInfo, "desc", desc);
MapUtils.putIfValNoNull(configAdvanceInfo, "use", use);
MapUtils.putIfValNoNull(configAdvanceInfo, "effect", effect);
MapUtils.putIfValNoNull(configAdvanceInfo, "type", type);
MapUtils.putIfValNoNull(configAdvanceInfo, "schema", schema);
ParamUtils.checkParam(configAdvanceInfo);
if (AggrWhitelist.isAggrDataId(dataId)) {
LOGGER.warn("[aggr-conflict] {} attemp to publish single data, {}, {}", RequestUtil.getRemoteIp(request),
dataId, group);
throw new NacosException(NacosException.NO_RIGHT, "dataId:" + dataId + " is aggr");
}
final Timestamp time = TimeUtils.getCurrentTime();
String betaIps = request.getHeader("betaIps");
ConfigInfo configInfo = new ConfigInfo(dataId, group, tenant, appName, content);
configInfo.setType(type);
if (StringUtils.isBlank(betaIps)) {
if (StringUtils.isBlank(tag)) {
persistService.insertOrUpdate(srcIp, srcUser, configInfo, time, configAdvanceInfo, true);
ConfigChangePublisher
.notifyConfigChange(new ConfigDataChangeEvent(false, dataId, group, tenant, time.getTime()));
} else {
persistService.insertOrUpdateTag(configInfo, tag, srcIp, srcUser, time, true);
ConfigChangePublisher.notifyConfigChange(
new ConfigDataChangeEvent(false, dataId, group, tenant, tag, time.getTime()));
}
} else {
// beta publish
persistService.insertOrUpdateBeta(configInfo, betaIps, srcIp, srcUser, time, true);
ConfigChangePublisher
.notifyConfigChange(new ConfigDataChangeEvent(true, dataId, group, tenant, time.getTime()));
}
ConfigTraceService
.logPersistenceEvent(dataId, group, tenant, requestIpApp, time.getTime(), InetUtils.getSelfIp(),
ConfigTraceService.PERSISTENCE_EVENT_PUB, content);
return true;
}
我们看一下ConfigChangePublisher.notifyConfigChange方法:
public static void notifyConfigChange(ConfigDataChangeEvent event) {
if (PropertyUtil.isEmbeddedStorage() && !ApplicationUtils.getStandaloneMode()) {
return;
}
NotifyCenter.publishEvent(event);
}
如果开启了本地存储并且不是单机模式,则啥也不做。 也就是说这种情况下各节点各做各的。
否则通知中心发布ConfigDataChangeEvent事件。这个事件的发布主要是为了集群环境下各节点更新对应的配置MD5,以及发布LocalDataChangeEvent事件。
我们看一下ConfigDataChangeEvent事件是哪儿监听的:
@Autowired
public AsyncNotifyService(ServerMemberManager memberManager) {
this.memberManager = memberManager;
// Register ConfigDataChangeEvent to NotifyCenter.
NotifyCenter.registerToPublisher(ConfigDataChangeEvent.class, NotifyCenter.ringBufferSize);
// Register A Subscriber to subscribe ConfigDataChangeEvent.
NotifyCenter.registerSubscriber(new Subscriber() {
@Override
public void onEvent(Event event) {
// Generate ConfigDataChangeEvent concurrently
if (event instanceof ConfigDataChangeEvent) {
ConfigDataChangeEvent evt = (ConfigDataChangeEvent) event;
long dumpTs = evt.lastModifiedTs;
String dataId = evt.dataId;
String group = evt.group;
String tenant = evt.tenant;
String tag = evt.tag;
// 集群所有服务
Collection<Member> ipList = memberManager.allMembers();
// In fact, any type of queue here can be
Queue<NotifySingleTask> queue = new LinkedList<NotifySingleTask>();
// 遍历添加一遍
for (Member member : ipList) {
queue.add(new NotifySingleTask(dataId, group, tenant, tag, dumpTs, member.getAddress(),
evt.isBeta));
}
// 每个节点都发一下异步通知
ConfigExecutor.executeAsyncNotify(new AsyncTask(nacosAsyncRestTemplate, queue));
}
}
@Override
public Class<? extends Event> subscribeType() {
return ConfigDataChangeEvent.class;
}
});
}
可以看到这里维护了一个列表,里面存储NotifySingleTask任务,然后包装成AsyncTask丢到线程池中,这是一个异步任务框架,用于通知各种异步任务。里面实际上就是restTemplate.get调用。NotifySingleTask初始化时指定了调用地址
if (StringUtils.isBlank(tenant)) {
this.url = MessageFormat.format(URL_PATTERN, target, ApplicationUtils.getContextPath(), dataId, group);
} else {
this.url = MessageFormat
.format(URL_PATTERN_TENANT, target, ApplicationUtils.getContextPath(), dataId, group, tenant);
}
/v1/cs/communication/dataChange
我们看一下CommunicationController的notifyConfigInfo方法:
@GetMapping("/dataChange")
public Boolean notifyConfigInfo(HttpServletRequest request, @RequestParam("dataId") String dataId,
@RequestParam("group") String group,
@RequestParam(value = "tenant", required = false, defaultValue = StringUtils.EMPTY) String tenant,
@RequestParam(value = "tag", required = false) String tag) {
dataId = dataId.trim();
group = group.trim();
String lastModified = request.getHeader(NotifyService.NOTIFY_HEADER_LAST_MODIFIED);
long lastModifiedTs = StringUtils.isEmpty(lastModified) ? -1 : Long.parseLong(lastModified);
String handleIp = request.getHeader(NotifyService.NOTIFY_HEADER_OP_HANDLE_IP);
String isBetaStr = request.getHeader("isBeta");
if (StringUtils.isNotBlank(isBetaStr) && trueStr.equals(isBetaStr)) {
dumpService.dump(dataId, group, tenant, lastModifiedTs, handleIp, true);
} else {
dumpService.dump(dataId, group, tenant, tag, lastModifiedTs, handleIp);
}
return true;
}
然后是dumpService的dump方法:
public void dump(String dataId, String group, String tenant, long lastModified, String handleIp, boolean isBeta) {
String groupKey = GroupKey2.getKey(dataId, group, tenant);
dumpTaskMgr.addTask(groupKey, new DumpTask(groupKey, lastModified, handleIp, isBeta));
}
dumpTaskMgr添加了一个DumpTask任务
@Override
public void addTask(Object key, AbstractDelayTask newTask) {
super.addTask(key, newTask);
MetricsMonitor.getDumpTaskMonitor().set(tasks.size());
}
然后是父类的addTask:
@Override
public void addTask(Object key, AbstractDelayTask newTask) {
lock.lock();
try {
AbstractDelayTask existTask = tasks.get(key);
if (null != existTask) {
newTask.merge(existTask);
}
tasks.put(key, newTask);
} finally {
lock.unlock();
}
}
首先是老任务和新任务的合并,然后放到map缓存中。
然后是任务的执行:
@Override
protected void processTasks() {
Collection<Object> keys = getAllTaskKeys();
for (Object taskKey : keys) {
AbstractDelayTask task = removeTask(taskKey);
if (null == task) {
continue;
}
NacosTaskProcessor processor = getProcessor(taskKey);
if (null == processor) {
getEngineLog().error("processor not found for task, so discarded. " + task);
continue;
}
try {
// ReAdd task if process failed
if (!processor.process(task)) {
retryFailedTask(taskKey, task);
}
} catch (Throwable e) {
getEngineLog().error("Nacos task execute error : " + e.toString(), e);
retryFailedTask(taskKey, task);
}
}
}
这里根据taskKey获取到执行器,然后执行,如果执行失败丢回去再次执行。
此时的执行器是DumpChangeProcessor,我们看它的process方法:
@Override
public boolean process(AbstractDelayTask task) {
List<ConfigInfoWrapper> updateMd5List = persistService.listAllGroupKeyMd5();
for (ConfigInfoWrapper config : updateMd5List) {
final String groupKey = GroupKey2.getKey(config.getDataId(), config.getGroup());
ConfigCacheService.updateMd5(groupKey, config.getMd5(), config.getLastModified());
}
List<ConfigInfo> configDeleted = persistService.findDeletedConfig(startTime, endTime);
for (ConfigInfo configInfo : configDeleted) {
if (persistService.findConfigInfo(configInfo.getDataId(), configInfo.getGroup(), configInfo.getTenant())
== null) {
ConfigCacheService.remove(configInfo.getDataId(), configInfo.getGroup(), configInfo.getTenant());
}
}
List<ConfigInfoWrapper> changeConfigs = persistService.findChangeConfig(startTime, endTime);
for (ConfigInfoWrapper cf : changeConfigs) {
boolean result = ConfigCacheService
.dumpChange(cf.getDataId(), cf.getGroup(), cf.getTenant(), cf.getContent(), cf.getLastModified());
final String content = cf.getContent();
final String md5 = MD5Utils.md5Hex(content, Constants.ENCODE);
}
ConfigCacheService.reloadConfig();
return true;
}
这里我把日志的代码删掉了,看着清晰一些。
首先从存储中取出所有的ConfigInfo,此时的ConfigInfo在通知前就已经更新了。遍历这些config,更新本机节点缓存的md5,ConfigCacheService.updateMd5:
public static void updateMd5(String groupKey, String md5, long lastModifiedTs) {
CacheItem cache = makeSure(groupKey);
if (cache.md5 == null || !cache.md5.equals(md5)) {
cache.md5 = md5;
cache.lastModifiedTs = lastModifiedTs;
NotifyCenter.publishEvent(new LocalDataChangeEvent(groupKey));
}
}
可以看到,这里通知中心发布了LocalDataChangeEvent事件,然后LongPollingService监听到该事件通知客户端。
然后检查有没有删除的,有删除的执行 ConfigCacheService.remove:
public static boolean remove(String dataId, String group, String tenant) {
final String groupKey = GroupKey2.getKey(dataId, group, tenant);
final int lockResult = tryWriteLock(groupKey);
// If data is non-existent.
if (0 == lockResult) {
DUMP_LOG.info("[remove-ok] {} not exist.", groupKey);
return true;
}
// try to lock failed
if (lockResult < 0) {
DUMP_LOG.warn("[remove-error] write lock failed. {}", groupKey);
return false;
}
try {
if (!PropertyUtil.isDirectRead()) {
DiskUtil.removeConfigInfo(dataId, group, tenant);
}
CACHE.remove(groupKey);
NotifyCenter.publishEvent(new LocalDataChangeEvent(groupKey));
return true;
} finally {
releaseWriteLock(groupKey);
}
}
就是取写锁,本地文件删除,缓存删除,LocalDataChangeEvent通知。
再往下是检查修改的,最后ConfigCacheService.reloadConfig();重新加载配置数据。
至此各个节点同步了数据,同时各节点都会发起LocalDataChangeEvent事件。
@DeleteMapping
@Secured(action = ActionTypes.WRITE, parser = ConfigResourceParser.class)
public Boolean deleteConfig(HttpServletRequest request, HttpServletResponse response,
@RequestParam("dataId") String dataId, //
@RequestParam("group") String group, //
@RequestParam(value = "tenant", required = false, defaultValue = StringUtils.EMPTY) String tenant,
@RequestParam(value = "tag", required = false) String tag) throws NacosException {
// check tenant
ParamUtils.checkTenant(tenant);
ParamUtils.checkParam(dataId, group, "datumId", "rm");
ParamUtils.checkParam(tag);
String clientIp = RequestUtil.getRemoteIp(request);
String srcUser = RequestUtil.getSrcUserName(request);
if (StringUtils.isBlank(tag)) {
persistService.removeConfigInfo(dataId, group, tenant, clientIp, srcUser);
} else {
persistService.removeConfigInfoTag(dataId, group, tenant, tag, clientIp, srcUser);
}
final Timestamp time = TimeUtils.getCurrentTime();
ConfigTraceService.logPersistenceEvent(dataId, group, tenant, null, time.getTime(), clientIp,
ConfigTraceService.PERSISTENCE_EVENT_REMOVE, null);
ConfigChangePublisher
.notifyConfigChange(new ConfigDataChangeEvent(false, dataId, group, tenant, tag, time.getTime()));
return true;
}
和新增修改差不多,检查参数、删除存储中的数据、发布ConfigDataChangeEvent事件。
版权说明 : 本文为转载文章, 版权归原作者所有 版权申明
原文链接 : https://blog.csdn.net/qq_19414183/article/details/112366177
内容来源于网络,如有侵权,请联系作者删除!