高性能IO框架Netty四 - 解决粘包/半包问题

x33g5p2x  于2021-12-21 转载在 其他  
字(10.6k)|赞(0)|评价(0)|浏览(490)

前言:demo演示

首先,我们来看个demo

1、EchoServer

  1. /**
  2. * 作者:DarkKing
  3. * 类说明:
  4. */
  5. public class EchoServer {
  6. private final int port;
  7. public EchoServer(int port) {
  8. this.port = port;
  9. }
  10. public static void main(String[] args) throws InterruptedException {
  11. EchoServer echoServer = new EchoServer(9999);
  12. System.out.println("服务器即将启动");
  13. echoServer.start();
  14. System.out.println("服务器关闭");
  15. }
  16. public void start() throws InterruptedException {
  17. final EchoServerHandler serverHandler = new EchoServerHandler();
  18. EventLoopGroup group = new NioEventLoopGroup();/*线程组*/
  19. try {
  20. ServerBootstrap b = new ServerBootstrap();/*服务端启动必须*/
  21. b.group(group)/*将线程组传入*/
  22. .channel(NioServerSocketChannel.class)/*指定使用NIO进行网络传输*/
  23. .localAddress(new InetSocketAddress(port))/*指定服务器监听端口*/
  24. /*服务端每接收到一个连接请求,就会新启一个socket通信,也就是channel,
  25. 所以下面这段代码的作用就是为这个子channel增加handle*/
  26. .childHandler(new ChannelInitializer<SocketChannel>() {
  27. protected void initChannel(SocketChannel ch) throws Exception {
  28. ch.pipeline().addLast(serverHandler);/*添加到该子channel的pipeline的尾部*/
  29. }
  30. });
  31. ChannelFuture f = b.bind().sync();/*异步绑定到服务器,sync()会阻塞直到完成*/
  32. System.out.println("服务器启动完成,等待客户端的连接和数据.....");
  33. f.channel().closeFuture().sync();/*阻塞直到服务器的channel关闭*/
  34. } finally {
  35. group.shutdownGracefully().sync();/*优雅关闭线程组*/
  36. }
  37. }
  38. }

2、EchoServerHandler

  1. /**
  2. * 作者:DarkKing
  3. * 类说明:自己的业务处理
  4. */
  5. @ChannelHandler.Sharable
  6. public class EchoServerHandler extends ChannelInboundHandlerAdapter {
  7. private AtomicInteger counter = new AtomicInteger(0);
  8. /*** 服务端读取到网络数据后的处理*/
  9. @Override
  10. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  11. ByteBuf in = (ByteBuf) msg;
  12. String request = in.toString(CharsetUtil.UTF_8);
  13. System.out.println("Server Accept[" + request
  14. + "] and the counter is:" + counter.incrementAndGet());
  15. String resp = "Hello," + request + ". Welcome to Netty World!"
  16. + System.getProperty("line.separator");
  17. ctx.writeAndFlush(Unpooled.copiedBuffer(resp.getBytes()));
  18. }
  19. /*** 发生异常后的处理*/
  20. @Override
  21. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  22. cause.printStackTrace();
  23. ctx.close();
  24. }
  25. }

使用netty实现了个服务端,当接收到客户端的消息是,打印出来请求的内容,并统计接收请求的次数。

3、EchoClient

  1. /**
  2. * 作者:DarkKing
  3. * 类说明:
  4. */
  5. public class EchoClient {
  6. private final int port;
  7. private final String host;
  8. public EchoClient(int port, String host) {
  9. this.port = port;
  10. this.host = host;
  11. }
  12. public void start() throws InterruptedException {
  13. EventLoopGroup group = new NioEventLoopGroup();/*线程组*/
  14. try {
  15. final Bootstrap b = new Bootstrap();
  16. /*客户端启动必须*/
  17. b.group(group)/*将线程组传入*/
  18. .channel(NioSocketChannel.class)/*指定使用NIO进行网络传输*/
  19. .remoteAddress(new InetSocketAddress(host, port))/*配置要连接服务器的ip地址和端口*/
  20. .handler(new ChannelInitializer<SocketChannel>() {
  21. protected void initChannel(SocketChannel ch) throws Exception {
  22. ch.pipeline().addLast(new EchoClientHandler());
  23. }
  24. });
  25. ChannelFuture f = b.connect().sync();
  26. f.channel().closeFuture().sync();
  27. } finally {
  28. group.shutdownGracefully().sync();
  29. }
  30. }
  31. public static void main(String[] args) throws InterruptedException {
  32. new EchoClient(9999, "127.0.0.1").start();
  33. }
  34. }

