Netty学习(五)-DelimiterBasedFrameDecoder

x33g5p2x  于2021-12-21 转载在 其他  
字(8.2k)|赞(0)|评价(0)|浏览(481)

上一节我们说了LineBasedframeDecoder来解决粘包拆包的问题,TCP以流的方式进行数据传输,上层应用协议为了对消息进行区分,一般采用如下4种方式:

  1. 消息长度固定,累计读取到消息长度总和为定长Len的报文之后即认为是读取到了一个完整的消息。计数器归位,重新读取。
  2. 将回车换行符作为消息结束符。
  3. 将特殊的分隔符作为消息分隔符,回车换行符是他的一种。
  4. 通过在消息头定义长度字段来标识消息总长度。

LineBasedframeDecoder属于第二种,今天我们要说的DelimiterBasedFrameDecoder和FixedLengthFrameDecoder属于第三种和第一种。DelimiterBasedFrameDecoder用来解决以特殊符号作为消息结束符的粘包问题,FixedLengthFrameDecoder用来解决定长消息的粘包问题。下面首先来用DelimiterBasedFrameDecoder来写一个例子,我们看一下效果然后接着分析用法。

1. DelimiterBasedFrameDecoder使用

服务端:

  1. public class HelloWordServer {
  2. private int port;
  3. public HelloWordServer(int port) {
  4. this.port = port;
  5. }
  6. public void start(){
  7. EventLoopGroup bossGroup = new NioEventLoopGroup();
  8. EventLoopGroup workGroup = new NioEventLoopGroup();
  9. ServerBootstrap server = new ServerBootstrap().group(bossGroup,workGroup)
  10. .channel(NioServerSocketChannel.class)
  11. .childHandler(new ServerChannelInitializer());
  12. try {
  13. ChannelFuture future = server.bind(port).sync();
  14. future.channel().closeFuture().sync();
  15. } catch (InterruptedException e) {
  16. e.printStackTrace();
  17. }finally {
  18. bossGroup.shutdownGracefully();
  19. workGroup.shutdownGracefully();
  20. }
  21. }
  22. public static void main(String[] args) {
  23. HelloWordServer server = new HelloWordServer(7788);
  24. server.start();
  25. }
  26. }

服务端ServerChannelInitializer:

  1. public class ServerChannelInitializer extends ChannelInitializer<SocketChannel> {
  2. @Override
  3. protected void initChannel(SocketChannel socketChannel) throws Exception {
  4. ChannelPipeline pipeline = socketChannel.pipeline();
  5. ByteBuf delimiter = Unpooled.copiedBuffer("\t".getBytes());
  6. pipeline.addLast("framer", new DelimiterBasedFrameDecoder(2048,delimiter));
  7. // 字符串解码 和 编码
  8. pipeline.addLast("decoder", new StringDecoder());
  9. pipeline.addLast("encoder", new StringEncoder());
  10. // 自己的逻辑Handler
  11. pipeline.addLast("handler", new ServerHandler());
  12. }
  13. }

服务端handler:

  1. public class ServerHandler extends ChannelInboundHandlerAdapter {
  2. private int counter;
  3. @Override
  4. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  5. String body = (String)msg;
  6. System.out.println("server receive order : " + body + ";the counter is: " + ++counter);
  7. }
  8. @Override
  9. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  10. super.exceptionCaught(ctx, cause);
  11. }
  12. }

客户端:

  1. public class HelloWorldClient {
  2. private int port;
  3. private String address;
  4. public HelloWorldClient(int port,String address) {
  5. this.port = port;
  6. this.address = address;
  7. }
  8. public void start(){
  9. EventLoopGroup group = new NioEventLoopGroup();
  10. Bootstrap bootstrap = new Bootstrap();
  11. bootstrap.group(group)
  12. .channel(NioSocketChannel.class)
  13. .handler(new ClientChannelInitializer());
  14. try {
  15. ChannelFuture future = bootstrap.connect(address,port).sync();
  16. future.channel().closeFuture().sync();
  17. } catch (Exception e) {
  18. e.printStackTrace();
  19. }finally {
  20. group.shutdownGracefully();
  21. }
  22. }
  23. public static void main(String[] args) {
  24. HelloWorldClient client = new HelloWorldClient(7788,"127.0.0.1");
  25. client.start();
  26. }
  27. }

客户端ClientChannelInitializer:

  1. public class ClientChannelInitializer extends ChannelInitializer<SocketChannel> {
  2. protected void initChannel(SocketChannel socketChannel) throws Exception {
  3. ChannelPipeline pipeline = socketChannel.pipeline();
  4. /* * 这个地方的 必须和服务端对应上。否则无法正常解码和编码 * * */
  5. ByteBuf delimiter = Unpooled.copiedBuffer("\t".getBytes());
  6. pipeline.addLast("framer", new DelimiterBasedFrameDecoder(2048,delimiter));
  7. pipeline.addLast("decoder", new StringDecoder());
  8. pipeline.addLast("encoder", new StringEncoder());
  9. // 客户端的逻辑
  10. pipeline.addLast("handler", new ClientHandler());
  11. }
  12. }

