zookeeper源码分析之Watcher和DataTree

x33g5p2x  于2021-12-20 转载在 其他  
字(16.3k)|赞(0)|评价(0)|浏览(375)

一、Watcher

Watcher 是 ZooKeeper 的监听器接口,ZooKeeper 通过它实现对数据节点的监听机制,用于监听节点数据以及节点本身(创建、删除、子节点变化)的事件。Watcher 接口中只有一个 process(WatchedEvent event) 方法,另外包含两个内部类型:Event 接口(其中定义了 KeeperState 和 EventType 两个枚举,分别表示连接状态和事件类型)和 WatcherType 枚举(监听类型)。

public interface Watcher {

    public interface Event {

        public enum KeeperState {
            
            Unknown (-1),
            Disconnected (0),
            NoSyncConnected (1),
            SyncConnected (3),
            AuthFailed (4),
            ConnectedReadOnly (5),
            SaslAuthenticated(6),
            Expired (-112),
            Closed (7);

            private final int intValue;  
	    
            KeeperState(int intValue) {
                this.intValue = intValue;
            }

        public enum EventType {
            None (-1),
            NodeCreated (1),
            NodeDeleted (2),
            NodeDataChanged (3),
            NodeChildrenChanged (4),
            DataWatchRemoved (5),
            ChildWatchRemoved (6);

            private final int intValue;     
                                           
            EventType(int intValue) {
                this.intValue = intValue;
            }
        }
    }

    public enum WatcherType {

        Children(1), Data(2), Any(3);

        private final int intValue;

        private WatcherType(int intValue) {
            this.intValue = intValue;
        }
    }

    abstract public void process(WatchedEvent event);
}

二、WatchManager

WatchManager 是监听器管理器,实现了 IWatchManager 接口,提供监听器的注册、移除和触发等方法。它在内存中维护 watchTable(路径 → Watcher 集合)和 watch2Paths(Watcher → 路径集合)两个映射,既可以通过节点 path 找到其上的所有 watcher 并触发监听事件,也可以通过 Watcher 找到它监听的所有节点路径。值得注意的是,watcher 是一次性的:触发监听时,该路径上的 watcher 会从缓存中移除,因此若要继续监听该节点后续的事件,需要重新注册。

/**
 * Keeps track of which watchers are registered on which paths and fires them.
 *
 * <p>Two maps are maintained side by side: {@code watchTable} (path to
 * watchers) and {@code watch2Paths} (watcher to paths). Every method that
 * reads or writes them does so while holding this object's monitor.
 * Triggering a path removes all of its watchers from both maps.
 */
public class WatchManager implements IWatchManager {

    /** Forward index: path to the watchers registered on it. */
    private final Map<String, Set<Watcher>> watchTable =
        new HashMap<String, Set<Watcher>>();

    /** Reverse index: watcher to the paths it is registered on. */
    private final Map<Watcher, Set<String>> watch2Paths =
        new HashMap<Watcher, Set<String>>();

    /**
     * Tells whether a watcher is a {@code ServerCnxn} that reports itself stale.
     *
     * @param watcher the watcher to inspect
     * @return {@code true} only for a stale {@code ServerCnxn}
     */
    boolean isDeadWatcher(Watcher watcher) {
        if (watcher instanceof ServerCnxn) {
            return ((ServerCnxn) watcher).isStale();
        }
        return false;
    }

    /**
     * Registers a watcher on a path in both indexes.
     *
     * @param path    the path to watch
     * @param watcher the watcher to register
     * @return {@code false} if the watcher is dead or was already registered
     *         on {@code path} in the reverse index; {@code true} otherwise
     */
    public synchronized boolean addWatch(String path, Watcher watcher) {
        if (isDeadWatcher(watcher)) {
            LOG.debug("Ignoring addWatch with closed cnxn");
            return false;
        }

        Set<Watcher> registered = watchTable.get(path);
        if (registered == null) {
            // Small initial capacity for the per-path watcher set.
            registered = new HashSet<Watcher>(4);
            watchTable.put(path, registered);
        }
        registered.add(watcher);

        Set<String> watchedPaths = watch2Paths.get(watcher);
        if (watchedPaths == null) {
            // cnxns typically have many watches, so use default cap here
            watchedPaths = new HashSet<String>();
            watch2Paths.put(watcher, watchedPaths);
        }
        boolean newlyAdded = watchedPaths.add(path);
        return newlyAdded;
    }

