Netty原理:pipeline

x33g5p2x  于2021-11-25 转载在 其他  
字(2.9k)|赞(0)|评价(0)|浏览(363)
ChannelPipeline

pipeline中维护入站和出站链路,两条链路的执行顺序。

handler只负责处理自身的业务逻辑,对通道而言,它是无状态的。通道的信息会保存到handlerContext处理器上下文中,它是连接pipeline和handler之间的中间角色。

pipeline管理的是由handlerContext包裹的handler,也就是说,当添加handler时,先将其转为handlerContext,然后添加到pipeline的双向链表中。头结点叫做HeadContext,尾节点叫做TailContext。

ch.pipeline().addLast(new NettyServerHandler());
 
 [DefaultChannelPipeline]
 ----------------------------------------------------------------
    public final ChannelPipeline addLast(ChannelHandler... handlers) {
        return addLast(null, handlers);
    }
    
    public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
        ObjectUtil.checkNotNull(handlers, "handlers");

        for (ChannelHandler h: handlers) {
            if (h == null) {
                break;
            }
            addLast(executor, null, h);
        }

        return this;
    }
    
    
    // 关键逻辑
    public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
        final AbstractChannelHandlerContext newCtx;
        synchronized (this) {
            // 检查当前handler是否支持共享,如果不支持,又被添加到其他pipeline中,会报错
            checkMultiplicity(handler);
		    // 将handler封装为context
            newCtx = newContext(group, filterName(name, handler), handler);
            // 将context添加到链表尾部
            addLast0(newCtx);

            // If the registered is false it means that the channel was not registered on an eventLoop yet.
            // In this case we add the context to the pipeline and add a task that will call
            // ChannelHandler.handlerAdded(...) once the channel is registered.
            // 判断当前通道的注册状态,如果是未注册,执行此逻辑
            if (!registered) {
                // 添加一个任务,当通道被注册后,能够回调handlerAdded方法
                newCtx.setAddPending();
                callHandlerCallbackLater(newCtx, true);
                return this;
            }
            // 如果已被注册  执行调用handlerAdded方法
            EventExecutor executor = newCtx.executor();
            if (!executor.inEventLoop()) {
                callHandlerAddedInEventLoop(newCtx, executor);
                return this;
            }
        }
        callHandlerAdded0(newCtx);
        return this;
    }
    
    
    
    private static void checkMultiplicity(ChannelHandler handler) {
        if (handler instanceof ChannelHandlerAdapter) {
            ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;
            if (!h.isSharable() && h.added) {
                throw new ChannelPipelineException(
                        h.getClass().getName() +
                        " is not a @Sharable handler, so can't be added or removed multiple times.");
            }
            h.added = true;
        }
    }
    
    
    private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
        return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
    }
    
    // 尾节点会提前声明并创建
    final AbstractChannelHandlerContext tail;
    
    //  prev -> tail   在其中插入newctx
    //  prev -> newctx -> tail   放到倒数第二个位置中  tail节点是保持不变的
    //  依次更改 新节点的前后指针   以及prev节点的后置指针和tail节点的前置指针
    private void addLast0(AbstractChannelHandlerContext newCtx) {
        AbstractChannelHandlerContext prev = tail.prev;
        newCtx.prev = prev;
        newCtx.next = tail;
        prev.next = newCtx;
        tail.prev = newCtx;
    }
    
    // 构造器中已经提前创建了头尾节点
    protected DefaultChannelPipeline(Channel channel) {
        this.channel = ObjectUtil.checkNotNull(channel, "channel");
        succeededFuture = new SucceededChannelFuture(channel, null);
        voidPromise =  new VoidChannelPromise(channel, true);

        tail = new TailContext(this);
        head = new HeadContext(this);

        head.next = tail;
        tail.prev = head;
    }

使用pipeline模式的优点:
A) 解耦,让处理器逻辑独立,可以被多个channel共享
B) channel相关信息,交给context维护
C) 具有极大的灵活性,使用处理器可以方便的添加或删除,或者更改它们的顺序

相关文章