Netty源码分析-数据处理器ChannelInboundHandler和ChannelOutboundHandler

x33g5p2x  于2021-12-21 转载在 其他  
字(6.7k)|赞(0)|评价(0)|浏览(367)

概述

Channel接口针对Channel的读入和写出IO事件的处理,定义了两个拓展接口:ChannelInboundHandler用于定义对读入IO事件的处理,ChannelOutboundHandler用于定义写出IO事件的处理。

ChannelInboundHandler

ChannelInboundHandler接口定义
/** * {@link ChannelHandler} which adds callbacks for state changes. This allows the user * to hook in to state changes easily. */
public interface ChannelInboundHandler extends ChannelHandler {

    /** * The {@link Channel} of the {@link ChannelHandlerContext} was registered with its {@link EventLoop} */
    void channelRegistered(ChannelHandlerContext ctx) throws Exception;

    /** * The {@link Channel} of the {@link ChannelHandlerContext} was unregistered from its {@link EventLoop} */
    void channelUnregistered(ChannelHandlerContext ctx) throws Exception;

    /** * The {@link Channel} of the {@link ChannelHandlerContext} is now active */
    void channelActive(ChannelHandlerContext ctx) throws Exception;

    /** * The {@link Channel} of the {@link ChannelHandlerContext} was registered is now inactive and reached its * end of lifetime. */
    void channelInactive(ChannelHandlerContext ctx) throws Exception;

    /** * Invoked when the current {@link Channel} has read a message from the peer. */
    void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception;

    /** * Invoked when the last message read by the current read operation has been consumed by * {@link #channelRead(ChannelHandlerContext, Object)}. If {@link ChannelOption#AUTO_READ} is off, no further * attempt to read an inbound data from the current {@link Channel} will be made until * {@link ChannelHandlerContext#read()} is called. */
    void channelReadComplete(ChannelHandlerContext ctx) throws Exception;

    /** * Gets called if an user event was triggered. */
    void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception;

    /** * Gets called once the writable state of a {@link Channel} changed. You can check the state with * {@link Channel#isWritable()}. */
    void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception;

    /** * Gets called if a {@link Throwable} was thrown. */
    @Override
    @SuppressWarnings("deprecation")
    void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;
}
ChannelInboundHandlerAdapter
  • ChannelInboundHandler的默认实现类,方便使用,用户通过拓展该类,只重写与自身关注的IO事件的处理的方法。ChannelInboundHandlerAdapter的每个方法的默认实现都是通过ChannelHandlerContext将IO事件或接收到的数据,传给所在的ChannelPipeline的下一个ChannelInboundHandler:

  • 业务处理逻辑处理:用户可以通过拓展ChannelInboundHandlerAdapter,重写相应的方法来,生成新的子类的方式来定义业务需要的处理逻辑,Netty默认针对特定功能的处理,提供了一些ChannelInboundHandler的实现类,详见下面分析。
  • 对于从Channel读入的数据,在调用channelRead方法处理时,默认实现也是传给下一个ChannelInboundHandler处理,不会销毁该数据对象,释放掉该数据所占用的空间的:

如果数据只需在当前ChannelInboundHandler处理,而不需要继续往下传输,则可以调用ReferenceCountUtil.release(msg)手动释放掉,或者拓展SimpleChannelInboundHandler类。

  1. SimpleChannelInboundHandler

SimpleChannelInboundHandler为ChannelInboundHandlerAdapter的一个拓展实现,重写了channelRead方法,并提供了一个抽象方法channelRead0(在netty 5.0之后channelRead0方法名称变成了messageReceived)供用户实现自身的数据处理逻辑:

针对Channel读入的数据的处理,提供了两个功能:

  1. 特定类型数据处理:用户通过拓展实现SimpleChannelInboundHandler来定义业务处理的ChannelInboundHandler时,通过指定泛型参数I,限制该ChannelInboundHandler只处理Channel读入的类型为I的数据;
  2. 自动释放数据:如果Channel读入的数据的类型与泛型参数I匹配,则在该ChannelInboundHandler处理掉了,在finally中,根据调用构造函数创建ChannelInboundHandler时,是否需要自动释放数据(默认为true),来自动进行数据空间的释放。
  3. 数据的保留:如果用户在实现channelRead0方法自定义数据处理逻辑时,需要将该数据传给下一个ChannelInboundHandler,则需要调用ReferenceCountUtil.retain(msg)方法,原理是将msg的引用计数加1,因为ReferenceCountUtil.release(msg)是将msg的引用计数减1,同时当引用计数变成0时,释放该数据:
