Netty学习(七)-Netty编解码技术以及ProtoBuf和Thrift的介绍

x33g5p2x  于2021-12-21 转载在 其他  
字(8.2k)|赞(0)|评价(0)|浏览(657)

在前几节我们学习过处理粘包和拆包的问题,用到了Netty提供的几个解码器对不同情况的问题进行处理。功能很是强大。我们有没有去想这么强大的功能是如何实现的呢?背后又用到了什么技术?这一节我们就来处理这个问题。了解一下编码解码到底是如何处理的。

通常说的编码(Encoder)也就是发生在发送消息的时候需要将消息编译成字节对象,在Netty中即编译成ByteBuf对象。在java中我们将这种编译称之为序列化(Serializable),即将对象序列化为字节数组,然后用于传输或是持久化啊之类的。那么自然解码(Decoder)就是一个反序列化的过程,使用相应的编码格式对接收到的对做一个解码,以正确解析该对象。

1. java序列化的弱点

谈到序列化我们自然想到java提供的Serializable接口,在java中我们如果需要序列化只需要继承该接口就可以通过输入输出流进行序列化和反序列化。但是在提供很用户简单的调用的同时他也存在很多问题:

  • 无法跨语言。当我们进行跨应用之间的服务调用的时候如果另外一个应用使用c语言来开发,这个时候我们发送过去的序列化对象,别人是无法进行反序列化的因为其内部实现对于别人来说完全就是黑盒。
  • 序列化之后的码流太大。这个我们可以做一个实验还是上一节中的Message类,我们分别用java的序列化和使用二进制编码来做一个对比,下面我写了一个测试类:
  1. @Test
  2. public void testSerializable(){
  3. String str = "哈哈,我是一条消息";
  4. Message msg = new Message((byte)0xAD,35,str);
  5. ByteArrayOutputStream out = new ByteArrayOutputStream();
  6. try {
  7. ObjectOutputStream os = new ObjectOutputStream(out);
  8. os.writeObject(msg);
  9. os.flush();
  10. byte[] b = out.toByteArray();
  11. System.out.println("jdk序列化后的长度: "+b.length);
  12. os.close();
  13. out.close();
  14. ByteBuffer buffer = ByteBuffer.allocate(1024);
  15. byte[] bt = msg.getMsgBody().getBytes();
  16. buffer.put(msg.getType());
  17. buffer.putInt(msg.getLength());
  18. buffer.put(bt);
  19. buffer.flip();
  20. byte[] result = new byte[buffer.remaining()];
  21. buffer.get(result);
  22. System.out.println("使用二进制序列化的长度:"+result.length);
  23. } catch (IOException e) {
  24. e.printStackTrace();
  25. }
  26. }

输出结果为:

我们可以看到差距是挺大的,目前的主流编解码框架序列化之后的码流也都比java序列化要小太多。

  • 序列化效率差,这个我们也可以做一个对比,还是上面写的测试代码我们循环跑100000次对比一下时间:
  1. @Test
  2. public void testSerializable(){
  3. String str = "哈哈,我是一条消息";
  4. Message msg = new Message((byte)0xAD,35,str);
  5. ByteArrayOutputStream out = new ByteArrayOutputStream();
  6. try {
  7. long startTime = System.currentTimeMillis();
  8. for(int i = 0;i < 100000;i++){
  9. ObjectOutputStream os = new ObjectOutputStream(out);
  10. os.writeObject(msg);
  11. os.flush();
  12. byte[] b = out.toByteArray();
  13. /*System.out.println("jdk序列化后的长度: "+b.length);*/
  14. os.close();
  15. out.close();
  16. }
  17. long endTime = System.currentTimeMillis();
  18. System.out.println("jdk序列化100000次耗时:" +(endTime - startTime));
  19. long startTime1 = System.currentTimeMillis();
  20. for(int i = 0;i < 100000;i++){
  21. ByteBuffer buffer = ByteBuffer.allocate(1024);
  22. byte[] bt = msg.getMsgBody().getBytes();
  23. buffer.put(msg.getType());
  24. buffer.putInt(msg.getLength());
  25. buffer.put(bt);
  26. buffer.flip();
  27. byte[] result = new byte[buffer.remaining()];
  28. buffer.get(result);
  29. /*System.out.println("使用二进制序列化的长度:"+result.length);*/
  30. }
  31. long endTime1 = System.currentTimeMillis();
  32. System.out.println("使用二进制序列化100000次耗时:" +(endTime1 - startTime1));
  33. } catch (IOException e) {
  34. e.printStackTrace();
  35. }
  36. }

结果为:

结果为毫秒数,这个差距也是不小的。

