使用 Netty 实现点对点的通信

x33g5p2x  于2022-05-28 转载在 其他  
字(5.8k)|赞(0)|评价(0)|浏览(204)

一 服务端

1 主程序

package netty.socket;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class MyNettyServerTest {
    public static void main(String[] args) {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            // ServerBootstrap:服务端启动时的初始化操作
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            // 将 bossGroup 和 workerGroup 注册到服务端的 Channel 上,并注册一个服务端的初始化器 MyNettyServerInitializer
            // 该初始化器中的 initChannel()方法,会在连接被注册后立刻执行;最后将端口号绑定到 8888
            ChannelFuture channelFuture = serverBootstrap
                    .group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new MyNettyServerInitializer())
                    .bind(8888).sync();
            channelFuture.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

2 自定义初始化器

package netty.socket;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;

public class MyNettyServerInitializer extends ChannelInitializer<SocketChannel> {
    // 连接被注册后,立刻执行此方法
    protected void initChannel(SocketChannel sc) throws Exception {
        ChannelPipeline pipeline = sc.pipeline();
        // LengthFieldBasedFrameDecoder:用于解析带固定长度数据包。
        // TCP发送的数据规则:可以将数据进行拆分或合并,因此对端接收到的数据包可能不是发送时的格式;
        // 一般的做法是在包头设置length字段,指明包长度,再由接受方根据 length 拼接或者剪裁收到的数据,从而形成完整的数据包
        pipeline.addLast("LengthFieldBasedFrameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 8, 0, 8));
        // 将上条语句的 length 加入到传递的数据中
        pipeline.addLast("LengthFieldPrepender", new LengthFieldPrepender(8));
        // 传递字符串的编码解码器
        pipeline.addLast("StringDecoder", new StringDecoder(CharsetUtil.UTF_8));
        pipeline.addLast("StringEncoder", new StringEncoder(CharsetUtil.UTF_8));
        // 自定义处理器
        pipeline.addLast("MyNettyServerHandler", new MyNettyServerHandler());
    }
}

3 自定义处理器

package netty.socket;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

import java.util.Scanner;

public class MyNettyServerHandler extends SimpleChannelInboundHandler<String> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String receiveMsg) throws Exception {
        // 通过 ctx 获取远程(客户端)的端口号,并打印出对方(客户端)发来的消息
        System.out.println("【服务端】接收的请求来自:" + ctx.channel().remoteAddress() + ",消息内容【" + receiveMsg + "】");
        System.out.println("请向【客户端】发送一条消息:");
        String sendMsg = new Scanner(System.in).nextLine();
        ctx.channel().writeAndFlush(sendMsg);
    }
}

二 客户端

1 主程序类

package netty.socket;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;

public class MyNettyClientTest {
    public static void main(String[] args) {
        // 服务端有 2 个EventLoopGroup,bossGroup 用于获取连接并将连接分发给 workerGroup;而 workerGroup 负责真正的处理连接;
        // 但客户端仅仅需要连接服务端(相当于服务端的bossGroup),因此只需要一个 EventLoopGroup
        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            /*
                注意:
                下条语句用到了 handler(),但在服务端 MyNettyServerTest 中用到的是 childHandler(),二者的区别如下:
                 bossGroup 获取并分发连接:使用 handler()
                 workerGroup 实际处理连接:用 childHandler()
             */
            bootstrap.group(eventLoopGroup)
                    .channel(NioSocketChannel.class)
                    .handler(new MyNettyClientInitializer());
            ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8888).sync();
            channelFuture.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            eventLoopGroup.shutdownGracefully();
        }
    }
}

2 自定义初始化器

package netty.socket;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;

public class MyNettyClientInitializer extends ChannelInitializer<SocketChannel> {
    // 连接被注册后,立刻执行此方法
    protected void initChannel(SocketChannel sc) throws Exception {
        ChannelPipeline pipeline = sc.pipeline();
        pipeline.addLast("LengthFieldBasedFrameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 8, 0, 8));
        pipeline.addLast("LengthFieldPrepender", new LengthFieldPrepender(8));
        pipeline.addLast("StringDecoder", new StringDecoder(CharsetUtil.UTF_8));
        pipeline.addLast("StringEncoder", new StringEncoder(CharsetUtil.UTF_8));
        // 自定义处理器
        pipeline.addLast("MyNettyClientHandler", new MyNettyClientHandler());
    }
}

3 自定义处理器

package netty.socket;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

import java.util.Scanner;

public class MyNettyClientHandler extends SimpleChannelInboundHandler<String> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String receiveMsg) {
        System.out.println("【客户端】接收的请求来自:" + ctx.channel().remoteAddress() + ",消息内容【" + receiveMsg + "】");
        System.out.println("请向【服务端】发送一条消息:");
        String sendMsg = new Scanner(System.in).nextLine();
        ctx.channel().writeAndFlush(sendMsg);
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ctx.writeAndFlush("打破僵局的第一条消息...");
    }
}

三 说明

服务端和客户端都在通过 channelRead0() 方法等待对方发来消息,二者都处在“等待”状态。为了打破僵局,客户端先主动向服务端发送第一条消息“打破僵局的第一条消息...”.服务端收到此消息后,就开始了和客户端的双向通信。

四 测试

1 先运行服务端,整个过程中,服务端打印如下

【服务端】接收的请求来自:/127.0.0.1:52054,消息内容【打破僵局的第一条消息...】

请向【客户端】发送一条消息:

hello,我是服务端

【服务端】接收的请求来自:/127.0.0.1:52054,消息内容【hello,我是客户端】

请向【客户端】发送一条消息:

2 再运行客户端,整个过程中,客户端打印如下

【客户端】接收的请求来自:/127.0.0.1:8888,消息内容【hello,我是服务端】

请向【服务端】发送一条消息:

hello,我是客户端

相关文章