Netty 编写流程和服务端开发实战

x33g5p2x  于2022-05-28 转载在 其他  
字(5.6k)|赞(0)|评价(0)|浏览(371)

一 Netty 的核心概念

Netty 是由 JBOSS 开源的一款 NIO 网络编程框架,可用于快速开发网络应用。netty.io 官网中对 Netty 的介绍是“Netty 是一个异步的、基于事件驱动的网络应用框架,用于快速开发高性能的服务端和客户端”。

总的来说,Netty 是一个基于 NIO 和可扩展的事件模型的客户端和服务端框架,可以极大地简化基于 TCP、UDP 等协议的网络服务。并且 Netty 对于各种传输类型(阻塞或非阻塞式 Socket)及通信方式(HTTP 或 WebSocket)都提供了统一的 API 接口,提供了灵活的可扩展性,高度可自定义的线程模型(单线程、线程池等 ),支持使用无连接的数据报(UDP)进行通信,具有高吞吐量、低延迟、资源消耗低、最低限度的内存复制等特性。除了优越的性能外,Netty 还完整地支持 SSL/TLS 和 StartTLS 等加密传输协议,保证了数据传输的安全性。

在实际使用时,Netty 可作为 Socket 编程的中间件:也可以和 Protobuf 等技术结合使用,实现一个 RPC 框架,实现远程过程调用;或者作为一个机遇 WebSocket 的厂连接服务器,实现客户端与服务端的长连接通信。

二 Netty 编写流程

Netty 程序可以按以下“套路”编写:依次是主程序类,自定义初始化器、自定义处理器。

这里介绍一个简单的服务端开发案例。

1 主程序类

a 通过 serverBootStrap 注册 bossGroup 和 workerGroup 两个事件循环组,其中 bossGroup 用于获取客户端连接,workerGroup 用于处理客户端连接,类似常见 Master-Slave 结构。

b 将 Channel 类型指定为 NioServerSocketChannel,并在服务启动时关联自定义初始化器 MyNettyServerInitializer,从而进行初始化操作。

2 初始化器

MyNettyServerInitializer,继承自 Netty 提供的初始化器 ChannelInitializer。

Netty 封装了各种各样的内置处理器,用于实现各种功能。并且 ChannelInitializer 的 initChannel() 方法会在某个连接注册到 Channel 后立即被触发调用。因此,可以根据业务需求,在 initChannel() 中添加若干个 Netty 内置处理器,利用 Netty 强大的类库直接处理大部分业务。最后再在 initChannel() 中添加一个自定义处理器,用于实现特定业务的具体功能。

3 自定义处理器

MyNettyServerHandler,继承自 SimpleChannelInboundHandler,该父类的 channelRead0() 方法可以接收客户端的所有请求,并作出响应。

三 实战

1 主程序类

package netty.http;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class MyNettyServerTest {
    public static void main(String[] args) {
            /*
            EventLoopGroup:事件循环组,是一个线程池,也是一个死循环,用于不断的接收用户请求
            bossGroup:用于监听及建立连接,并把每一个连接抽象为一个 channel ,最后将连接再交给 workerGroup 处理
            workerGroup:真正的处理连接
         */
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            // ServerBootstrap:服务端启动时的初始化操作
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            // 将 bossGroup 和 workerGroup 注册到服务端的 Channel 上,并注册一个服务端的初始化器NettyServerInitializer(该初始化器中的initChannel()方法,会在连接被注册后立刻执行);最后将端口号绑定到8888
            ChannelFuture channelFuture = serverBootstrap
                    .group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new MyNettyServerInitializer()).bind(8888).sync();
            channelFuture.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

2 初始化器

package netty.http;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpServerCodec;

public class MyNettyServerInitializer extends ChannelInitializer<SocketChannel> {
    // 连接被注册后,立刻执行此方法
    protected void initChannel(SocketChannel sc) throws Exception {
        ChannelPipeline pipeline = sc.pipeline();
        // 加入 netty 提供的处理器。
        // 语法:pipeline.addLast(定义处理器的名字,处理器);
        // HttpServerCodec:对请求和响应进行编码、解码
        pipeline.addLast("HttpServerCodec", new HttpServerCodec());
        // 增加自定义处理器 NettyServerHandler,用于实际处理请求,并给出响应
        pipeline.addLast("MyNettySocketServerHandler", new MyNettyServerHandler());
    }
}

3 自定义处理器

package netty.http;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.*;
import io.netty.util.CharsetUtil;

import java.net.URI;

// 自定义处理器:用于输出 hello netty
public class MyNettyServerHandler extends SimpleChannelInboundHandler<HttpObject> {
    // channelRead0() 方法:接收客户端请求,并且作出响应;类似于 Servlet 中的 doGet()、doPost()等方法
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
        if (msg instanceof HttpRequest) {
            HttpRequest httpRequest = (HttpRequest) msg;
            URI uri = new URI(httpRequest.uri());
            if (!"/favicon.ico".equals(uri.getPath())) {
                System.out.println("channelRead0 invoke...");
                // ByteBuf对象:定义响应的内容
                ByteBuf content = Unpooled.copiedBuffer("Hello Netty", CharsetUtil.UTF_8);
                // FullHttpResponse 对象:响应对象,定义响应的具体信息
                FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, content);
                response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain");
                // content.readableBytes() :响应内容的长度
                response.headers().set(HttpHeaderNames.CONTENT_LENGTH, content.readableBytes());
                // 将响应 返回给客户端
                ctx.writeAndFlush(response);
            }
        }
    }

    // 当增加新的处理器时,触发此方法
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        System.out.println("1.handlerAdded(),增加了新的处理器...");
        super.handlerAdded(ctx);
    }

    // 当通道被注册到一个事件循环组EventLoop上时,执行此方法
    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        System.out.println("2.channelRegistered(),通道被注册...");
        super.channelRegistered(ctx);
    }

    // 当通道处于活跃状态(连接到某个远端,可以收发数据)时,执行此方法
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("3.channelActive(),通道连接到了远端,处于活跃状态...");
        super.channelActive(ctx);
    }

    // 当通道处于非活跃状态(与远端断开了连接)时,执行此方法
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("4.channelInactive(),通道远端断开了连接,处于非活跃状态... ");
        super.channelInactive(ctx);
    }

    // 当通道被取消注册时,执行此方法
    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        System.out.println("5.channelUnregistered(),通道被取消了注册...");
        super.channelUnregistered(ctx);
    }

    // 当程序发生异常时,执行此方法
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

四 测试结果

1 用浏览器测试

1.handlerAdded(),增加了新的处理器...

1.handlerAdded(),增加了新的处理器...

2.channelRegistered(),通道被注册...

2.channelRegistered(),通道被注册...

3.channelActive(),通道连接到了远端,处于活跃状态...

3.channelActive(),通道连接到了远端,处于活跃状态...

channelRead0 invoke...

4.channelInactive(),通道远端断开了连接,处于非活跃状态...

5.channelUnregistered(),通道被取消了注册...

4.channelInactive(),通道远端断开了连接,处于非活跃状态...

5.channelUnregistered(),通道被取消了注册...

相关文章