结合以上我们看到:目前的序列化过程中使用java本身的肯定是不行,使用二进制编码的话又的我们自己去手写,所以为了让我们少搬砖前辈们早已经写好了工具让我们调用,目前社区比较活跃的有google的Protobuf和Apache的Thrift。

2. Protobuf序列化的使用

我们先来使用Protobuf进行序列化,他和XML,json一样都有自己的语法,xml的后缀是.xml,json文件的后缀是.json,自然Protobuf文件的后缀就是.proto(哈哈,当然不是全称)。

下面我们使用Protobuf来封装一段消息,通过一个案例简单介绍一下它的使用。

首先我们用Protobuf的语法格式来写一段需要序列化的对象,命名格式为:Msg.proto

  1. option java_package = "cn.edu.hust.netty.demo10";
  2. option java_outer_classname = "MessageProto";
  3. message RequestMsg{
  4. required bytes msgType = 1;
  5. required string receiveOne = 2;
  6. required string msg = 3;
  7. }
  8. message ResponseMsg{
  9. required bytes msgType = 1;
  10. required string receiveOne = 2;
  11. required string msg = 3;
  12. }

关于Message.proto中的语法格式,详情大家google一下相关的说明,网上很多介绍,再次简单就上面的语法说明一下:

  • option java_package:表示生成的.java文件的包名
  • option java_outer_classname:生成的java文件的文件名
  • message : 为他的基本类型,如同java中的class一样

字段修饰符:

  • required:一个格式良好的消息一定要含有1个这种字段。表示该值是必须要设置的;
  • optional:消息格式中该字段可以有0个或1个值(不超过1个)。
  • repeated:在一个格式良好的消息中,这种字段可以重复任意多次(包括0次)。重复的值的顺序会被保留。表示该值可以重复,相当于java中的List。

字符类型稍微有些不同:double,float,int32,int64,bool(boolean)
,string,bytes。稍微有些不同,String,boolean,int有差别。

另外我们看到上面3个字段分别赋值了,这个值是什么意思呢?消息定义中,每个字段都有唯一的一个数字标识符。这些标识符是用来在消息的二进制格式中识别各个字段的,一旦开始使用就不能够再改变。注:[1,15]之内的标识号在编码的时候会占用一个字节。[16,2047]之内的标识号则占用2个字节。所以应该为那些频繁出现的消息元素保留 [1,15]之内的标识号。

关于Protobuf 的语法我们就简单的介绍这么多,更多细节大家自己去查阅文档吧。下面我们开始使用Protobuf 来进行序列化。

首先我们的在工程中引入protobuf的jar包,目前官方版本最高3.2,我们用3.0的吧:

  1. <dependency>
  2. <groupId>com.google.protobuf</groupId>
  3. <artifactId>protobuf-java</artifactId>
  4. <version>3.0.2</version>
  5. </dependency>

Protobuf的文件已经定义好了,下就需要把它编译成java代码,这里我们的借助到google为我们提供的脚本工具protoc,链接在这里,点击下载这里提供的是protoc-3.0.2。要注意protoc的版本需要和Protobuf的版本对应上,不然不同的版本之间会有一些差异解析可能会有问题。现在知道我们为啥非得选用protobuf3.0.2版本吧,因为我没有找到别的版本的protoc。。。

下载好了我们解压缩然后把刚才写好的Msg.proto文件复制进去。

接着我们进cmd输入如下命令:

主要是第三句命令。如果你输入没有报错的话你的proto文件夹应该会生成一个子文件夹:

进去该文件夹你会看到已经生成了MessageProto.java文件,恭喜你,这时候你已经完成了protobuf序列化文件的生成。然后你把该文件拷贝至工程目录下。接下来我们用生成的文件去发消息吧。还是老套路服务端和客户端。

服务端:

  1. public class ProtoBufServer {
  2. private int port;
  3. public ProtoBufServer(int port) {
  4. this.port = port;
  5. }
  6. public void start(){
  7. EventLoopGroup bossGroup = new NioEventLoopGroup();
  8. EventLoopGroup workGroup = new NioEventLoopGroup();
  9. ServerBootstrap server = new ServerBootstrap().group(bossGroup,workGroup)
  10. .channel(NioServerSocketChannel.class)
  11. .childHandler(new ServerChannelInitializer());
  12. try {
  13. ChannelFuture future = server.bind(port).sync();
  14. future.channel().closeFuture().sync();
  15. } catch (InterruptedException e) {
  16. e.printStackTrace();
  17. }finally {
  18. bossGroup.shutdownGracefully();
  19. workGroup.shutdownGracefully();
  20. }
  21. }
  22. public static void main(String[] args) {
  23. ProtoBufServer server = new ProtoBufServer(7788);
  24. server.start();
  25. }
  26. }

