Netty学习(三)-Netty重要接口讲解

x33g5p2x  于2021-12-21 转载在 其他  
字(6.4k)|赞(0)|评价(0)|浏览(427)

上一节我们写了一个HelloWorld,对于Netty的运行有了一定的了解,知道Netty是如何启动客户端和服务器端。这一节我们简要的讲解一下几个重要的接口,初步探讨Netty的运行机制,当然刚学Netty就深入原理肯定是很枯燥的,所以我们就点到为止。

1. ChannelPipeLine和ChannelHandler

在上一篇中我们在ChannelInitializer类的initChannel方法中使用了ChannelPipeline,然后在ChannelPipeline中使用了handler来处理业务逻辑。

ChannelPipeline是ChannelHandler的容器,它负责ChannelHandler的管理和事件拦截与调度。Netty的ChannelPipeline和ChannelHandler机制类似于Servlet 和Filter 过滤器,这类拦截器实际上是职责链模式的一种变形,主要是为了方便事件的拦截和用户业务逻辑的定制。

Netty的channel运用机制和Filter过滤器机制一样,它将Channel 的数据管道抽象为ChannelPipeline. 消息在ChannelPipeline中流动和传递。ChannelPipeline 持有I/O事件拦截器ChannelHandler 的链表,由ChannelHandler 对I/0 事件进行拦截和处理,可以方便地通过新增和删除ChannelHandler 来实现小同的业务逻辑定制,不需要对已有的ChannelHandler进行修改,能够实现对修改封闭和对扩展的支持。

通过一张图我们来看一下他们之间的关系:

一个Channel中包含一个ChannelPipeline,用来处理Channel中的事件,一个ChannelPipeline中可以包含很多个handler,第二节的示例代码中我们也看到了,使用各种handler来处理通信信息。

同时我们也注意到在hadler中继承了ChannelInboundHandlerAdapter类并实现了他的一些方法,比如:channelRead,channelActive,channelInactive等等,我们看到这些方法中都有一个参数:ChannelHandlerContext ctx。这个ChannelHandlerContext就是handler的上下文对象,有了这个ChannelHandlerContext你就获得了一切,你可以获得通道,获得事件的控制权。

事实上,用户不需要自己创建pipeline,因为使用ServerBootstrap 或者Bootstrap 启动
服务端或者客户端时, Netty 会为每个Channel 连接创建一个独立的pipeline。

  1. ChannelPipeline pipeline = socketChannel.pipeline();
  2. pipeline.addLast("framer", new DelimiterBasedFrameDecoder(8192,Delimiters.lineDelimiter()));
  3. pipeline.addLast("decoder", new StringDecoder());
  4. pipeline.addLast("encoder", new StringEncoder());
  5. // 客户端的逻辑
  6. pipeline.addLast("handler", new HelloWorldClientHandler());

ChannelPipeline 是线程安全的, 这意味着N个业务线程可以并发地操作ChannelPipeline
而不存在多线程并发问题。但是,ChannelHandler却不是线程安全的,这意味着尽管
ChannelPipeline 是线程去全的, 但是仍然需要自己保证ChannelHandler的线程安全。

Netty 中的事件分为inbound 事件和outbound 事件。inbound 事件通常由I/O线程触发,例如TCP 链路建立事件、链路关闭事件、读事件、异常通知事件等。Outbound 事件通常是I/O 用户主动发起的网络I/O 操作,例如用户发起的连接操作、绑定操作、消息发送等操作。

我们常用的inbound事件有:

  • ChannelHandlerContext fireChannelRegistered() //channel注册事件
  • ChannelHandlerContext fireChannelActive() //channel激活事件
  • ChannelHandlerContext fireExceptionCaught(Throwable var1) //channel异常处理事件
  • ChannelHandlerContext fireUserEventTriggered(Object var1) //用户自定义事件
  • ChannelHandlerContext fireChannelRead(Object var1) //读事件

pipeline 中以fireXXX命名的方法都是从I/O 线程流向用户业务Handler的inbound 事件,它们的实现因功能而异,但是处理步骤类似:

  1. 调用HeadHandler对应的fireXXX 方法
  2. 执行事件相关的逻辑操作

常用的outbound事件有:

  • ChannelFuture bind(SocketAddress var1, ChannelPromise var2) //绑定地址
  • ChannelFuture connect(SocketAddress var1, ChannelPromise var2) //连接服务器
  • ChannelFuture write(Object var1) //发送事件
  • ChannelHandlerContext flush() //刷新事件

上面我们说到事件,netty的事件机制是由前至后的,一般来说,都是一个channel的ChannnelActive方法中调用fireChannelActive来触发调用下一个handler中的ChannelActive方法,即你在ChannelPipeline中添加handler的时候,要在第一个handler的channelActive方法中调用fireChannelActive,以此来触发下一个事件。我们再来写一个案例说明一下:

客户端:

  1. public class HWClient {
  2. private int port;
  3. private String address;
  4. public HWClient(int port, String address) {
  5. this.port = port;
  6. this.address = address;
  7. }
  8. public void start(){
  9. EventLoopGroup group = new NioEventLoopGroup();
  10. Bootstrap bootstrap = new Bootstrap();
  11. bootstrap.group(group)
  12. .channel(NioSocketChannel.class)
  13. .handler(new ClientChannelInitializer());
  14. try {
  15. ChannelFuture future = bootstrap.connect(address,port).sync();
  16. future.channel().writeAndFlush("Hello Netty Server ,I am a common client");
  17. future.channel().closeFuture().sync();
  18. } catch (Exception e) {
  19. e.printStackTrace();
  20. }finally {
  21. group.shutdownGracefully();
  22. }
  23. }
  24. public static void main(String[] args) {
  25. HWClient client = new HWClient(7788,"127.0.0.1");
  26. client.start();
  27. }
  28. }

客户端ClientChannelInitializer:

  1. public class ClientChannelInitializer extends ChannelInitializer<SocketChannel> {
  2. protected void initChannel(SocketChannel socketChannel) throws Exception {
  3. ChannelPipeline pipeline = socketChannel.pipeline();
  4. pipeline.addLast("decoder", new StringDecoder());
  5. pipeline.addLast("encoder", new StringEncoder());
  6. // 客户端的handler
  7. //先调用handler在ChannnelActive方法中调用fireChannelActive会激活handler1
  8. pipeline.addLast("handler", new HWClientHandler());
  9. pipeline.addLast("handler1", new BaseClientHandler());
  10. }
  11. }

客户端handler:

  1. public class HWClientHandler extends ChannelInboundHandlerAdapter {
  2. @Override
  3. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  4. System.out.println("server say : "+msg.toString());
  5. }
  6. @Override
  7. public void channelActive(ChannelHandlerContext ctx) throws Exception {
  8. System.out.println("Handler1");
  9. ctx.fireChannelActive();
  10. }
  11. @Override
  12. public void channelInactive(ChannelHandlerContext ctx) throws Exception {
  13. System.out.println("Client is close");
  14. }
  15. }

客户端的第二个handler:

  1. public class BaseClientHandler extends ChannelInboundHandlerAdapter {
  2. @Override
  3. public void channelActive(ChannelHandlerContext ctx) throws Exception {
  4. System.out.println("Handler2");
  5. }
  6. @Override
  7. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  8. ctx.close();
  9. }
  10. }

服务端:

  1. public class HWServer {
  2. private int port;
  3. public HWServer(int port) {
  4. this.port = port;
  5. }
  6. public void start(){
  7. EventLoopGroup bossGroup = new NioEventLoopGroup();
  8. EventLoopGroup workGroup = new NioEventLoopGroup();
  9. ServerBootstrap server = new ServerBootstrap().group(bossGroup,workGroup)
  10. .channel(NioServerSocketChannel.class)
  11. .childHandler(new ServerChannelInitializer());
  12. try {
  13. ChannelFuture future = server.bind(port).sync();
  14. future.channel().closeFuture().sync();
  15. } catch (InterruptedException e) {
  16. e.printStackTrace();
  17. }finally {
  18. bossGroup.shutdownGracefully();
  19. workGroup.shutdownGracefully();
  20. }
  21. }
  22. public static void main(String[] args) {
  23. HWServer server = new HWServer(7788);
  24. server.start();
  25. }
  26. }

服务端ServerChannelInitializer:

  1. public class ServerChannelInitializer extends ChannelInitializer<SocketChannel> {
  2. @Override
  3. protected void initChannel(SocketChannel socketChannel) throws Exception {
  4. ChannelPipeline pipeline = socketChannel.pipeline();
  5. // 字符串解码 和 编码
  6. pipeline.addLast("decoder", new StringDecoder());
  7. pipeline.addLast("encoder", new StringEncoder());
  8. // 自己的逻辑Handler
  9. pipeline.addLast("handler", new HWServerHandler());
  10. }
  11. }

服务端handler:

  1. public class HWServerHandler extends ChannelInboundHandlerAdapter {
  2. @Override
  3. public void channelActive(ChannelHandlerContext ctx) throws Exception {
  4. System.out.println("channelActive");
  5. }
  6. @Override
  7. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  8. System.out.println(ctx.channel().remoteAddress()+"===>server: "+msg.toString());
  9. ctx.write("received your msg");
  10. ctx.flush();
  11. }
  12. @Override
  13. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  14. super.exceptionCaught(ctx, cause);
  15. ctx.close();
  16. }
  17. }

我们启动服务端和客户端,会发现客户端的两个handler都通过了。

先调用HWClientHandler,打印出:HWClientHandler channelActive;继而调用了BaseClientHandler ,打印出:BaseClient1Handler channelActive.

相关文章