客户端handler:

  1. public class ClientHandler extends ChannelInboundHandlerAdapter {
  2. private byte[] req;
  3. private int counter;
  4. public ClientHandler() {
  5. req = ("Unless required by applicable law or agreed to in writing, software\t" +
  6. " distributed under the License is distributed on an \"AS IS\" BASIS,\t" +
  7. " WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\t" +
  8. " See the License for the specific language governing permissions and\t" +
  9. " limitations under the License.This connector uses the BIO implementation that requires the JSSE\t" +
  10. " style configuration. When using the APR/native implementation, the\t" +
  11. " penSSL style configuration is required as described in the APR/native\t" +
  12. " documentation.An Engine represents the entry point (within Catalina) that processes\t" +
  13. " every request. The Engine implementation for Tomcat stand alone\t" +
  14. " analyzes the HTTP headers included with the request, and passes them\t" +
  15. " on to the appropriate Host (virtual host)# Unless required by applicable law or agreed to in writing, software\t" +
  16. "# distributed under the License is distributed on an \"AS IS\" BASIS,\t" +
  17. "# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\t" +
  18. "# See the License for the specific language governing permissions and\t" +
  19. "# limitations under the License.# For example, set the org.apache.catalina.util.LifecycleBase logger to log\t" +
  20. "# each component that extends LifecycleBase changing state:\t" +
  21. "#org.apache.catalina.util.LifecycleBase.level = FINE\t"
  22. ).getBytes();
  23. }
  24. @Override
  25. public void channelActive(ChannelHandlerContext ctx) throws Exception {
  26. ByteBuf message;
  27. message = Unpooled.buffer(req.length);
  28. message.writeBytes(req);
  29. ctx.writeAndFlush(message);
  30. }
  31. @Override
  32. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  33. String buf = (String)msg;
  34. System.out.println("Now is : " + buf + " ; the counter is : "+ (++counter));
  35. }
  36. @Override
  37. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  38. ctx.close();
  39. }
  40. }

输出如下:

  1. server receive order : Unless required by applicable law or agreed to in writing, software;the counter is: 1
  2. server receive order : distributed under the License is distributed on an "AS IS" BASIS,;the counter is: 2
  3. server receive order : WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.;the counter is: 3
  4. server receive order : See the License for the specific language governing permissions and;the counter is: 4
  5. server receive order : limitations under the License.This connector uses the BIO implementation that requires the JSSE;the counter is: 5
  6. server receive order : style configuration. When using the APR/native implementation, the;the counter is: 6
  7. server receive order : penSSL style configuration is required as described in the APR/native;the counter is: 7
  8. server receive order : documentation.An Engine represents the entry point (within Catalina) that processes;the counter is: 8
  9. server receive order : every request. The Engine implementation for Tomcat stand alone;the counter is: 9
  10. server receive order : analyzes the HTTP headers included with the request, and passes them;the counter is: 10
  11. server receive order : on to the appropriate Host (virtual host)# Unless required by applicable law or agreed to in writing, software;the counter is: 11
  12. server receive order : # distributed under the License is distributed on an "AS IS" BASIS,;the counter is: 12
  13. server receive order : # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.;the counter is: 13
  14. server receive order : # See the License for the specific language governing permissions and;the counter is: 14
  15. server receive order : # limitations under the License.# For example, set the org.apache.catalina.util.LifecycleBase logger to log;the counter is: 15
  16. server receive order : # each component that extends LifecycleBase changing state:;the counter is: 16
  17. server receive order : #org.apache.catalina.util.LifecycleBase.level = FINE;the counter is: 17

启动服务端和客户端,我们能看到服务端接收客户端发过来的消息一共分17次接收。那么为什么是17次呢?而且我们并没有使用在上一篇中解决拆包和粘包问题的LineBasedFrameDecoder,并且这次我们的消息每一行的末尾也换成了”\t”。下面就来讲解一下DelimiterBasedFrameDecoder的使用。

DelimiterBasedFrameDecoder是将特殊的字符作为消息的分隔符,本例中用到的是”\t”。而LineBasedFrameDecoder是默认将换行符”\n”作为消息分隔符。首先我们注意到在ServerChannelInitializer中我们在添加解码器时跟以前有点不一样:

  1. ByteBuf delimiter = Unpooled.copiedBuffer("\t".getBytes());
  2. pipeline.addLast("framer", new DelimiterBasedFrameDecoder(2048, delimiter));

这里我们添加DelimiterBasedFrameDecoder解码器并且手动指定消息分隔符为:”\t”。我们可以看一下DelimiterBasedFrameDecoder的构造方法:

  1. public DelimiterBasedFrameDecoder(int maxFrameLength, boolean stripDelimiter, ByteBuf delimiter) {
  2. this(maxFrameLength, stripDelimiter, true, delimiter);
  3. }

maxFrameLength:解码的帧的最大长度

stripDelimiter:解码时是否去掉分隔符

failFast:为true,当frame长度超过maxFrameLength时立即报TooLongFrameException异常,为false,读取完整个帧再报异常

delimiter:分隔符

这个时候大家应该明白了为什么服务端分17次收到消息。我们在消息的每一行都加了一个”\t”,自然解码器在度消息时遇到”\t”就会认为这是一条消息的结束。用这种方式我们可以把”\t”换成任何我们自定义的字符对象。换成”\n”也是可以的。

2. FixedLengthFrameDecoder使用

FixedLengthFrameDecoder是固定长度解码器,它能够按照指定的长度对消息进行自动解码。使用它也没有什么特别费力的事情,在ServerChannelInitializer类中添加:

pipeline.addLast(new FixedLengthFrameDecoder(23));//参数为一次接受的数据长度

即可,同时也别忘了把刚才使用的DelimiterBasedFrameDecoder注释掉啊,不然达不到效果。

相关文章