服务端Initializer:

  1. public class ServerChannelInitializer extends ChannelInitializer<SocketChannel> {
  2. @Override
  3. protected void initChannel(SocketChannel socketChannel) throws Exception {
  4. ChannelPipeline pipeline = socketChannel.pipeline();
  5. pipeline.addLast(new ProtobufVarint32FrameDecoder());
  6. pipeline.addLast(new ProtobufDecoder(MessageProto.RequestMsg.getDefaultInstance()));
  7. pipeline.addLast(new ProtoBufServerHandler());
  8. }
  9. }

服务端handler:

  1. public class ProtoBufServerHandler extends ChannelInboundHandlerAdapter {
  2. @Override
  3. public void channelActive(ChannelHandlerContext ctx) throws Exception {
  4. MessageProto.ResponseMsg.Builder builder = MessageProto.ResponseMsg.newBuilder();
  5. builder.setMsgType(ByteString.copyFromUtf8("CBSP"));
  6. builder.setReceiveOne("小红");
  7. builder.setMsg("你好,你有啥事");
  8. ctx.writeAndFlush(builder.build());
  9. }
  10. @Override
  11. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  12. MessageProto.RequestMsg m = (MessageProto.RequestMsg)msg;
  13. System.out.println("Client say: "+m.getReceiveOne()+","+m.getMsg());
  14. }
  15. @Override
  16. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  17. super.exceptionCaught(ctx, cause);
  18. ctx.close();
  19. }
  20. }

客户端:

  1. public class ProtoBufClient {
  2. private int port;
  3. private String address;
  4. public ProtoBufClient(int port, String address) {
  5. this.port = port;
  6. this.address = address;
  7. }
  8. public void start(){
  9. EventLoopGroup group = new NioEventLoopGroup();
  10. Bootstrap bootstrap = new Bootstrap();
  11. bootstrap.group(group)
  12. .channel(NioSocketChannel.class)
  13. .handler(new ClientChannelInitializer());
  14. try {
  15. ChannelFuture future = bootstrap.connect(address,port).sync();
  16. future.channel().closeFuture().sync();
  17. } catch (Exception e) {
  18. e.printStackTrace();
  19. }finally {
  20. group.shutdownGracefully();
  21. }
  22. }
  23. public static void main(String[] args) {
  24. ProtoBufClient client = new ProtoBufClient(7788,"127.0.0.1");
  25. client.start();
  26. }
  27. }

客户端Initializer:

  1. public class ClientChannelInitializer extends ChannelInitializer<SocketChannel> {
  2. protected void initChannel(SocketChannel socketChannel) throws Exception {
  3. ChannelPipeline pipeline = socketChannel.pipeline();
  4. pipeline.addLast(new ProtobufVarint32LengthFieldPrepender());
  5. pipeline.addLast(new ProtobufEncoder());
  6. pipeline.addLast(new ProtoBufClientHandler());
  7. }
  8. }
  9. 客户端handler
  10. public class ProtoBufClientHandler extends ChannelInboundHandlerAdapter {
  11. @Override
  12. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  13. MessageProto.ResponseMsg m = (MessageProto.ResponseMsg)msg;
  14. System.out.println("Server say: "+m.getReceiveOne()+","+m.getMsg());
  15. }
  16. @Override
  17. public void channelActive(ChannelHandlerContext ctx) throws Exception {
  18. MessageProto.RequestMsg.Builder builder = MessageProto.RequestMsg.newBuilder();
  19. builder.setMsgType(ByteString.copyFromUtf8("CBSP"));
  20. builder.setReceiveOne("小明");
  21. builder.setMsg("你好,我找你有事");
  22. ctx.writeAndFlush(builder.build());
  23. }
  24. @Override
  25. public void channelInactive(ChannelHandlerContext ctx) throws Exception {
  26. System.out.println("Client is close");
  27. }
  28. }

启动服务端和客户端,输出如下:

最简单的protoBuf应用案例我们就写完了,真实的使用场景大同小异,随机应变即可。

3. thrift序列化的使用

哈哈,我本来是打算讲thrift的安装和使用的,但是现在却讲不了,因为这玩意儿的安装是个问题。由于我没有linux环境,thrift如果在linux环境下安装使用是挺简单的,但是在windows环境下挺麻烦。thrift在windows下,还使用C++,搭环境是最难的。 libthrift依赖boost libthriftnb依赖boost,libevent 等于你得安装boost,libevent 除此之外,还需要openssl 装openssl,又需要perl,nasm 期间,还会涉及版本兼容问题,总而言之,比较折磨,而这还仅是安装编译。

所以暂时我就跳过这一部分,等我安装linux环境之后再来讲解吧。

相关文章