gRPC 实战之——客户端为 Request,服务端为 Stream

x33g5p2x  于2022-06-06 转载在 其他  
字(4.1k)|赞(0)|评价(0)|浏览(338)

一 需求

根据课程名查询学生。

二 编写 proto 文件

  1. syntax = "proto3";
  2. package grpc.proto;
  3. option java_package = "com.grpc.proto";
  4. option java_outer_classname = "StudentData";
  5. option java_multiple_files = true ;
  6. // 定义接口
  7. service StudentService {
  8. // 请求一个 Requset 对象,响应一个 Response 对象
  9. rpc queryStudentNameById(MyRequestId) returns(MyResponseName) {}
  10. // 请求一个 Requset 对象,响应一个 Stream 对象,本例用的是这个接口
  11. rpc queryStudentsByCourseName(MyRequestCourseName) returns(stream MyResponseStudentsStream) {}
  12. // 请求一个 Stream 对象,响应一个 Response 对象
  13. rpc queryStudentsByCourseName2(stream MyRequestCourseName) returns(MyResponseStudents) {}
  14. // 请求一个 Stream,响应一个 Stream 对象
  15. rpc queryStudentNameById2(stream MyRequestId) returns(stream MyResponseName) {}
  16. }
  17. // 定义请求的 Request 对象
  18. message MyRequestId
  19. {
  20. int32 id = 1 ;
  21. }
  22. // 定义响应的 Stream 对象
  23. message MyResponseName
  24. {
  25. string name = 1 ;
  26. }
  27. message MyStudent
  28. {
  29. int32 id = 1 ;
  30. string name = 2;
  31. string courseName = 3 ;
  32. }
  33. message MyResponseStudents
  34. {
  35. // 服务端的响应结果是集合类型,因此需要加上 repeated
  36. repeated MyStudent students = 1 ;
  37. }
  38. // 数据结构,定义请求的 Request 对象
  39. message MyRequestCourseName
  40. {
  41. string courseName = 1 ;
  42. }
  43. // 数据结构,定义响应的 Stream
  44. message MyResponseStudentsStream
  45. {
  46. int32 id = 1 ;
  47. string name = 2;
  48. string courseName = 3 ;
  49. }

三 编写接口实现类

  1. package grpc;
  2. import grpc.proto.*;
  3. import io.grpc.stub.StreamObserver;
  4. public class StudentServiceImpl extends StudentServiceGrpc.StudentServiceImplBase {
  5. @Override
  6. public void queryStudentNameById(MyRequestId request, StreamObserver<MyResponseName> responseObserver) {
  7. System.out.println("模拟查询此id的用户名:" + request.getId());
  8. // 假设此 id 的 name 是“zs”
  9. responseObserver.onNext(MyResponseName.newBuilder().setName("zs").build());
  10. responseObserver.onCompleted();
  11. }
  12. // 通过 Stream 的方式响应客户端
  13. @Override
  14. public void queryStudentsByCourseName(MyRequestCourseName request, StreamObserver<MyResponseStudentsStream> responseObserver) {
  15. // 接收到的 courseName 是"java"
  16. String courseName = request.getCourseName();
  17. // 假设有 3 个 Student 选修了"java"课程
  18. MyResponseStudentsStream student1 = MyResponseStudentsStream.newBuilder().setId(1).setName("zs").setCourseName("java").build();
  19. MyResponseStudentsStream student2 = MyResponseStudentsStream.newBuilder().setId(2).setName("ls").setCourseName("java").build();
  20. MyResponseStudentsStream student3 = MyResponseStudentsStream.newBuilder().setId(3).setName("ww").setCourseName("java").build();
  21. // 将查询到的 3 个 Student,放入 responseObserver中
  22. responseObserver.onNext(student1);
  23. responseObserver.onNext(student2);
  24. responseObserver.onNext(student3);
  25. responseObserver.onCompleted();
  26. }
  27. }

四 编写服务端代码

  1. package grpc;
  2. import io.grpc.Server;
  3. import io.grpc.ServerBuilder;
  4. import java.io.IOException;
  5. public class MyGRPCServer {
  6. private Server server;
  7. // 启动服务
  8. private void start() throws IOException {
  9. int port = 8888;
  10. server = ServerBuilder.forPort(port)
  11. .addService(new StudentServiceImpl())
  12. .build()
  13. .start();
  14. Runtime.getRuntime().addShutdownHook(new Thread(() ->{
  15. System.err.println(Thread.currentThread().getName() + ",关闭JVM");
  16. // 当 JVM 关闭时,也同时关闭 MyGRPCServer服 务
  17. MyGRPCServer.this.stop();
  18. }
  19. ));
  20. }
  21. // 关闭服务
  22. private void stop() {
  23. if (server != null) {
  24. server.shutdown();
  25. }
  26. }
  27. private void blockUntilShutdown() throws InterruptedException {
  28. if (server != null) {
  29. // 等待服务结束
  30. server.awaitTermination();
  31. }
  32. }
  33. public static void main(String[] args) throws IOException, InterruptedException {
  34. final MyGRPCServer server = new MyGRPCServer();
  35. server.start();
  36. server.blockUntilShutdown();
  37. }
  38. }

五 编写客户端代码

  1. package grpc;
  2. import grpc.proto.*;
  3. import io.grpc.ManagedChannel;
  4. import io.grpc.ManagedChannelBuilder;
  5. import java.util.Iterator;
  6. public class MyGRPCClient {
  7. public static void main(String[] args) throws Exception {
  8. // 创建一个客户端
  9. ManagedChannel client = ManagedChannelBuilder.forAddress("127.0.0.1", 8888)
  10. .usePlaintext().build();
  11. try {
  12. // 创建客户端的代理对象,用于代表客户端去访问服务端提供的方法
  13. StudentServiceGrpc.StudentServiceBlockingStub stub = StudentServiceGrpc
  14. .newBlockingStub(client);
  15. // 请求 request,响应 Stream
  16. Iterator<MyResponseStudentsStream> students = stub.queryStudentsByCourseName(MyRequestCourseName.newBuilder().setCourseName("java").build());
  17. while (students.hasNext()) {
  18. MyResponseStudentsStream student = students.next();
  19. System.out.println(student.getId() + "\t" + student.getName() + "\t" + student.getCourseName());
  20. }
  21. } finally {
  22. client.shutdown();
  23. }
  24. }
  25. }

六 测试

1 启动服务端和客户端

2 客户端运行结果如下

1    zs    java

2    ls    java

3    ww    java

相关文章