使用 Netty + Protobuf 开发自定义的 RPC 功能

x33g5p2x  于2022-06-06 转载在 其他  
字(5.4k)|赞(0)|评价(0)|浏览(271)

一 点睛

RPC 是指一个节点通过网络请求另一个节点提供的服务,这里所说的“提供的服务”可以具体化为“提供的方法”或“提供的属性”。

本篇实现一个最简单的 RPC 功能:调用的双方都是采用 Java 语言编写,并且调用的服务是远程提供的“属性”。

二 实战

1 将 message 以规定的语法结构编写到 Student.proto 文件中。

netty\protobuf\Student.proto

syntax = "proto2" ;

package netty.protobuf ;

option optimize_for = SPEED ;

option java_package = "netty.protobuf" ;

option java_outer_classname = "StudentMessage" ;

message Student

{

required string name = 1 ;

optional int32 age = 2 ;

}

2 根据  Student.proto 生成 Java 存储数据的数据结构,即 Java 类

E:\JVMDemo\demo2022>protoc --java_out=src/main/java src/main/java/netty/protobuf/Student.proto

该命令会根据 Student.proto 定义好的规则,在 src/main/java 目录下生成包 "netty.protobuf",并在该包中生成 StudentMessage.java

3 客户端主程序类

  1. package netty.protobuf;
  2. import io.netty.bootstrap.Bootstrap;
  3. import io.netty.channel.Channel;
  4. import io.netty.channel.EventLoopGroup;
  5. import io.netty.channel.nio.NioEventLoopGroup;
  6. import io.netty.channel.socket.nio.NioSocketChannel;
  7. public class MyNettyClientTest {
  8. public static void main(String[] args) {
  9. EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
  10. try {
  11. Bootstrap bootstrap = new Bootstrap();
  12. bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class).handler(new MyNettyClientInitializer());
  13. Channel channel = bootstrap.connect("127.0.0.1", 8888).sync().channel();
  14. } catch (Exception e) {
  15. e.printStackTrace();
  16. } finally {
  17. eventLoopGroup.shutdownGracefully();
  18. }
  19. }
  20. }

4 客户端初始化器

  1. package netty.protobuf;
  2. import io.netty.channel.ChannelInitializer;
  3. import io.netty.channel.ChannelPipeline;
  4. import io.netty.channel.socket.SocketChannel;
  5. import io.netty.handler.codec.protobuf.ProtobufEncoder;
  6. import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
  7. import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
  8. public class MyNettyClientInitializer extends ChannelInitializer<SocketChannel> {
  9. // 连接被注册后,立刻执行此方法
  10. protected void initChannel(SocketChannel sc) throws Exception {
  11. ChannelPipeline pipeline = sc.pipeline();
  12. /*思考:如何传递任何类型的数据呢?不要固定成PersonData.Person
  13. 1.使用netty自定义协议:前几位编码,如果是a 则解码成....如果是b,则解码成...
  14. b.使用protobuf解决:
  15. */
  16. // pipeline.addLast("ProtobufDecoder",new ProtobufDecoder(PersonMessage.Person.getDefaultInstance()));//解码:字节->对象
  17. // PProtobufVarint32FrameDecoder 和 rotobufVarint32LengthFieldPrepender 用于解决半包和粘包问题,这里仅做了解
  18. pipeline.addLast("ProtobufVarint32FrameDecoder",new ProtobufVarint32FrameDecoder()) ;
  19. pipeline.addLast("ProtobufVarint32LengthFieldPrepender",new ProtobufVarint32LengthFieldPrepender());
  20. // 用于将 StudentMessage 类转为字节码
  21. pipeline.addLast("ProtobufEncoder",new ProtobufEncoder());
  22. // 构建 Student 对象,并发送给服务端
  23. pipeline.addLast("MyNettyClientHandler", new MyNettyClientHandler());
  24. }
  25. }