4、EchoClientHandler

  1. import io.netty.buffer.ByteBuf;
  2. import io.netty.buffer.Unpooled;
  3. import io.netty.channel.ChannelHandlerContext;
  4. import io.netty.channel.SimpleChannelInboundHandler;
  5. import io.netty.util.CharsetUtil;
  6. import java.util.concurrent.atomic.AtomicInteger;
  7. /**
  8. * 作者:DarkKing
  9. * 类说明:
  10. */
  11. public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
  12. private AtomicInteger counter = new AtomicInteger(0);
  13. /*** 客户端读取到网络数据后的处理*/
  14. protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
  15. System.out.println("client Accept[" + msg.toString(CharsetUtil.UTF_8)
  16. + "] and the counter is:" + counter.incrementAndGet());
  17. }
  18. /*** 客户端被通知channel活跃后,做事*/
  19. @Override
  20. public void channelActive(ChannelHandlerContext ctx) throws Exception {
  21. ByteBuf msg = null;
  22. String request = "test1,test2,test3,test4"
  23. + System.getProperty("line.separator");
  24. for (int i = 0; i < 100; i++) {
  25. msg = Unpooled.buffer(request.length());
  26. msg.writeBytes(request.getBytes());
  27. ctx.writeAndFlush(msg);
  28. }
  29. }
  30. /*** 发生异常后的处理*/
  31. @Override
  32. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  33. cause.printStackTrace();
  34. ctx.close();
  35. }
  36. }

使用netty实现了个客户端,链接建立完成之后向服务端发送消息。循环100次。并且打印服务端返回的消息。并统计返回次数。

执行结果

服务端输出

客户端打印

结果发现,我们客户单发送了100次数据,但实际上只接收了30次。而且每次消息发送的是test1,test2,test3,test4,test5,但实际接受的却有很多相链接起来的。这是为什么呢?为什么不是100次test1,test2,test3,test4,test5呢?这就是TCP传输的粘包/半包问题。

一、什么是TCP粘包半包?

假设客户端分别发送了两个数据包D1和D2给服务端,由于服务端一次读取到的字节数是不确定的,故可能存在以下4种情况。

  1. 服务端分两次读取到了两个独立的数据包,分别是D1和D2,没有粘包和拆包;
  2. 服务端一次接收到了两个数据包,D1和D2粘合在一起,被称为TCP粘包;
  3. 服务端分两次读取到了两个数据包,第一次读取到了完整的D1包和D2包的部分内容,第二次读取到了D2包的剩余内容,这被称为TCP拆包;
  4. 服务端分两次读取到了两个数据包,第一次读取到了D1包的部分内容D1_1,第二次读取到了D1包的剩余内容D1_2和D2包的整包。

如果此时服务端TCP接收滑窗非常小,而数据包D1和D2比较大,很有可能会发生第五种可能,即服务端分多次才能将D1和D2包接收完全,期间发生多次拆包。

二、TCP粘包/半包发生的原因

由于TCP协议本身的机制(面向连接的可靠地协议-三次握手机制)客户端与服务器会维持一个连接(Channel),数据在连接不断开的情况下,可以持续不断地将多个数据包发往服务器,但是如果发送的网络数据包太小,那么他本身会启用Nagle算法(可配置是否启用)对较小的数据包进行合并(基于此,TCP的网络延迟要UDP的高些)然后再发送(超时或者包大小足够)。那么这样的话,服务器在接收到消息(数据流)的时候就无法区分哪些数据包是客户端自己分开发送的,这样产生了粘包;服务器在接收到数据库后,放到缓冲区中,如果消息没有被及时从缓存区取走,下次在取数据的时候可能就会出现一次取出多个数据包的情况,造成粘包现象

UDP:本身作为无连接的不可靠的传输协议(适合频繁发送较小的数据包),他不会对数据包进行合并发送(也就没有Nagle算法之说了),他直接是一端发送什么数据,直接就发出去了,既然他不会对数据合并,每一个数据包都是完整的(数据+UDP头+IP头等等发一次数据封装一次)也就没有粘包一说了。

分包产生的原因就简单的多:可能是IP分片传输导致的,也可能是传输过程中丢失部分包导致出现的半包,还有可能就是一个包可能被分成了两次传输,在取数据的时候,先取到了一部分(还可能与接收的缓冲区大小有关系),总之就是一个数据包被分成了多次接收。

