gRPC 实战之——客户端为 Stream,服务端为 Response

x33g5p2x  于2022-06-06 转载在 其他  
字(4.9k)|赞(0)|评价(0)|浏览(535)

一 需求

根据课程名查询学生。

二 编写 proto 文件

  1. syntax = "proto3";
  2. package grpc.proto;
  3. option java_package = "com.grpc.proto";
  4. option java_outer_classname = "StudentData";
  5. option java_multiple_files = true ;
  6. // 定义接口
  7. service StudentService {
  8. // 请求一个 Requset 对象,响应一个 Response 对象
  9. rpc queryStudentNameById(MyRequestId) returns(MyResponseName) {}
  10. // 请求一个 Requset 对象,响应一个 Stream 对象
  11. rpc queryStudentsByCourseName(MyRequestCourseName) returns(stream MyResponseStudentsStream) {}
  12. // 请求一个 Stream 对象,响应一个 Response 对象,本例用到的是这个接口
  13. rpc queryStudentsByCourseName2(stream MyRequestCourseName) returns(MyResponseStudents) {}
  14. // 请求一个 Stream,响应一个 Stream 对象
  15. rpc queryStudentNameById2(stream MyRequestId) returns(stream MyResponseName) {}
  16. }
  17. message MyRequestId
  18. {
  19. int32 id = 1 ;
  20. }
  21. message MyResponseName
  22. {
  23. string name = 1 ;
  24. }
  25. message MyStudent
  26. {
  27. int32 id = 1 ;
  28. string name = 2;
  29. string courseName = 3 ;
  30. }
  31. message MyResponseStudents
  32. {
  33. // 服务端的响应结果是集合类型,因此需要加上 repeated
  34. repeated MyStudent students = 1 ;
  35. }
  36. // 数据结构,定义请求的 Request 对象
  37. message MyRequestCourseName
  38. {
  39. string courseName = 1 ;
  40. }
  41. // 数据结构,定义响应的 Stream
  42. message MyResponseStudentsStream
  43. {
  44. int32 id = 1 ;
  45. string name = 2;
  46. string courseName = 3 ;
  47. }

三 编写接口实现类

  1. package grpc;
  2. import grpc.proto.*;
  3. import io.grpc.stub.StreamObserver;
  4. public class StudentServiceImpl extends StudentServiceGrpc.StudentServiceImplBase {
  5. // 向客户端返回一个 StreamObserver 对象,服务端给客户端返回了一个 StreamObserver 对象,即响应了一个 StreamObserver 类型的 Response 对象。
  6. @Override
  7. public StreamObserver<MyRequestCourseName> queryStudentsByCourseName2(StreamObserver<MyResponseStudents> responseObserver) {
  8. MyStreamObserver observer = new MyStreamObserver();
  9. observer.setResponseObserver(responseObserver);
  10. return observer;
  11. }
  12. class MyStreamObserver implements StreamObserver<MyRequestCourseName> {
  13. private StreamObserver<MyResponseStudents> responseObserver;
  14. private MyResponseStudents responseStudents;
  15. public void setResponseObserver(StreamObserver<MyResponseStudents> responseObserver) {
  16. this.responseObserver = responseObserver;
  17. }
  18. @Override
  19. public void onNext(MyRequestCourseName value) {
  20. System.out.println("接收到的请求参数是:" + value.getCourseName());
  21. // 根据 value.getCourseName() 模拟查询操作...
  22. MyStudent student1 = MyStudent.newBuilder().setId(1).setName("zs").setCourseName("java").build();
  23. MyStudent student2 = MyStudent.newBuilder().setId(2).setName("ls").setCourseName("java").build();
  24. // 将查询结果放入 responseStudents 中
  25. this.responseStudents = MyResponseStudents.newBuilder().addStudents(student1).addStudents(student2).build();
  26. }
  27. @Override
  28. public void onError(Throwable t) {
  29. t.printStackTrace();
  30. }
  31. @Override
  32. public void onCompleted() {
  33. // 将查询结果放入 responseStudents 中
  34. responseObserver.onNext(responseStudents);
  35. responseObserver.onCompleted();
  36. }
  37. }
  38. }