5 客户端处理器

  1. package netty.protobuf;
  2. import io.netty.channel.ChannelHandlerContext;
  3. import io.netty.channel.SimpleChannelInboundHandler;
  4. public class MyNettyClientHandler extends SimpleChannelInboundHandler<String> {
  5. @Override
  6. protected void channelRead0(ChannelHandlerContext ctx, String receiveMsg) {
  7. }
  8. @Override
  9. public void channelActive(ChannelHandlerContext ctx) throws Exception {
  10. StudentMessage.Student Student = StudentMessage.Student.newBuilder().setName("zs").setAge(23).build();
  11. // 发送给服务单
  12. ctx.channel().writeAndFlush(Student);
  13. }
  14. }

6 服务端主程序类

  1. package netty.protobuf;
  2. import io.netty.bootstrap.ServerBootstrap;
  3. import io.netty.channel.ChannelFuture;
  4. import io.netty.channel.EventLoopGroup;
  5. import io.netty.channel.nio.NioEventLoopGroup;
  6. import io.netty.channel.socket.nio.NioServerSocketChannel;
  7. public class MyNettyServerTest {
  8. public static void main(String[] args) {
  9. EventLoopGroup bossGroup = new NioEventLoopGroup();
  10. EventLoopGroup workerGroup = new NioEventLoopGroup();
  11. try {
  12. // ServerBootstrap:服务端启动时的初始化操作
  13. ServerBootstrap serverBootstrap = new ServerBootstrap();
  14. // 将 bossGroup 和 workerGroup 注册到服务端的 Channel 上,并注册一个服务端的初始化器 NettyServerInitializer(该初始化器中的 initChannel() 方法,会在连接被注册后立刻执行);最后将端口号绑定到 8888
  15. ChannelFuture channelFuture = serverBootstrap
  16. .group(bossGroup, workerGroup)
  17. .channel(NioServerSocketChannel.class)
  18. .childHandler(new MyNettyServerInitializer())
  19. .bind(8888).sync();
  20. channelFuture.channel().closeFuture().sync();
  21. } catch (Exception e) {
  22. e.printStackTrace();
  23. } finally {
  24. bossGroup.shutdownGracefully();
  25. workerGroup.shutdownGracefully();
  26. }
  27. }
  28. }

7 服务端初始化类

  1. package netty.protobuf;
  2. import io.netty.channel.ChannelInitializer;
  3. import io.netty.channel.ChannelPipeline;
  4. import io.netty.channel.socket.SocketChannel;
  5. import io.netty.handler.codec.protobuf.ProtobufDecoder;
  6. import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
  7. import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
  8. public class MyNettyServerInitializer extends ChannelInitializer<SocketChannel> {
  9. protected void initChannel(SocketChannel sc) throws Exception {
  10. ChannelPipeline pipeline = sc.pipeline();
  11. pipeline.addLast("ProtobufVarint32FrameDecoder", new ProtobufVarint32FrameDecoder());
  12. // 用于将 byte[] 解码成 StudentMessage 对象
  13. pipeline.addLast("ProtobufDecoder", new ProtobufDecoder(StudentMessage.Student.getDefaultInstance()));
  14. pipeline.addLast("ProtobufVarint32LengthFieldPrepender", new ProtobufVarint32LengthFieldPrepender());
  15. // 打印 StudentMessage 对象
  16. pipeline.addLast("MyNettyServerHandler", new MyNettyServerHandler());
  17. }
  18. }

8 服务端处理器

  1. package netty.protobuf;
  2. import io.netty.channel.ChannelHandlerContext;
  3. import io.netty.channel.SimpleChannelInboundHandler;
  4. public class MyNettyServerHandler extends SimpleChannelInboundHandler<StudentMessage.Student> {
  5. @Override
  6. protected void channelRead0(ChannelHandlerContext ctx, StudentMessage.Student receiveMsg) throws Exception {
  7. System.out.println(receiveMsg.getName() + "--" + receiveMsg.getAge());
  8. }
  9. }

三 测试

先启动服务端,再通过客户端向服务端发送数据,服务端运行结果如下
zs--23

相关文章