更具体的原因有三个,分别如下。

  1. 应用程序写入数据的字节大小大于套接字发送缓冲区的大小
  2. 进行MSS大小的TCP分段。MSS是最大报文段长度的缩写。MSS是TCP报文段中的数据字段的最大长度。数据字段加上TCP首部才等于整个的TCP报文段。所以MSS并不是TCP报文段的最大长度,而是:MSS=TCP报文段长度-TCP首部长
  3. 以太网的payload大于MTU进行IP分片。MTU指:一种通信协议的某一层上面所能通过的最大数据包大小。如果IP层有一个数据包要传,而且数据的长度比链路层的MTU大,那么IP层就会进行分片,把数据包分成托干片,让每一片都不超过MTU。注意,IP分片可以发生在原始发送端主机上,也可以发生在中间路由器上。

三、解决粘包半包问题

由于底层的TCP无法理解上层的业务数据,所以在底层是无法保证数据包不被拆分和重组的,这个问题只能通过上层的应用协议栈设计来解决,根据业界的主流协议的解决方案,可以归纳如下。

1、在包尾增加分割符

在包尾增加分割符,比如回车换行符进行分割,例如FTP协议;

demo如下:

**LineBaseEchoServer **

  1. /**
  2. * 作者:DarkKing
  3. * 类说明:
  4. */
  5. public class LineBaseEchoServer {
  6. public static final int PORT = 9998;
  7. public static void main(String[] args) throws InterruptedException {
  8. LineBaseEchoServer lineBaseEchoServer = new LineBaseEchoServer();
  9. System.out.println("服务器即将启动");
  10. lineBaseEchoServer.start();
  11. }
  12. public void start() throws InterruptedException {
  13. final LineBaseServerHandler serverHandler = new LineBaseServerHandler();
  14. EventLoopGroup group = new NioEventLoopGroup();/*线程组*/
  15. try {
  16. ServerBootstrap b = new ServerBootstrap();/*服务端启动必须*/
  17. b.group(group)/*将线程组传入*/
  18. .channel(NioServerSocketChannel.class)/*指定使用NIO进行网络传输*/
  19. .localAddress(new InetSocketAddress(PORT))/*指定服务器监听端口*/
  20. /*服务端每接收到一个连接请求,就会新启一个socket通信,也就是channel,
  21. 所以下面这段代码的作用就是为这个子channel增加handle*/
  22. .childHandler(new ChannelInitializerImp());
  23. ChannelFuture f = b.bind().sync();/*异步绑定到服务器,sync()会阻塞直到完成*/
  24. System.out.println("服务器启动完成,等待客户端的连接和数据.....");
  25. f.channel().closeFuture().sync();/*阻塞直到服务器的channel关闭*/
  26. } finally {
  27. group.shutdownGracefully().sync();/*优雅关闭线程组*/
  28. }
  29. }
  30. private static class ChannelInitializerImp extends ChannelInitializer<Channel> {
  31. @Override
  32. protected void initChannel(Channel ch) throws Exception {
  33. //添加换行解码器
  34. ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
  35. ch.pipeline().addLast(new LineBaseServerHandler());
  36. }
  37. }
  38. }

LineBaseEchoServer

  1. /**
  2. * 作者:DarkKing
  3. * 类说明:
  4. */
  5. public class LineBaseEchoServer {
  6. public static final int PORT = 9998;
  7. public static void main(String[] args) throws InterruptedException {
  8. LineBaseEchoServer lineBaseEchoServer = new LineBaseEchoServer();
  9. System.out.println("服务器即将启动");
  10. lineBaseEchoServer.start();
  11. }
  12. public void start() throws InterruptedException {
  13. final LineBaseServerHandler serverHandler = new LineBaseServerHandler();
  14. EventLoopGroup group = new NioEventLoopGroup();/*线程组*/
  15. try {
  16. ServerBootstrap b = new ServerBootstrap();/*服务端启动必须*/
  17. b.group(group)/*将线程组传入*/
  18. .channel(NioServerSocketChannel.class)/*指定使用NIO进行网络传输*/
  19. .localAddress(new InetSocketAddress(PORT))/*指定服务器监听端口*/
  20. /*服务端每接收到一个连接请求,就会新启一个socket通信,也就是channel,
  21. 所以下面这段代码的作用就是为这个子channel增加handle*/
  22. .childHandler(new ChannelInitializerImp());
  23. ChannelFuture f = b.bind().sync();/*异步绑定到服务器,sync()会阻塞直到完成*/
  24. System.out.println("服务器启动完成,等待客户端的连接和数据.....");
  25. f.channel().closeFuture().sync();/*阻塞直到服务器的channel关闭*/
  26. } finally {
  27. group.shutdownGracefully().sync();/*优雅关闭线程组*/
  28. }
  29. }
  30. private static class ChannelInitializerImp extends ChannelInitializer<Channel> {
  31. @Override
  32. protected void initChannel(Channel ch) throws Exception {
  33. //添加换行解码器
  34. ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
  35. ch.pipeline().addLast(new LineBaseServerHandler());
  36. }
  37. }
  38. }

