zookeeper源码分析之RequestProcessor

x33g5p2x  于2021-12-20 转载在 其他  
字(18.9k)|赞(0)|评价(0)|浏览(319)

一、RequestProcessor

请求处理器被链接在一起以处理事务。请求总是按顺序处理的。独立服务器和主从服务器将稍微不同的请求处理器链接在一起。请求总是通过请求处理器链向前移动。请求通过processRequest传递给RequestProcessor。通常,方法将始终由单个线程调用。当调用shutdown时,请求请求处理器还应该关闭它连接到的任何请求处理器。

public interface RequestProcessor {

    void processRequest(Request request) throws RequestProcessorException;

    void shutdown();
}

二、PrepRequestProcessor

此请求处理器通常在请求处理器更改开始时。它设置与更改系统状态的请求相关联的任何事务。它依靠zookeperserver更新未完成的请求,以便在生成事务时考虑队列中要应用的事务。

它通常是请求处理链中的第一个处理器,它继承ZooKeeperCriticalThread线程,实现RequestProcessor接口,内部维护一个LinkedBlockingQueue队列,持有客户端的请求Request对象。线程run方法即是从队列中不断拉去值,进行pRequest处理。

public class PrepRequestProcessor extends ZooKeeperCriticalThread implements RequestProcessor {
    
    // 维护的submittedRequests队列
    LinkedBlockingQueue<Request> submittedRequests = new LinkedBlockingQueue<Request>();
    // 下一个处理器
    private final RequestProcessor nextProcessor;
    // 当前ZooKeeperServer服务器
    ZooKeeperServer zks;

    // 构造方法,需要下一个RequestProcessor
    public PrepRequestProcessor(ZooKeeperServer zks,RequestProcessor nextProcessor) {
        super("ProcessThread(sid:" + zks.getServerId() + " cport:"
                + zks.getClientPort() + "):", zks.getZooKeeperServerListener());
        this.nextProcessor = nextProcessor;
        this.zks = zks;
    }

    // processRequest方法
    public void processRequest(Request request) {
        // 设置prepQueueStartTime开始时间
        request.prepQueueStartTime =  Time.currentElapsedTime();
	// 添加至队列中
        submittedRequests.add(request);
        ServerMetrics.getMetrics().PREP_PROCESSOR_QUEUED.add(1);
    }

    // 关闭方法,清空队列,关闭线程,调用nextProcessor的shutdown方法
    public void shutdown() {
        LOG.info("Shutting down");
        submittedRequests.clear();
	// 添加Request.requestOfDeath,使线程关闭
        submittedRequests.add(Request.requestOfDeath);
        nextProcessor.shutdown();
    }

    // 线程持续方法
    public void run() {
        try {
            while (true) {
                ServerMetrics.getMetrics().PREP_PROCESSOR_QUEUE_SIZE.add(submittedRequests.size());
                Request request = submittedRequests.take();
                ServerMetrics.getMetrics().PREP_PROCESSOR_QUEUE_TIME.add(Time.currentElapsedTime() - request.prepQueueStartTime);
                long traceMask = ZooTrace.CLIENT_REQUEST_TRACE_MASK;
                if (request.type == OpCode.ping) {
                    traceMask = ZooTrace.CLIENT_PING_TRACE_MASK;
                }
                if (LOG.isTraceEnabled()) {
                    ZooTrace.logRequest(LOG, traceMask, 'P', request, "");
                }
		// 判断是否为shutDown
                if (Request.requestOfDeath == request) {
                    break;
                }
		// 设置prepStartTime时间
                request.prepStartTime = Time.currentElapsedTime();
		// 预处理请求
                pRequest(request);
            }
        } catch (RequestProcessorException e) {
            if (e.getCause() instanceof XidRolloverException) {
                LOG.info(e.getCause().getMessage());
            }
            handleException(this.getName(), e);
        } catch (Exception e) {
            handleException(this.getName(), e);
        }
        LOG.info("PrepRequestProcessor exited loop!");
    }
}