    /**
     * Drops a watcher from every path it is registered on.
     *
     * @param watcher the watcher to remove
     */
    public synchronized void removeWatcher(Watcher watcher) {
        Set<String> watchedPaths = watch2Paths.remove(watcher);
        if (watchedPaths == null) {
            return;
        }
        for (String watchedPath : watchedPaths) {
            Set<Watcher> registered = watchTable.get(watchedPath);
            if (registered == null) {
                continue;
            }
            registered.remove(watcher);
            // Do not leave empty per-path sets behind in the forward index.
            if (registered.isEmpty()) {
                watchTable.remove(watchedPath);
            }
        }
    }

    /**
     * Fires all watchers on a path without suppressing any of them.
     *
     * @param path the path whose watchers are fired
     * @param type the event type to deliver
     * @return the fired watchers, or {@code null} if there were none
     */
    public WatcherOrBitSet triggerWatch(String path, EventType type) {
        return triggerWatch(path, type, null);
    }

    /**
     * Fires all watchers on a path, skipping those contained in {@code supress}.
     *
     * <p>The watchers are detached from both indexes while holding the
     * monitor; the callbacks themselves run after the monitor is released.
     *
     * @param path    the path whose watchers are fired
     * @param type    the event type to deliver
     * @param supress watchers that must not be called; may be {@code null}
     * @return all watchers that were registered on {@code path} (including
     *         suppressed ones), or {@code null} if there were none
     */
    public WatcherOrBitSet triggerWatch(
            String path, EventType type, WatcherOrBitSet supress) {
        final WatchedEvent event =
                new WatchedEvent(type, KeeperState.SyncConnected, path);
        final Set<Watcher> fired;
        synchronized (this) {
            fired = watchTable.remove(path);
            boolean nothingToFire = (fired == null) || fired.isEmpty();
            if (nothingToFire) {
                if (LOG.isTraceEnabled()) {
                    ZooTrace.logTraceMessage(LOG,
                            ZooTrace.EVENT_DELIVERY_TRACE_MASK,
                            "No watchers for " + path);
                }
                return null;
            }
            // Keep the reverse index in step with the forward index.
            for (Watcher watcher : fired) {
                Set<String> watchedPaths = watch2Paths.get(watcher);
                if (watchedPaths == null) {
                    continue;
                }
                watchedPaths.remove(path);
            }
        }
        for (Watcher watcher : fired) {
            boolean suppressed = supress != null && supress.contains(watcher);
            if (!suppressed) {
                watcher.process(event);
            }
        }
        return new WatcherOrBitSet(fired);
    }

    /**
     * Checks the reverse index for a registration.
     *
     * @param path    the path to look for
     * @param watcher the watcher to look up
     * @return {@code true} if {@code watcher} is registered on {@code path}
     */
    public synchronized boolean containsWatcher(String path, Watcher watcher) {
        Set<String> watchedPaths = watch2Paths.get(watcher);
        return watchedPaths != null && watchedPaths.contains(path);
    }

    /**
     * Snapshots the reverse index keyed by session id.
     *
     * <p>Every watcher key is cast to {@code ServerCnxn} to obtain its
     * session id.
     *
     * @return a report mapping session id to a copy of its watched paths
     */
    public synchronized WatchesReport getWatches() {
        Map<Long, Set<String>> pathsBySession = new HashMap<Long, Set<String>>();
        for (Entry<Watcher, Set<String>> entry : watch2Paths.entrySet()) {
            ServerCnxn cnxn = (ServerCnxn) entry.getKey();
            Long sessionId = cnxn.getSessionId();
            pathsBySession.put(sessionId, new HashSet<String>(entry.getValue()));
        }
        return new WatchesReport(pathsBySession);
    }

    /**
     * Snapshots the forward index with watchers replaced by session ids.
     *
     * <p>Every watcher is cast to {@code ServerCnxn} to obtain its session id.
     *
     * @return a report mapping path to the session ids watching it
     */
    public synchronized WatchesPathReport getWatchesByPath() {
        Map<String, Set<Long>> sessionsByPath = new HashMap<String, Set<Long>>();
        for (Entry<String, Set<Watcher>> entry : watchTable.entrySet()) {
            Set<Watcher> registered = entry.getValue();
            Set<Long> sessionIds = new HashSet<Long>(registered.size());
            for (Watcher watcher : registered) {
                sessionIds.add(((ServerCnxn) watcher).getSessionId());
            }
            sessionsByPath.put(entry.getKey(), sessionIds);
        }
        return new WatchesPathReport(sessionsByPath);
    }

