使用 Netty 实现服务端和客户端的点对多点通信——聊天室功能

x33g5p2x  于2022-05-28 转载在 其他  
字(6.4k)|赞(0)|评价(0)|浏览(374)

一 需求

1 监听所有客户端的上线和下线。

2 将某一个客户端的上线和离线情况,转告给其他客户端“客户端XX上/下线”

3 客户端先将消息发送给服务端,服务端再将此消息转发给所有客户端(包括发送者自己),如果其他客户端接收到了此消息,则显示“【某ip】发送的消息:XXX”;如果是自己接收到了此消息,则消息“【我】发送的消息:XXX”

二 服务端

1 主程序类

package netty.socket;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class MyNettyServerTest {
    public static void main(String[] args) {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            // ServerBootstrap:服务端启动时的初始化操作
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            // 将 bossGroup 和 workerGroup 注册到服务端的 Channel 上,并注册一个服务端的初始化器 MyNettyServerInitializer
            // 该初始化器中的 initChannel()方法,会在连接被注册后立刻执行;最后将端口号绑定到 8888
            ChannelFuture channelFuture = serverBootstrap
                    .group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new MyNettyServerInitializer())
                    .bind(8888).sync();
            channelFuture.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

2 自定义初始化器

package netty.chat;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.Delimiters;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;

public class MyNettyServerInitializer extends ChannelInitializer<SocketChannel> {
    protected void initChannel(SocketChannel sc) throws Exception {
        ChannelPipeline pipeline = sc.pipeline();
        // DelimiterBasedFrameDecoder(maxFrameLength, delimiters):分隔符处理器;将接收到的客户端消息,通过回车符(Delimiters.lineDelimiter())进行分割。
        pipeline.addLast("DelimiterBasedFrameDecoder", new DelimiterBasedFrameDecoder(2048, Delimiters.lineDelimiter()));
        pipeline.addLast("StringDecoder", new StringDecoder(CharsetUtil.UTF_8));
        pipeline.addLast("StringEncoder", new StringEncoder(CharsetUtil.UTF_8));
        // 自定义处理器
        pipeline.addLast("MyNettyServerHandler", new MyNettyServerHandler());
    }
}

3 自定义处理器

package netty.chat;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;

public class MyNettyServerHandler extends SimpleChannelInboundHandler<String> {
    private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    // 每当从服务端读取到客户端写入的信息时,就将该信息转发给所有的客户端 Channel(实现聊天室的效果)。
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String receiveMsg) throws Exception {
        Channel channel = ctx.channel();
        // 遍历channelGroup,从而区分“我”和“别人”发出的消息,如果消息是自己发出的就显示“我”
        channelGroup.forEach(chnl -> { // JDK8 提供的lambda表达式
            if (channel == chnl)
                chnl.writeAndFlush("【我】发送的消息:" + receiveMsg + "\n");
            else
                chnl.writeAndFlush("【" + channel.remoteAddress() + "】发送的消息:" + receiveMsg + "\n");
        });
    }

    // 连接建立。每当从服务端收到新的客户端连接时,就将新客户端的 Channel 加入 ChannelGroup 列表中,并告知列表中的其他客户端Channel
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        channelGroup.writeAndFlush("客户端-" + channel.remoteAddress() + "加入\n");
        channelGroup.add(channel);
    }

    // 监听客户端上线
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        System.out.println(channel.remoteAddress() + "上线");
    }

    // 监听客户端下线
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        System.out.println(channel.remoteAddress() + "下线");
    }

    // 连接断开。每当从服务端感知有客户端断开时,就将该客户端的 Channel 从 ChannelGroup 列表中移除,并告知列表中的其他客户端 Channel
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        // 会自动将 channelGroup 中断开的连接移除掉
        channelGroup.writeAndFlush("客户端-" + channel.remoteAddress() + "离开\n");
    }
}

三 客户端

1 主程序类

package netty.chat;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;

import java.io.BufferedReader;
import java.io.InputStreamReader;

public class MyNettyClientTest {
    public static void main(String[] args) {
        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class).handler(new MyNettyClientInitializer());
            Channel channel = bootstrap.connect("127.0.0.1", 8888).sync().channel();
            BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(System.in));
            for (; ; ) { // 客户端不断的通过控制台向服务端发送消息
                channel.writeAndFlush(bufferedReader.readLine() + "\r\n");
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            eventLoopGroup.shutdownGracefully();
        }
    }
}

2 自定义初始化器

package netty.chat;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.Delimiters;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;

public class MyNettyClientInitializer  extends ChannelInitializer<SocketChannel> {
    // 连接被注册后,立刻执行此方法
    protected void initChannel(SocketChannel sc) throws Exception {
        ChannelPipeline pipeline = sc.pipeline();
        // 与服务端的 Initializer 作用相同:通过 DelimiterBasedFrameDecoder 将接收到的服务端消息,通过回车符(Delimiters.lineDelimiter())进行分割。
        pipeline.addLast("DelimiterBasedFrameDecoder", new DelimiterBasedFrameDecoder(2048, Delimiters.lineDelimiter()));
        pipeline.addLast("StringDecoder",new StringDecoder(CharsetUtil.UTF_8)) ;
        pipeline.addLast("StringEncoder",new StringEncoder(CharsetUtil.UTF_8)) ;
        // 自定义处理器
        pipeline.addLast("MyNettyClientHandler", new MyNettyClientHandler());
    }
}

3 自定义处理器

package netty.chat;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

public class MyNettyClientHandler extends SimpleChannelInboundHandler<String> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String receiveMsg) {
        System.out.println(receiveMsg);
    }
}

四 测试

1 启动服务端,整个测试过程打印如下

/127.0.0.1:59967上线

/127.0.0.1:60043上线

2 启动第一个客户端,整个测试过程打印如下

客户端-/127.0.0.1:60043加入

你好,我是客户端1

【我】发送的消息:你好,我是客户端1

3 启动第二个客户端,整个测试过程打印如下

【/127.0.0.1:59967】发送的消息:你好,我是客户端1

相关文章