三、SyncRequestProcessor

此请求处理器将请求信息记录到磁盘,它对请求信息进行批处理以便于有效地执行IO。在将请求的日志同步到磁盘之前,不会将其传递给下一个请求处理器。

SyncRequestProcessor在以下3种场景中使用

1.Leader-将请求同步到磁盘并转发给AckRequestProcessor,后者将ack发送回自身。

2.Follower-将请求同步到磁盘并将请求转发到SendAckRequestProcessor,后者将数据包发送到leader。                                        SendAckRequestProcessor是可刷新的,它允许我们强制将数据包推送到leader。

3.Observer-将提交的请求同步到磁盘(作为通知包接收)。它从不将ack发送回leader,因此下一个处理器将为空。这改变            了观察者上txnlog的语义,因为它只包含提交的txn。

SyncRequestProcessor实现RequestProcessor接口,继承ZooKeeperCriticalThread线程类。构造方法传入ZooKeeperServer和nextProcessor,内部维护queuedRequests队列,线程run方法不断从队列中获取Request
再flush日志至磁盘。

public class SyncRequestProcessor extends ZooKeeperCriticalThread implements RequestProcessor {
   
    // queuedRequests队列
    private final BlockingQueue<Request> queuedRequests = new LinkedBlockingQueue<Request>();
    // 快照线程信号量
    private final Semaphore snapThreadMutex = new Semaphore(1);
    // zookeeper服务器
    private final ZooKeeperServer zks;
    // 下一个RequestProcessor
    private final RequestProcessor nextProcessor;
    // 批处理将要去刷新的Request队列
    private final Queue<Request> toFlush;

    // 构造方法
    public SyncRequestProcessor(ZooKeeperServer zks,RequestProcessor nextProcessor) {
        super("SyncThread:" + zks.getServerId(), zks
                .getZooKeeperServerListener());
        this.zks = zks;
        this.nextProcessor = nextProcessor;
        this.toFlush = new ArrayDeque<>(zks.getMaxBatchSize());
    }

    // shutdown方法
    public void shutdown() {
        LOG.info("Shutting down");
	// 队列中添加DEATH请求对象,停止当前线程
        queuedRequests.add(REQUEST_OF_DEATH);
        try {
	    // 当前线程join,尽快停止当前线程run方法
            this.join();
	    // 刷新
            this.flush();
        } catch (InterruptedException e) {
            LOG.warn("Interrupted while wating for " + this + " to finish");
            Thread.currentThread().interrupt();
        } catch (IOException e) {
            LOG.warn("Got IO exception during shutdown");
        } catch (RequestProcessorException e) {
            LOG.warn("Got request processor exception during shutdown");
        }
        if (nextProcessor != null) {
	    // 传递给下一个处理器关闭
            nextProcessor.shutdown();
        }
    }

    // 处理请求方法
    public void processRequest(final Request request) {
        Objects.requireNonNull(request, "Request cannot be null");

        request.syncQueueStartTime = Time.currentElapsedTime();
	// 向队列中添加Request
        queuedRequests.add(request);
        ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUED.add(1);
    }