    /**
     * Summarises the current registrations.
     *
     * @return a summary built from the number of watcher keys, the number of
     *         watched paths, and the total number of (watcher, path) pairs
     */
    public synchronized WatchesSummary getWatchesSummary() {
        int pairCount = 0;
        for (Set<String> watchedPaths : watch2Paths.values()) {
            pairCount += watchedPaths.size();
        }
        int watcherCount = watch2Paths.size();
        int pathCount = watchTable.size();
        return new WatchesSummary(watcherCount, pathCount, pairCount);
    }

    /** No resources to release. */
    @Override
    public void shutdown() { /* do nothing */ }
}

三、DataTree

DataTree 是 ZooKeeper 在内存中维护的一个树形结构的数据。每个节点(DataNode)下可以有多个子节点,子节点的 path 为父节点路径加上 '/' 分隔符再加上子节点的 name。DataTree 持有一个 ConcurrentHashMap(以 path 为键),用于快速获取节点;root 根节点主要用于数据持久化。DataTree 提供创建节点、删除节点、设置节点数据、获取节点数据,以及注册数据监听和子节点监听等方法。

/**
 * Excerpt of the in-memory data tree showing construction, node creation and
 * node deletion.
 *
 * <p>Nodes are indexed by their full path in a flat map; each parent
 * additionally keeps the names of its children. Several members used below
 * (for example {@code aclCache}, {@code nodeDataSize}, {@code ephemerals},
 * {@code containers}, {@code ttls}, {@code pTrie}) are declared outside this
 * excerpt.
 */
public class DataTree {

    // Flat index from full path to DataNode, used for fast node lookup.
    private final ConcurrentHashMap<String, DataNode> nodes =
        new ConcurrentHashMap<String, DataNode>();
    // Watch manager triggered with NodeCreated / NodeDeleted for the affected path.
    private IWatchManager dataWatches;
    // Watch manager triggered with NodeChildrenChanged for the parent path
    // (and with NodeDeleted for a deleted path).
    private IWatchManager childWatches;
    // Root node of the tree.
    private DataNode root = new DataNode(new byte[0], -1L, new StatPersisted());

    /**
     * Builds the initial tree and the two watch managers.
     *
     * <p>Registers the root under both the empty string and
     * {@code rootZookeeper}, links {@code procDataNode} and
     * {@code quotaDataNode} beneath it, adds the config node, and seeds
     * {@code nodeDataSize}. If a watch manager cannot be created the error is
     * logged and the JVM exits.
     */
    public DataTree() {
        nodes.put("", root);
        nodes.put(rootZookeeper, root);

        /** add the proc node and quota node */
        root.addChild(procChildZookeeper);
        nodes.put(procZookeeper, procDataNode);

        procDataNode.addChild(quotaChildZookeeper);
        nodes.put(quotaZookeeper, quotaDataNode);

        addConfigNode();

        nodeDataSize.set(approximateDataSize());
        try {
            dataWatches = WatchManagerFactory.createWatchManager();
            childWatches = WatchManagerFactory.createWatchManager();
        } catch (Exception e) {
            LOG.error("Unexpected exception when creating WatchManager, " +
                    "exiting abnormally", e);
            System.exit(ExitCode.UNEXPECTED_ERROR.getValue());
        }
    }