四 编写服务端代码

  1. package grpc;
  2. import io.grpc.Server;
  3. import io.grpc.ServerBuilder;
  4. import java.io.IOException;
  5. public class MyGRPCServer {
  6. private Server server;
  7. // 启动服务
  8. private void start() throws IOException {
  9. int port = 8888;
  10. server = ServerBuilder.forPort(port)
  11. .addService(new StudentServiceImpl())
  12. .build()
  13. .start();
  14. Runtime.getRuntime().addShutdownHook(new Thread(() ->{
  15. System.err.println(Thread.currentThread().getName() + ",关闭JVM");
  16. // 当 JVM 关闭时,也同时关闭 MyGRPCServer服 务
  17. MyGRPCServer.this.stop();
  18. }
  19. ));
  20. }
  21. // 关闭服务
  22. private void stop() {
  23. if (server != null) {
  24. server.shutdown();
  25. }
  26. }
  27. private void blockUntilShutdown() throws InterruptedException {
  28. if (server != null) {
  29. // 等待服务结束
  30. server.awaitTermination();
  31. }
  32. }
  33. public static void main(String[] args) throws IOException, InterruptedException {
  34. final MyGRPCServer server = new MyGRPCServer();
  35. server.start();
  36. server.blockUntilShutdown();
  37. }
  38. }

五 编写客户端代码

  1. package grpc;
  2. import grpc.proto.MyRequestCourseName;
  3. import grpc.proto.MyResponseStudents;
  4. import grpc.proto.StudentServiceGrpc;
  5. import io.grpc.ManagedChannel;
  6. import io.grpc.ManagedChannelBuilder;
  7. import io.grpc.stub.StreamObserver;
  8. public class MyGRPCClient {
  9. public static void main(String[] args) throws Exception {
  10. // 创建一个客户端
  11. ManagedChannel client = ManagedChannelBuilder.forAddress("127.0.0.1", 8888)
  12. .usePlaintext().build();
  13. // 在 grpc 中,如果是以 Stream 方式发出请求,则此请求是异步的。因此,不能再使用阻塞式 stub 对象。
  14. StudentServiceGrpc.StudentServiceStub stub = StudentServiceGrpc
  15. .newStub(client);
  16. // 请求 stream,响应 response 对象
  17. // 接收服务端返回的 StreamObserver 类型的响应结果
  18. StreamObserver<MyResponseStudents> students = new StreamObserver<MyResponseStudents>() {
  19. @Override
  20. public void onNext(MyResponseStudents value) {
  21. value.getStudentsList().forEach((student) -> {
  22. System.out.println(student.getId() + "\t" + student.getName() + "\t" + student.getCourseName());
  23. });
  24. }
  25. @Override
  26. public void onError(Throwable t) {
  27. t.printStackTrace();
  28. }
  29. @Override
  30. public void onCompleted() {
  31. System.out.println("查询结束");
  32. }
  33. };
  34. // 准备一个 StreamObserver 流,用于向服务端发送请求
  35. StreamObserver<MyRequestCourseName> myRequestObserver = stub.queryStudentsByCourseName2(students);
  36. myRequestObserver.onNext(MyRequestCourseName.newBuilder().setCourseName("java").build());
  37. // 如果是向服务端发出多个 Stream 请求,则可以写多个 onNext(),如下
  38. // myRequestObserver.onNext( MyRequestCourseName.newBuilder().setCourseName("python").build());
  39. myRequestObserver.onCompleted();
  40. // 因为请求是异步的,所以客户端在发出请求后不会立刻得到响应结果。本程序通过休眠来模拟等待服务端的执行过程。
  41. Thread.sleep(3000);
  42. client.shutdown();
  43. }
  44. }

六 测试

1 启动服务端

2 启动客户端

3 服务端打印

接收到的请求参数是:java

4 客户端打印

1    zs    java

2    ls    java

查询结束

相关文章