    // 线程run方法
    public void run() {
        try {
            resetSnapshotStats();
            lastFlushTime = Time.currentElapsedTime();
            while (true) {
                ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUE_SIZE.add(queuedRequests.size());

                long pollTime = Math.min(zks.getMaxWriteQueuePollTime(), getRemainingDelay());
		// 不断从队列中拉去Request对象
                Request si = queuedRequests.poll(pollTime, TimeUnit.MILLISECONDS);
                if (si == null) {
                    // 在pollTime时间内,没有找到Request,直接批量刷新至磁盘
                    flush();
		    // 刷新结束后,获取
                    si = queuedRequests.take();
                }
		// 线程中断判断
                if (si == REQUEST_OF_DEATH) {
                    break;
                }

                long startProcessTime = Time.currentElapsedTime();
                ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUE_TIME.add(
                                    startProcessTime - si.syncQueueStartTime);

                // track the number of records written to the log
		// 添加日志,如果成功向下执行
                if (zks.getZKDatabase().append(si)) {
		    // 开始快照
                    if (shouldSnapshot()) {
                        resetSnapshotStats();
                        // 回滚日志
                        zks.getZKDatabase().rollLog();
                        // 快照线程信号量获取
                        if (!snapThreadMutex.tryAcquire()) {
                            LOG.warn("Too busy to snap, skipping");
                        } else {
			    // 单独线程拍快照
                            new ZooKeeperThread("Snapshot Thread") {
                                public void run() {
                                    try {
                                        zks.takeSnapshot();
                                    } catch (Exception e) {
                                        LOG.warn("Unexpected exception", e);
                                    } finally {
				      // 结束了再释放信号量锁
                                      snapThreadMutex.release();
                                    }
                                }
                            }.start();
                        }
                    }
                } else if (toFlush.isEmpty()) {
                    // 日志添加失败且没有可以刷新的,转发给下一个Processor,处理并刷新
                    if (nextProcessor != null) {
                        nextProcessor.processRequest(si);
                        if (nextProcessor instanceof Flushable) {
                            ((Flushable)nextProcessor).flush();
                        }
                    }
                    continue;
                }
		// 添加至toFlush队列
                toFlush.add(si);
                if (shouldFlush()) {
		    // 判断队列是否达到batch条件,flush
                    flush();
                }
                ServerMetrics.getMetrics().SYNC_PROCESS_TIME.add(Time.currentElapsedTime() - startProcessTime);
            }
        } catch (Throwable t) {
            handleException(this.getName(), t);
        }
        LOG.info("SyncRequestProcessor exited!");
    }

    // 批量flush
    private void flush() throws IOException, RequestProcessorException {
      if (this.toFlush.isEmpty()) {
          return;
      }
      ServerMetrics.getMetrics().BATCH_SIZE.add(toFlush.size());
      long flushStartTime = Time.currentElapsedTime();
      // 事务提交
      zks.getZKDatabase().commit();
      ServerMetrics.getMetrics().SYNC_PROCESSOR_FLUSH_TIME.add(Time.currentElapsedTime() - flushStartTime);

      if (this.nextProcessor == null) {
        this.toFlush.clear();
      } else {
          while (!this.toFlush.isEmpty()) {
	      // nextProcessor批处理Request
              final Request i = this.toFlush.remove();
              long latency = Time.currentElapsedTime() - i.syncQueueStartTime;
              ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUE_AND_FLUSH_TIME.add(latency);
              this.nextProcessor.processRequest(i);
          }
          if (this.nextProcessor instanceof Flushable) {
              ((Flushable)this.nextProcessor).flush();
          }
          lastFlushTime = Time.currentElapsedTime();
      }
    }
}

四、FinalRequestProcessor

此请求处理器实际上应用与请求和服务之间的任何查询。它总是在请求处理器链的最后,因此它没有nextProcessor成员。此请求处理器依靠ZooKeeperServer填充ZooKeeperServer的未完成请求成员。

public class FinalRequestProcessor implements RequestProcessor {
    private static final Logger LOG = LoggerFactory.getLogger(FinalRequestProcessor.class);

    ZooKeeperServer zks;