    /**
     * Creates a node at {@code path} and fires the related watches.
     *
     * @param path           full path of the node to create; the text after
     *                       the last '/' is the child name, the text before it
     *                       is the parent path
     * @param data           node data; may be {@code null} (counted as 0 bytes)
     * @param acl            ACL list, converted through {@code aclCache}
     * @param ephemeralOwner owner value stored in the stat and classified via
     *                       {@code EphemeralType.get}
     * @param parentCVersion child version to set on the parent; {@code -1}
     *                       means "parent's current cversion plus one"
     * @param zxid           transaction id recorded as czxid/mzxid/pzxid
     * @param time           timestamp recorded as ctime/mtime
     * @param outputStat     if non-null, receives a copy of the new node's stat
     * @throws KeeperException.NoNodeException     if the parent is not in {@code nodes}
     * @throws KeeperException.NodeExistsException if the parent already lists the child name
     */
    public void createNode(final String path, byte data[], List<ACL> acl,
            long ephemeralOwner, int parentCVersion, long zxid, long time, Stat outputStat)
            throws KeeperException.NoNodeException,
            KeeperException.NodeExistsException {
        int lastSlash = path.lastIndexOf('/');
        // Parent path: everything before the last '/'.
        String parentName = path.substring(0, lastSlash);
        // Child name: everything after the last '/'.
        String childName = path.substring(lastSlash + 1);
        // Build the persisted stat for the new node.
        StatPersisted stat = new StatPersisted();
        stat.setCtime(time);
        stat.setMtime(time);
        stat.setCzxid(zxid);
        stat.setMzxid(zxid);
        stat.setPzxid(zxid);
        stat.setVersion(0);
        stat.setAversion(0);
        stat.setEphemeralOwner(ephemeralOwner);
        // Look up the parent node.
        DataNode parent = nodes.get(parentName);
        if (parent == null) {
            // The parent does not exist.
            throw new KeeperException.NoNodeException();
        }
        synchronized (parent) {
            Long longval = aclCache.convertAcls(acl);
            // Names of the parent's existing children.
            Set<String> children = parent.getChildren();
            // Reject a duplicate child name.
            if (children.contains(childName)) {
                throw new KeeperException.NodeExistsException();
            }
            // -1 means the caller did not supply a version: use current + 1.
            if (parentCVersion == -1) {
                parentCVersion = parent.stat.getCversion();
                parentCVersion++;
            }
            // Only move the parent's cversion/pzxid forward, never backward.
            if (parentCVersion > parent.stat.getCversion()) {
                parent.stat.setCversion(parentCVersion);
                parent.stat.setPzxid(zxid);
            }
            // Create the child DataNode.
            DataNode child = new DataNode(data, longval, stat);
            // Record the child name on the parent.
            parent.addChild(childName);
            // Account for the new node in the tree-wide data size.
            nodeDataSize.addAndGet(getNodeSize(path, child.data));
            // Publish the node in the path index.
            nodes.put(path, child);
            EphemeralType ephemeralType = EphemeralType.get(ephemeralOwner);
            if (ephemeralType == EphemeralType.CONTAINER) {
                containers.add(path);
            } else if (ephemeralType == EphemeralType.TTL) {
                ttls.add(path);
            } else if (ephemeralOwner != 0) {
                // Any other non-zero owner: track the path in the set kept for
                // that owner value (presumably the owning session id — confirm).
                HashSet<String> list = ephemerals.get(ephemeralOwner);
                if (list == null) {
                    list = new HashSet<String>();
                    ephemerals.put(ephemeralOwner, list);
                }
                synchronized (list) {
                    list.add(path);
                }
            }
            if (outputStat != null) {
            	child.copyStat(outputStat);
            }
        }
        // Special handling when the parent path starts with the quota prefix.
        if (parentName.startsWith(quotaZookeeper)) {
            // now check if its the limit node
            if (Quotas.limitNode.equals(childName)) {
                // this is the limit node
                // get the parent and add it to the trie
                pTrie.addPath(parentName.substring(quotaZookeeper.length()));
            }
            if (Quotas.statNode.equals(childName)) {
                updateQuotaForPath(parentName
                        .substring(quotaZookeeper.length()));
            }
        }
        // also check to update the quotas for this node
        String lastPrefix = getMaxPrefixWithQuota(path);
        long bytes = data == null ? 0 : data.length;
        if(lastPrefix != null) {
            // ok we have some match and need to update
            updateCountBytes(lastPrefix, bytes, 1);
        }
        updateWriteStat(path, bytes);
        // Fire NodeCreated on the data watches of the new path.
        dataWatches.triggerWatch(path, Event.EventType.NodeCreated);
        // Fire NodeChildrenChanged on the child watches of the parent
        // (an empty parent name is reported as "/").
        childWatches.triggerWatch(parentName.equals("") ? "/" : parentName,
                Event.EventType.NodeChildrenChanged);
    }