public class StringHandler extends SimpleChannelInboundHandler<String> {

     @Override
     protected void channelRead0(ChannelHandlerContext ctx, String message)
             throws Exception {
         System.out.println(message);
         // 不释放数据,交给下一个ChannelInboundHandler继续处理
         ReferenceCountUtil.retain(message);
         ctx.fireChannelRead(message);
     }
 }

ChannelOutboundHandler

ChannelOutboundHandler接口定义
/** * {@link ChannelHandler} which will get notified for IO-outbound-operations. */
public interface ChannelOutboundHandler extends ChannelHandler {
    /** * Called once a bind operation is made. * * @param ctx the {@link ChannelHandlerContext} for which the bind operation is made * @param localAddress the {@link SocketAddress} to which it should bound * @param promise the {@link ChannelPromise} to notify once the operation completes * @throws Exception thrown if an error occurs */
    void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception;

    /** * Called once a connect operation is made. * * @param ctx the {@link ChannelHandlerContext} for which the connect operation is made * @param remoteAddress the {@link SocketAddress} to which it should connect * @param localAddress the {@link SocketAddress} which is used as source on connect * @param promise the {@link ChannelPromise} to notify once the operation completes * @throws Exception thrown if an error occurs */
    void connect(
            ChannelHandlerContext ctx, SocketAddress remoteAddress,
            SocketAddress localAddress, ChannelPromise promise) throws Exception;

    /** * Called once a disconnect operation is made. * * @param ctx the {@link ChannelHandlerContext} for which the disconnect operation is made * @param promise the {@link ChannelPromise} to notify once the operation completes * @throws Exception thrown if an error occurs */
    void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;

    /** * Called once a close operation is made. * * @param ctx the {@link ChannelHandlerContext} for which the close operation is made * @param promise the {@link ChannelPromise} to notify once the operation completes * @throws Exception thrown if an error occurs */
    void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;

    /** * Called once a deregister operation is made from the current registered {@link EventLoop}. * * @param ctx the {@link ChannelHandlerContext} for which the close operation is made * @param promise the {@link ChannelPromise} to notify once the operation completes * @throws Exception thrown if an error occurs */
    void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;

    /** * Intercepts {@link ChannelHandlerContext#read()}. */
    void read(ChannelHandlerContext ctx) throws Exception;

    /** * Called once a write operation is made. The write operation will write the messages through the * {@link ChannelPipeline}. Those are then ready to be flushed to the actual {@link Channel} once * {@link Channel#flush()} is called * * @param ctx the {@link ChannelHandlerContext} for which the write operation is made * @param msg the message to write * @param promise the {@link ChannelPromise} to notify once the operation completes * @throws Exception thrown if an error occurs */
    void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception;

    /** * Called once a flush operation is made. The flush operation will try to flush out all previous written messages * that are pending. * * @param ctx the {@link ChannelHandlerContext} for which the flush operation is made * @throws Exception thrown if an error occurs */
    void flush(ChannelHandlerContext ctx) throws Exception;
}
ChannelOutboundHandlerAdapter

与ChannelInboundHandlerAdapter的作用类似,方法默认实现也是通过ChannelHandlerContext将写出的数据,交给下一个ChannelOutboundHandler处理。

ChannelDuplexHandler

同时具有ChannelInboundHandler和ChannelInboundHandler的功能,可以实现相应方法,对Channel的读入数据和写出数据都进行处理,默认实现也是交给下一个ChannelHandler处理。

Netty提供的ChannelHandler实现

Netty在handler子模块,针对不同的功能,包括流量控制,数据flush,ip过滤,日志,ssl,大数据流处理,超时心跳检测,拥塞控制,提供了相应的ChannelHandler实现类,如图:

相关文章