    public FinalRequestProcessor(ZooKeeperServer zks) {
        this.zks = zks;
    }
    // 处理Request
    public void processRequest(Request request) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Processing request:: " + request);
        }
        long traceMask = ZooTrace.CLIENT_REQUEST_TRACE_MASK;
        if (request.type == OpCode.ping) {
            traceMask = ZooTrace.SERVER_PING_TRACE_MASK;
        }
	// 处理事务结果
        ProcessTxnResult rc = null;
	// zookeeperServer的changes队列
        synchronized (zks.outstandingChanges) {
            // 调用zks处理request事务
            rc = zks.processTxn(request);
            if (request.getHdr() != null) {
                TxnHeader hdr = request.getHdr();
                long zxid = hdr.getZxid();
                while (!zks.outstandingChanges.isEmpty()
                       && zks.outstandingChanges.peek().zxid <= zxid) {
                    ChangeRecord cr = zks.outstandingChanges.remove();
                    ServerMetrics.getMetrics().OUTSTANDING_CHANGES_REMOVED.add(1);
                    if (cr.zxid < zxid) {
                        LOG.warn("Zxid outstanding " + cr.zxid
                                 + " is less than current " + zxid);
                    }
                    if (zks.outstandingChangesForPath.get(cr.path) == cr) {
                        zks.outstandingChangesForPath.remove(cr.path);
                    }
                }
            }

            // do not add non quorum packets to the queue.
            if (request.isQuorum()) {
                zks.getZKDatabase().addCommittedProposal(request);
            }
        }
        if (request.type == OpCode.closeSession && connClosedByClient(request)) {
            if (closeSession(zks.serverCnxnFactory, request.sessionId) ||
                    closeSession(zks.secureServerCnxnFactory, request.sessionId)) {
                return;
            }
        }

        if (request.getHdr() != null) {
            long propagationLatency = Time.currentWallTime() - request.getHdr().getTime();
            if (propagationLatency > 0) {
                ServerMetrics.getMetrics().PROPAGATION_LATENCY.add(propagationLatency);
            }
        }

        if (request.cnxn == null) {
            return;
        }
        ServerCnxn cnxn = request.cnxn;

        long lastZxid = zks.getZKDatabase().getDataTreeLastProcessedZxid();

        String lastOp = "NA";
        zks.decInProcess();
        Code err = Code.OK;
        Record rsp = null;
        String path = null;
        try {
            if (request.getHdr() != null && request.getHdr().getType() == OpCode.error) {
                if (request.getException() != null) {
                    throw request.getException();
                } else {
                    throw KeeperException.create(KeeperException.Code
                            .get(((ErrorTxn) request.getTxn()).getErr()));
                }
            }

            KeeperException ke = request.getException();
            if (ke instanceof SessionMovedException) {
                throw ke;
            }
            if (ke != null && request.type != OpCode.multi) {
                throw ke;
            }

            if (LOG.isDebugEnabled()) {
                LOG.debug("{}",request);
            }
	    // 获取request的类型
            switch (request.type) {
            case OpCode.ping: {
                lastOp = "PING";
                updateStats(request, lastOp, lastZxid);
		// 如果是ping,发送响应
                cnxn.sendResponse(new ReplyHeader(-2, lastZxid, 0), null, "response");
                return;
            }
            case OpCode.createSession: {
                lastOp = "SESS";
                updateStats(request, lastOp, lastZxid);
		// 如果是创建session,则完成session初始化
                zks.finishSessionInit(request.cnxn, true);
                return;
            }
            case OpCode.multi: {
                lastOp = "MULT";
                rsp = new MultiResponse() ;

                for (ProcessTxnResult subTxnResult : rc.multiResult) {
                    OpResult subResult ;
                    switch (subTxnResult.type) {
                        case OpCode.check:
                            subResult = new CheckResult();
                            break;
                        case OpCode.create:
                            subResult = new CreateResult(subTxnResult.path);
                            break;
                        case OpCode.create2:
                        case OpCode.createTTL:
                        case OpCode.createContainer:
                            subResult = new CreateResult(subTxnResult.path, subTxnResult.stat);
                            break;
                        case OpCode.delete:
                        case OpCode.deleteContainer:
                            subResult = new DeleteResult();
                            break;
                        case OpCode.setData:
                            subResult = new SetDataResult(subTxnResult.stat);
                            break;
                        case OpCode.error:
                            subResult = new ErrorResult(subTxnResult.err) ;
                            if (subTxnResult.err == Code.SESSIONMOVED.intValue()) {
                                throw new SessionMovedException();
                            }
                            break;
                        default:
                            throw new IOException("Invalid type of op");
                    }

                    ((MultiResponse)rsp).add(subResult);
                }

                break;
            }
	    // 如果创建,则返回CreateResponse
            case OpCode.create: {
                lastOp = "CREA";
                rsp = new CreateResponse(rc.path);
                err = Code.get(rc.err);
                break;
            }
            case OpCode.create2:
            case OpCode.createTTL:
            case OpCode.createContainer: {
                lastOp = "CREA";
                rsp = new Create2Response(rc.path, rc.stat);
                err = Code.get(rc.err);
                break;
            }
            case OpCode.delete:
            case OpCode.deleteContainer: {
                lastOp = "DELE";
                err = Code.get(rc.err);
                break;
            }
            case OpCode.setData: {
                lastOp = "SETD";
                rsp = new SetDataResponse(rc.stat);
                err = Code.get(rc.err);
                break;
            }
            case OpCode.reconfig: {
                lastOp = "RECO";
                rsp = new GetDataResponse(((QuorumZooKeeperServer)zks).self.getQuorumVerifier().toString().getBytes(), rc.stat);
                err = Code.get(rc.err);
                break;
            }
            case OpCode.setACL: {
                lastOp = "SETA";
                rsp = new SetACLResponse(rc.stat);
                err = Code.get(rc.err);
                break;
            }
            case OpCode.closeSession: {
                lastOp = "CLOS";
                err = Code.get(rc.err);
                break;
            }
            case OpCode.sync: {
                lastOp = "SYNC";
                SyncRequest syncRequest = new SyncRequest();
                ByteBufferInputStream.byteBuffer2Record(request.request,
                        syncRequest);
                rsp = new SyncResponse(syncRequest.getPath());
                break;
            }
            case OpCode.check: {
                lastOp = "CHEC";
                rsp = new SetDataResponse(rc.stat);
                err = Code.get(rc.err);
                break;
            }
            case OpCode.exists: {
                lastOp = "EXIS";
                // TODO we need to figure out the security requirement for this!
                ExistsRequest existsRequest = new ExistsRequest();
                ByteBufferInputStream.byteBuffer2Record(request.request,
                        existsRequest);
                path = existsRequest.getPath();
                if (path.indexOf('\0') != -1) {
                    throw new KeeperException.BadArgumentsException();
                }
                Stat stat = zks.getZKDatabase().statNode(path, existsRequest
                        .getWatch() ? cnxn : null);
                rsp = new ExistsResponse(stat);
                break;
            }
            case OpCode.getData: {
                lastOp = "GETD";
                GetDataRequest getDataRequest = new GetDataRequest();
                ByteBufferInputStream.byteBuffer2Record(request.request,
                        getDataRequest);
                path = getDataRequest.getPath();
                DataNode n = zks.getZKDatabase().getNode(path);
                if (n == null) {
                    throw new KeeperException.NoNodeException();
                }
                PrepRequestProcessor.checkACL(zks, request.cnxn, zks.getZKDatabase().aclForNode(n),
                        ZooDefs.Perms.READ,
                        request.authInfo, path, null);
                Stat stat = new Stat();
                byte b[] = zks.getZKDatabase().getData(path, stat,
                        getDataRequest.getWatch() ? cnxn : null);
                rsp = new GetDataResponse(b, stat);
                break;
            }
            case OpCode.setWatches: {
                lastOp = "SETW";
                SetWatches setWatches = new SetWatches();
                // XXX We really should NOT need this!!!!
                request.request.rewind();
                ByteBufferInputStream.byteBuffer2Record(request.request, setWatches);
                long relativeZxid = setWatches.getRelativeZxid();
                zks.getZKDatabase().setWatches(relativeZxid,
                        setWatches.getDataWatches(),
                        setWatches.getExistWatches(),
                        setWatches.getChildWatches(), cnxn);
                break;
            }
            case OpCode.getACL: {
                lastOp = "GETA";
                GetACLRequest getACLRequest = new GetACLRequest();
                ByteBufferInputStream.byteBuffer2Record(request.request,
                        getACLRequest);
                path = getACLRequest.getPath();
                DataNode n = zks.getZKDatabase().getNode(path);
                if (n == null) {
                    throw new KeeperException.NoNodeException();
                }
                PrepRequestProcessor.checkACL(zks, request.cnxn, zks.getZKDatabase().aclForNode(n),
                        ZooDefs.Perms.READ | ZooDefs.Perms.ADMIN,
                        request.authInfo, path, null);

                Stat stat = new Stat();
                List<ACL> acl =
                        zks.getZKDatabase().getACL(path, stat);
                try {
                    PrepRequestProcessor.checkACL(zks, request.cnxn, zks.getZKDatabase().aclForNode(n),
                            ZooDefs.Perms.ADMIN,
                            request.authInfo, path, null);
                    rsp = new GetACLResponse(acl, stat);
                } catch (KeeperException.NoAuthException e) {
                    List<ACL> acl1 = new ArrayList<ACL>(acl.size());
                    for (ACL a : acl) {
                        if ("digest".equals(a.getId().getScheme())) {
                            Id id = a.getId();
                            Id id1 = new Id(id.getScheme(), id.getId().replaceAll(":.*", ":x"));
                            acl1.add(new ACL(a.getPerms(), id1));
                        } else {
                            acl1.add(a);
                        }
                    }
                    rsp = new GetACLResponse(acl1, stat);
                }
                break;
            }
            case OpCode.getChildren: {
                lastOp = "GETC";
                GetChildrenRequest getChildrenRequest = new GetChildrenRequest();
                ByteBufferInputStream.byteBuffer2Record(request.request,
                        getChildrenRequest);
                path = getChildrenRequest.getPath();
                DataNode n = zks.getZKDatabase().getNode(path);
                if (n == null) {
                    throw new KeeperException.NoNodeException();
                }
                PrepRequestProcessor.checkACL(zks, request.cnxn, zks.getZKDatabase().aclForNode(n),
                        ZooDefs.Perms.READ,
                        request.authInfo, path, null);
                List<String> children = zks.getZKDatabase().getChildren(
                        path, null, getChildrenRequest
                                .getWatch() ? cnxn : null);
                rsp = new GetChildrenResponse(children);
                break;
            }
            case OpCode.getAllChildrenNumber: {
                lastOp = "GETACN";
                GetAllChildrenNumberRequest getAllChildrenNumberRequest = new
                        GetAllChildrenNumberRequest();
                ByteBufferInputStream.byteBuffer2Record(request.request,
                        getAllChildrenNumberRequest);
                path = getAllChildrenNumberRequest.getPath();
                DataNode n = zks.getZKDatabase().getNode(path);
                if (n == null) {
                    throw new KeeperException.NoNodeException();
                }
                PrepRequestProcessor.checkACL(zks, request.cnxn, zks.getZKDatabase().aclForNode(n),
                        ZooDefs.Perms.READ,
                        request.authInfo, path, null);
                int number = zks.getZKDatabase().getAllChildrenNumber(path);
                rsp = new GetAllChildrenNumberResponse(number);
                break;
             }
            case OpCode.getChildren2: {
                lastOp = "GETC";
                GetChildren2Request getChildren2Request = new GetChildren2Request();
                ByteBufferInputStream.byteBuffer2Record(request.request,
                        getChildren2Request);
                Stat stat = new Stat();
                path = getChildren2Request.getPath();
                DataNode n = zks.getZKDatabase().getNode(path);
                if (n == null) {
                    throw new KeeperException.NoNodeException();
                }
                PrepRequestProcessor.checkACL(zks, request.cnxn, zks.getZKDatabase().aclForNode(n),
                        ZooDefs.Perms.READ,
                        request.authInfo, path, null);
                List<String> children = zks.getZKDatabase().getChildren(
                        path, stat, getChildren2Request
                                .getWatch() ? cnxn : null);
                rsp = new GetChildren2Response(children, stat);
                break;
            }
            case OpCode.checkWatches: {
                lastOp = "CHKW";
                CheckWatchesRequest checkWatches = new CheckWatchesRequest();
                ByteBufferInputStream.byteBuffer2Record(request.request,
                        checkWatches);
                WatcherType type = WatcherType.fromInt(checkWatches.getType());
                path = checkWatches.getPath();
                boolean containsWatcher = zks.getZKDatabase().containsWatcher(
                        path, type, cnxn);
                if (!containsWatcher) {
                    String msg = String.format(Locale.ENGLISH, "%s (type: %s)",
                            path, type);
                    throw new KeeperException.NoWatcherException(msg);
                }
                break;
            }
            case OpCode.removeWatches: {
                lastOp = "REMW";
                RemoveWatchesRequest removeWatches = new RemoveWatchesRequest();
                ByteBufferInputStream.byteBuffer2Record(request.request,
                        removeWatches);
                WatcherType type = WatcherType.fromInt(removeWatches.getType());
                path = removeWatches.getPath();
                boolean removed = zks.getZKDatabase().removeWatch(
                        path, type, cnxn);
                if (!removed) {
                    String msg = String.format(Locale.ENGLISH, "%s (type: %s)",
                            path, type);
                    throw new KeeperException.NoWatcherException(msg);
                }
                break;
            }
            case OpCode.getEphemerals: {
                lastOp = "GETE";
                GetEphemeralsRequest getEphemerals = new GetEphemeralsRequest();
                ByteBufferInputStream.byteBuffer2Record(request.request, getEphemerals);
                String prefixPath = getEphemerals.getPrefixPath();
                Set<String> allEphems = zks.getZKDatabase().getDataTree().getEphemerals(request.sessionId);
                List<String> ephemerals = new ArrayList<>();
                if (StringUtils.isBlank(prefixPath) || "/".equals(prefixPath.trim())) {
                    ephemerals.addAll(allEphems);
                } else {
                    for (String p: allEphems) {
                        if(p.startsWith(prefixPath)) {
                            ephemerals.add(p);
                        }
                    }
                }
                rsp = new GetEphemeralsResponse(ephemerals);
                break;
            }
            }
        } catch (SessionMovedException e) {
            cnxn.sendCloseSession();
            return;
        } catch (KeeperException e) {
            err = e.code();
        } catch (Exception e) {
            // log at error level as we are returning a marshalling
            // error to the user
            LOG.error("Failed to process " + request, e);
            StringBuilder sb = new StringBuilder();
            ByteBuffer bb = request.request;
            bb.rewind();
            while (bb.hasRemaining()) {
                sb.append(Integer.toHexString(bb.get() & 0xff));
            }
            LOG.error("Dumping request buffer: 0x" + sb.toString());
            err = Code.MARSHALLINGERROR;
        }

        ReplyHeader hdr =
            new ReplyHeader(request.cxid, lastZxid, err.intValue());

        updateStats(request, lastOp, lastZxid);

        try {
            if (request.type == OpCode.getData && path != null && rsp != null) {
                // Serialized read responses could be cached by the connection object.
                // Cache entries are identified by their path and last modified zxid,
                // so these values are passed along with the response.
                GetDataResponse getDataResponse = (GetDataResponse)rsp;
                Stat stat = null;
                if (getDataResponse.getStat() != null) {
                    stat = getDataResponse.getStat();
                }
                cnxn.sendResponse(hdr, rsp, "response", path, stat);
            } else {
                cnxn.sendResponse(hdr, rsp, "response");
            }
            if (request.type == OpCode.closeSession) {
                cnxn.sendCloseSession();
            }
        } catch (IOException e) {
            LOG.error("FIXMSG",e);
        }
    }

    // shutdown方法
    public void shutdown() {
        // we are the final link in the chain
        LOG.info("shutdown of request processor complete");
    }
}

相关文章