    /**
     * Deletes the node at {@code path} and fires the related watches.
     *
     * <p>The child name is removed from the parent (and the parent's pzxid
     * possibly advanced) before the node itself is looked up, so a
     * {@code NoNodeException} for a missing node is thrown after the parent
     * has already been updated.
     *
     * @param path full path of the node to delete
     * @param zxid transaction id of the delete
     * @throws KeeperException.NoNodeException if the parent or the node is
     *         not in {@code nodes}
     */
    public void deleteNode(String path, long zxid)
            throws KeeperException.NoNodeException {
        int lastSlash = path.lastIndexOf('/');
        // Parent path: everything before the last '/'.
        String parentName = path.substring(0, lastSlash);
        // Child name: everything after the last '/'.
        String childName = path.substring(lastSlash + 1);
        // Look up the parent node.
        DataNode parent = nodes.get(parentName);
        if (parent == null) {
            // The parent does not exist.
            throw new KeeperException.NoNodeException();
        }
        synchronized (parent) {
            parent.removeChild(childName);
            // Advance the parent's pzxid only if this zxid is newer.
            if (zxid > parent.stat.getPzxid()) {
                parent.stat.setPzxid(zxid);
            }
        }
        // Look up the node itself; fail if it is not indexed.
        DataNode node = nodes.get(path);
        if (node == null) {
            throw new KeeperException.NoNodeException();
        }
        nodes.remove(path);
        synchronized (node) {
            // Release the node's ACL usage and subtract its size from the total.
            aclCache.removeUsage(node.acl);
            nodeDataSize.addAndGet(-getNodeSize(path, node.data));
        }
        synchronized (parent) {
            long eowner = node.stat.getEphemeralOwner();
            EphemeralType ephemeralType = EphemeralType.get(eowner);
            if (ephemeralType == EphemeralType.CONTAINER) {
                containers.remove(path);
            } else if (ephemeralType == EphemeralType.TTL) {
                ttls.remove(path);
            } else if (eowner != 0) {
                // NOTE: this local shadows the 'nodes' field; it is the path
                // set kept for the owner value, not the path index.
                Set<String> nodes = ephemerals.get(eowner);
                if (nodes != null) {
                    synchronized (nodes) {
                        nodes.remove(path);
                    }
                }
            }
        }
        // Fire NodeDeleted on the data watches of the deleted path.
        WatcherOrBitSet processed = dataWatches.triggerWatch(path,
                EventType.NodeDeleted);
        // Fire NodeDeleted on the child watches of the deleted path, passing
        // the result of the data-watch trigger as the third argument
        // (presumably so watchers already notified are skipped — confirm
        // against the IWatchManager implementation).
        childWatches.triggerWatch(path, EventType.NodeDeleted, processed);
        // Fire NodeChildrenChanged on the child watches of the parent
        // (an empty parent name is reported as "/").
        childWatches.triggerWatch("".equals(parentName) ? "/" : parentName,
                EventType.NodeChildrenChanged);
    }
}

下面我们来看getData、setData、getChildren、getACL、setACL等方法。

/**
 * Excerpt of the in-memory data tree showing the data, children and ACL
 * accessors.
 *
 * <p>All read accessors accept a {@code null} {@code stat} argument, in which
 * case no stat is copied out.
 */
public class DataTree {

    /**
     * Replaces the data of an existing node and fires its data watches.
     *
     * @param path    full path of the node
     * @param data    new data; may be {@code null} (counted as 0 bytes)
     * @param version data version to store on the node
     * @param zxid    transaction id stored as the node's mzxid
     * @param time    timestamp stored as the node's mtime
     * @return a copy of the node's stat taken after the update
     * @throws KeeperException.NoNodeException if {@code path} is not indexed
     */
    public Stat setData(String path, byte[] data, int version, long zxid,
            long time) throws KeeperException.NoNodeException {
        Stat s = new Stat();
        DataNode n = nodes.get(path);
        if (n == null) {
            throw new KeeperException.NoNodeException();
        }
        byte[] lastdata;
        synchronized (n) {
            // Remember the previous data so size deltas can be computed below.
            lastdata = n.data;
            n.data = data;
            n.stat.setMtime(time);
            n.stat.setMzxid(zxid);
            n.stat.setVersion(version);
            n.copyStat(s);
        }
        // If some prefix of the path carries a quota, apply the byte delta to it.
        String lastPrefix = getMaxPrefixWithQuota(path);
        long dataBytes = data == null ? 0 : data.length;
        if (lastPrefix != null) {
            this.updateCountBytes(lastPrefix, dataBytes
                    - (lastdata == null ? 0 : lastdata.length), 0);
        }
        // Adjust the tree-wide data size by the difference between new and old.
        nodeDataSize.addAndGet(getNodeSize(path, data) - getNodeSize(path, lastdata));
        updateWriteStat(path, dataBytes);
        // Notify data watchers that the node's data changed.
        dataWatches.triggerWatch(path, EventType.NodeDataChanged);
        return s;
    }

