根据课程名查询学生。
syntax = "proto3";
package grpc.proto;
option java_package = "com.grpc.proto";
option java_outer_classname = "StudentData";
option java_multiple_files = true ;
// 定义接口
service StudentService {
// 请求一个 Requset 对象,响应一个 Response 对象
rpc queryStudentNameById(MyRequestId) returns(MyResponseName) {}
// 请求一个 Requset 对象,响应一个 Stream 对象
rpc queryStudentsByCourseName(MyRequestCourseName) returns(stream MyResponseStudentsStream) {}
// 请求一个 Stream 对象,响应一个 Response 对象,本例用到的是这个接口
rpc queryStudentsByCourseName2(stream MyRequestCourseName) returns(MyResponseStudents) {}
// 请求一个 Stream,响应一个 Stream 对象
rpc queryStudentNameById2(stream MyRequestId) returns(stream MyResponseName) {}
}
message MyRequestId
{
int32 id = 1 ;
}
message MyResponseName
{
string name = 1 ;
}
message MyStudent
{
int32 id = 1 ;
string name = 2;
string courseName = 3 ;
}
message MyResponseStudents
{
// 服务端的响应结果是集合类型,因此需要加上 repeated
repeated MyStudent students = 1 ;
}
// 数据结构,定义请求的 Request 对象
message MyRequestCourseName
{
string courseName = 1 ;
}
// 数据结构,定义响应的 Stream
message MyResponseStudentsStream
{
int32 id = 1 ;
string name = 2;
string courseName = 3 ;
}
package grpc;
import grpc.proto.*;
import io.grpc.stub.StreamObserver;
public class StudentServiceImpl extends StudentServiceGrpc.StudentServiceImplBase {
// 向客户端返回一个 StreamObserver 对象,服务端给客户端返回了一个 StreamObserver 对象,即响应了一个 StreamObserver 类型的 Response 对象。
@Override
public StreamObserver<MyRequestCourseName> queryStudentsByCourseName2(StreamObserver<MyResponseStudents> responseObserver) {
MyStreamObserver observer = new MyStreamObserver();
observer.setResponseObserver(responseObserver);
return observer;
}
class MyStreamObserver implements StreamObserver<MyRequestCourseName> {
private StreamObserver<MyResponseStudents> responseObserver;
private MyResponseStudents responseStudents;
public void setResponseObserver(StreamObserver<MyResponseStudents> responseObserver) {
this.responseObserver = responseObserver;
}
@Override
public void onNext(MyRequestCourseName value) {
System.out.println("接收到的请求参数是:" + value.getCourseName());
// 根据 value.getCourseName() 模拟查询操作...
MyStudent student1 = MyStudent.newBuilder().setId(1).setName("zs").setCourseName("java").build();
MyStudent student2 = MyStudent.newBuilder().setId(2).setName("ls").setCourseName("java").build();
// 将查询结果放入 responseStudents 中
this.responseStudents = MyResponseStudents.newBuilder().addStudents(student1).addStudents(student2).build();
}
@Override
public void onError(Throwable t) {
t.printStackTrace();
}
@Override
public void onCompleted() {
// 将查询结果放入 responseStudents 中
responseObserver.onNext(responseStudents);
responseObserver.onCompleted();
}
}
}
package grpc;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import java.io.IOException;
public class MyGRPCServer {
private Server server;
// 启动服务
private void start() throws IOException {
int port = 8888;
server = ServerBuilder.forPort(port)
.addService(new StudentServiceImpl())
.build()
.start();
Runtime.getRuntime().addShutdownHook(new Thread(() ->{
System.err.println(Thread.currentThread().getName() + ",关闭JVM");
// 当 JVM 关闭时,也同时关闭 MyGRPCServer服 务
MyGRPCServer.this.stop();
}
));
}
// 关闭服务
private void stop() {
if (server != null) {
server.shutdown();
}
}
private void blockUntilShutdown() throws InterruptedException {
if (server != null) {
// 等待服务结束
server.awaitTermination();
}
}
public static void main(String[] args) throws IOException, InterruptedException {
final MyGRPCServer server = new MyGRPCServer();
server.start();
server.blockUntilShutdown();
}
}
package grpc;
import grpc.proto.MyRequestCourseName;
import grpc.proto.MyResponseStudents;
import grpc.proto.StudentServiceGrpc;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;
public class MyGRPCClient {
public static void main(String[] args) throws Exception {
// 创建一个客户端
ManagedChannel client = ManagedChannelBuilder.forAddress("127.0.0.1", 8888)
.usePlaintext().build();
// 在 grpc 中,如果是以 Stream 方式发出请求,则此请求是异步的。因此,不能再使用阻塞式 stub 对象。
StudentServiceGrpc.StudentServiceStub stub = StudentServiceGrpc
.newStub(client);
// 请求 stream,响应 response 对象
// 接收服务端返回的 StreamObserver 类型的响应结果
StreamObserver<MyResponseStudents> students = new StreamObserver<MyResponseStudents>() {
@Override
public void onNext(MyResponseStudents value) {
value.getStudentsList().forEach((student) -> {
System.out.println(student.getId() + "\t" + student.getName() + "\t" + student.getCourseName());
});
}
@Override
public void onError(Throwable t) {
t.printStackTrace();
}
@Override
public void onCompleted() {
System.out.println("查询结束");
}
};
// 准备一个 StreamObserver 流,用于向服务端发送请求
StreamObserver<MyRequestCourseName> myRequestObserver = stub.queryStudentsByCourseName2(students);
myRequestObserver.onNext(MyRequestCourseName.newBuilder().setCourseName("java").build());
// 如果是向服务端发出多个 Stream 请求,则可以写多个 onNext(),如下
// myRequestObserver.onNext( MyRequestCourseName.newBuilder().setCourseName("python").build());
myRequestObserver.onCompleted();
// 因为请求是异步的,所以客户端在发出请求后不会立刻得到响应结果。本程序通过休眠来模拟等待服务端的执行过程。
Thread.sleep(3000);
client.shutdown();
}
}
接收到的请求参数是:java
1 zs java
2 ls java
查询结束
版权说明 : 本文为转载文章, 版权归原作者所有 版权申明
原文链接 : https://blog.csdn.net/chengqiuming/article/details/125133812
内容来源于网络,如有侵权,请联系作者删除!