LineBaseEchoClient

  1. /**
  2. * 作者:DarkKing
  3. */
  4. public class LineBaseEchoClient {
  5. private final String host;
  6. public LineBaseEchoClient(String host) {
  7. this.host = host;
  8. }
  9. public void start() throws InterruptedException {
  10. EventLoopGroup group = new NioEventLoopGroup();/*线程组*/
  11. try {
  12. final Bootstrap b = new Bootstrap();
  13. b.group(group)/*将线程组传入*/
  14. .channel(NioSocketChannel.class)/*指定使用NIO进行网络传输*/
  15. .remoteAddress(new InetSocketAddress(host, LineBaseEchoServer.PORT))/*配置要连接服务器的ip地址和端口*/
  16. .handler(new ChannelInitializerImp());
  17. ChannelFuture f = b.connect().sync();
  18. System.out.println("已连接到服务器.....");
  19. f.channel().closeFuture().sync();
  20. } finally {
  21. group.shutdownGracefully().sync();
  22. }
  23. }
  24. private static class ChannelInitializerImp extends ChannelInitializer<Channel> {
  25. @Override
  26. protected void initChannel(Channel ch) throws Exception {
  27. //回车符做了分割
  28. ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
  29. ch.pipeline().addLast(new LineBaseClientHandler());
  30. }
  31. }
  32. public static void main(String[] args) throws InterruptedException {
  33. new LineBaseEchoClient("127.0.0.1").start();
  34. }
  35. }

LineBaseClientHandler

  1. /**
  2. * 作者:DarkKing
  3. * 类说明:
  4. */
  5. public class LineBaseClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
  6. private AtomicInteger counter = new AtomicInteger(0);
  7. /*** 客户端读取到网络数据后的处理*/
  8. protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
  9. System.out.println("client Accept[" + msg.toString(CharsetUtil.UTF_8)
  10. + "] and the counter is:" + counter.incrementAndGet());
  11. ctx.close();
  12. }
  13. /*** 客户端被通知channel活跃后,做事*/
  14. @Override
  15. public void channelActive(ChannelHandlerContext ctx) throws Exception {
  16. ByteBuf msg = null;
  17. String request = "test1,test2,test3,test4,test5"
  18. + System.getProperty("line.separator");
  19. for (int i = 0; i < 10; i++) {
  20. Thread.sleep(500);
  21. System.out.println(System.currentTimeMillis() + ":即将发送数据:"
  22. + request);
  23. msg = Unpooled.buffer(request.length());
  24. msg.writeBytes(request.getBytes());
  25. ctx.writeAndFlush(msg);
  26. }
  27. }
  28. /*** 发生异常后的处理*/
  29. @Override
  30. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  31. cause.printStackTrace();
  32. ctx.close();
  33. }
  34. }

执行效果

2、消息定长

例如每个报文的大小为固定长度200字节,如果不够,空位补空格;

服务端只需将服务端的ChannelInitializerImp 解码器new LineBasedFrameDecoder(1024)替换为new FixedLengthFrameDecoder( FixedLengthEchoClient.REQUEST.length())即可。

  1. private static class ChannelInitializerImp extends ChannelInitializer<Channel> {
  2. @Override
  3. protected void initChannel(Channel ch) throws Exception {
  4. //添加定长报文长度解码器,长度问请求的长度
  5. ch.pipeline().addLast(
  6. new FixedLengthFrameDecoder(
  7. FixedLengthEchoClient.REQUEST.length()));
  8. ch.pipeline().addLast(new FixedLengthServerHandler());
  9. }
  10. }

3、将消息分为消息头和消息体

消息头中包含表示消息总长度(或者消息体长度)的字段,通常设计思路为消息头的第一个字段使用int32来表示消息的总长度。类似与第二条,只是我们按照头部的content-length长度进行定长解码。

相关文章