    /**
     * Returns a node's data, optionally copying its stat and registering a
     * data watch.
     *
     * @param path    full path of the node
     * @param stat    receives a copy of the node's stat; may be {@code null}
     * @param watcher watcher to register on the node's data; may be {@code null}
     * @return the node's data (may be {@code null})
     * @throws KeeperException.NoNodeException if {@code path} is not indexed
     */
    public byte[] getData(String path, Stat stat, Watcher watcher)
            throws KeeperException.NoNodeException {
        DataNode n = nodes.get(path);
        if (n == null) {
            throw new KeeperException.NoNodeException();
        }
        byte[] data;
        synchronized (n) {
            // Tolerate a null stat, consistent with getChildren.
            if (stat != null) {
                n.copyStat(stat);
            }
            if (watcher != null) {
                // Register under the node lock, together with reading the data.
                dataWatches.addWatch(path, watcher);
            }
            data = n.data;
        }
        updateReadStat(path, data == null ? 0 : data.length);
        return data;
    }

    /**
     * Returns the names of a node's children, optionally copying its stat and
     * registering a child watch.
     *
     * @param path    full path of the node
     * @param stat    receives a copy of the node's stat; may be {@code null}
     * @param watcher watcher to register on the node's children; may be {@code null}
     * @return a new list holding the child names
     * @throws KeeperException.NoNodeException if {@code path} is not indexed
     */
    public List<String> getChildren(String path, Stat stat, Watcher watcher)
            throws KeeperException.NoNodeException {
        DataNode n = nodes.get(path);
        if (n == null) {
            throw new KeeperException.NoNodeException();
        }
        List<String> children;
        synchronized (n) {
            if (stat != null) {
                n.copyStat(stat);
            }
            // Copy so the caller never sees the node's own child collection.
            children = new ArrayList<String>(n.getChildren());
            if (watcher != null) {
                childWatches.addWatch(path, watcher);
            }
        }
        // The read statistic counts the total length of the returned names.
        int bytes = 0;
        for (String child : children) {
            bytes += child.length();
        }
        updateReadStat(path, bytes);
        return children;
    }

    /**
     * Replaces a node's ACL.
     *
     * @param path    full path of the node
     * @param acl     new ACL list, converted through {@code aclCache}
     * @param version ACL version to store on the node
     * @return a copy of the node's stat taken after the update
     * @throws KeeperException.NoNodeException if {@code path} is not indexed
     */
    public Stat setACL(String path, List<ACL> acl, int version)
            throws KeeperException.NoNodeException {
        Stat stat = new Stat();
        DataNode n = nodes.get(path);
        if (n == null) {
            throw new KeeperException.NoNodeException();
        }
        synchronized (n) {
            // Release the old ACL's usage before switching to the new one.
            aclCache.removeUsage(n.acl);
            n.stat.setAversion(version);
            n.acl = aclCache.convertAcls(acl);
            n.copyStat(stat);
            return stat;
        }
    }

    /**
     * Returns a node's ACL, optionally copying its stat.
     *
     * @param path full path of the node
     * @param stat receives a copy of the node's stat; may be {@code null}
     * @return a new list holding the node's ACL entries
     * @throws KeeperException.NoNodeException if {@code path} is not indexed
     */
    public List<ACL> getACL(String path, Stat stat)
            throws KeeperException.NoNodeException {
        DataNode n = nodes.get(path);
        if (n == null) {
            throw new KeeperException.NoNodeException();
        }
        synchronized (n) {
            // Tolerate a null stat, consistent with getChildren.
            if (stat != null) {
                n.copyStat(stat);
            }
            return new ArrayList<ACL>(aclCache.convertLong(n.acl));
        }
    }
}

DataTree 提供了处理事务的方法 processTxn,参数为 TxnHeader(事务头)和 Record(事务体)。它根据事务头中的操作类型,分别调用创建节点、删除节点、设置数据等方法。

/**
 * Applies one transaction to the tree and reports the outcome.
 *
 * <p>The operation is selected by {@code header.getType()}. A
 * {@code KeeperException} raised while applying it is logged at debug level
 * and its code is stored in the result's {@code err}; an {@code IOException}
 * is logged at debug level and leaves {@code err} unchanged.
 *
 * <p>For {@code OpCode.multi}, if any sub-transaction is of type
 * {@code OpCode.error} then no sub-operation is applied: every non-error
 * sub-transaction is replaced by an {@code ErrorTxn} carrying {@code OK} when
 * it precedes the first error entry and {@code RUNTIMEINCONSISTENCY} when it
 * follows one. The first non-zero sub-result error becomes the overall
 * {@code err}.
 *
 * <p>NOTE(review): {@code isSubTxn} is not read in the code shown here; the
 * upstream method presumably performs further bookkeeping before returning
 * that this excerpt omits — confirm before relying on this version.
 *
 * @param header   transaction header (client id, cxid, zxid, time, type)
 * @param txn      transaction body; cast according to the header type
 * @param isSubTxn whether this call is for an element of a multi transaction
 * @return the result describing the applied transaction
 */
public ProcessTxnResult processTxn(TxnHeader header, Record txn, boolean isSubTxn) {
    ProcessTxnResult rc = new ProcessTxnResult();

    try {
        rc.clientId = header.getClientId();
        rc.cxid = header.getCxid();
        rc.zxid = header.getZxid();
        rc.type = header.getType();
        rc.err = 0;
        rc.multiResult = null;
        // Dispatch on the operation type carried by the header.
        switch (header.getType()) {
            // Plain create: no stat is returned.
            case OpCode.create:
                CreateTxn createTxn = (CreateTxn) txn;
                rc.path = createTxn.getPath();
                createNode(
                        createTxn.getPath(),
                        createTxn.getData(),
                        createTxn.getAcl(),
                        createTxn.getEphemeral() ? header.getClientId() : 0,
                        createTxn.getParentCVersion(),
                        header.getZxid(), header.getTime(), null);
                break;
            // create2: same as create, but the new node's stat is returned.
            case OpCode.create2:
                CreateTxn create2Txn = (CreateTxn) txn;
                rc.path = create2Txn.getPath();
                Stat stat = new Stat();
                createNode(
                        create2Txn.getPath(),
                        create2Txn.getData(),
                        create2Txn.getAcl(),
                        create2Txn.getEphemeral() ? header.getClientId() : 0,
                        create2Txn.getParentCVersion(),
                        header.getZxid(), header.getTime(), stat);
                rc.stat = stat;
                break;
            // TTL node: the owner value is derived from the TTL.
            case OpCode.createTTL:
                CreateTTLTxn createTtlTxn = (CreateTTLTxn) txn;
                rc.path = createTtlTxn.getPath();
                stat = new Stat();
                createNode(
                        createTtlTxn.getPath(),
                        createTtlTxn.getData(),
                        createTtlTxn.getAcl(),
                        EphemeralType.TTL.toEphemeralOwner(createTtlTxn.getTtl()),
                        createTtlTxn.getParentCVersion(),
                        header.getZxid(), header.getTime(), stat);
                rc.stat = stat;
                break;
            // Container node: uses the fixed container owner value.
            case OpCode.createContainer:
                CreateContainerTxn createContainerTxn = (CreateContainerTxn) txn;
                rc.path = createContainerTxn.getPath();
                stat = new Stat();
                createNode(
                        createContainerTxn.getPath(),
                        createContainerTxn.getData(),
                        createContainerTxn.getAcl(),
                        EphemeralType.CONTAINER_EPHEMERAL_OWNER,
                        createContainerTxn.getParentCVersion(),
                        header.getZxid(), header.getTime(), stat);
                rc.stat = stat;
                break;
            // Delete and container delete share one code path.
            case OpCode.delete:
            case OpCode.deleteContainer:
                DeleteTxn deleteTxn = (DeleteTxn) txn;
                rc.path = deleteTxn.getPath();
                deleteNode(deleteTxn.getPath(), header.getZxid());
                break;
            // Reconfig is applied exactly like a setData.
            case OpCode.reconfig:
            case OpCode.setData:
                SetDataTxn setDataTxn = (SetDataTxn) txn;
                rc.path = setDataTxn.getPath();
                rc.stat = setData(setDataTxn.getPath(), setDataTxn.getData(),
                        setDataTxn.getVersion(), header.getZxid(),
                        header.getTime());
                break;
            case OpCode.setACL:
                SetACLTxn setACLTxn = (SetACLTxn) txn;
                rc.path = setACLTxn.getPath();
                rc.stat = setACL(setACLTxn.getPath(), setACLTxn.getAcl(),
                        setACLTxn.getVersion());
                break;
            case OpCode.closeSession:
                killSession(header.getClientId(), header.getZxid());
                break;
            // An error transaction only carries its error code into the result.
            case OpCode.error:
                ErrorTxn errTxn = (ErrorTxn) txn;
                rc.err = errTxn.getErr();
                break;
            // A version check changes nothing; only the path is reported.
            case OpCode.check:
                CheckVersionTxn checkTxn = (CheckVersionTxn) txn;
                rc.path = checkTxn.getPath();
                break;
            case OpCode.multi:
                MultiTxn multiTxn = (MultiTxn) txn;
                List<Txn> txns = multiTxn.getTxns();
                rc.multiResult = new ArrayList<ProcessTxnResult>();

                // First pass: the multi has failed if any element is an error.
                boolean failed = false;
                for (Txn subtxn : txns) {
                    if (subtxn.getType() == OpCode.error) {
                        failed = true;
                        break;
                    }
                }

                // Second pass: decode each element and process it recursively.
                // post_failed becomes true once an error element has been seen.
                boolean post_failed = false;
                for (Txn subtxn : txns) {
                    ByteBuffer bb = ByteBuffer.wrap(subtxn.getData());
                    Record record = null;
                    switch (subtxn.getType()) {
                        case OpCode.create:
                            record = new CreateTxn();
                            break;
                        case OpCode.createTTL:
                            record = new CreateTTLTxn();
                            break;
                        case OpCode.createContainer:
                            record = new CreateContainerTxn();
                            break;
                        case OpCode.delete:
                        case OpCode.deleteContainer:
                            record = new DeleteTxn();
                            break;
                        case OpCode.setData:
                            record = new SetDataTxn();
                            break;
                        case OpCode.error:
                            record = new ErrorTxn();
                            post_failed = true;
                            break;
                        case OpCode.check:
                            record = new CheckVersionTxn();
                            break;
                        default:
                            throw new IOException("Invalid type of op: " + subtxn.getType());
                    }
                    assert (record != null);

                    ByteBufferInputStream.byteBuffer2Record(bb, record);

                    // In a failed multi, turn every non-error element into an
                    // error record: OK before the failing element,
                    // RUNTIMEINCONSISTENCY after it.
                    if (failed && subtxn.getType() != OpCode.error) {
                        int ec = post_failed ? Code.RUNTIMEINCONSISTENCY.intValue()
                                             : Code.OK.intValue();

                        subtxn.setType(OpCode.error);
                        record = new ErrorTxn(ec);
                    }

                    if (failed) {
                        assert (subtxn.getType() == OpCode.error);
                    }

                    TxnHeader subHdr = new TxnHeader(header.getClientId(), header.getCxid(),
                            header.getZxid(), header.getTime(),
                            subtxn.getType());
                    ProcessTxnResult subRc = processTxn(subHdr, record, true);
                    rc.multiResult.add(subRc);
                    // The first non-zero sub-result error becomes the overall error.
                    if (subRc.err != 0 && rc.err == 0) {
                        rc.err = subRc.err;
                    }
                }
                break;
        }
    } catch (KeeperException e) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Failed: " + header + ":" + txn, e);
        }
        rc.err = e.code().intValue();
    } catch (IOException e) {
        // Logged only; rc.err is intentionally left as it was.
        if (LOG.isDebugEnabled()) {
            LOG.debug("Failed: " + header + ":" + txn, e);
        }
    }
    return rc;
}

相关文章