我使用客户端流模式。客户端可以在没有侧车的情况下连接服务器,但在侧车注入时失败。我不明白为什么会发生这个问题。
................................. ................................. ................................. ................................. ................................. .................................
这是我的密码:
流媒体.proto
syntax = "proto3";
option java_multiple_files = true;
option java_package = "io.grpc.examples.streaming";
option java_outer_classname = "StreamingProto";
option objc_class_prefix = "HLW";
package Streaming;
import public "google/protobuf/empty.proto";
service StreamingServer {
rpc GoHello (stream HelloRequest) returns (google.protobuf.Empty) {}
}
message HelloRequest {
string name = 1;
}
message HelloReplay {
string message = 1;
}
streamingclient.java文件
import com.google.protobuf.Empty;
import io.grpc.Channel;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.examples.streaming.HelloRequest;
import io.grpc.examples.streaming.StreamingServerGrpc;
import io.grpc.netty.shaded.io.netty.util.internal.StringUtil;
import io.grpc.stub.StreamObserver;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public class StreamingClient {
private AtomicInteger counter = new AtomicInteger(0);
private final StreamingServerGrpc.StreamingServerStub stub;
public StreamingClient(Channel channel) {
stub = StreamingServerGrpc.newStub(channel);
}
public static void main(String[] args) throws InterruptedException {
String port = System.getenv("GLAHA_GRPC_PORT");
String serviceName = System.getenv("GLAHA_GRPC_SERVICE");
serviceName = StringUtil.isNullOrEmpty(serviceName) ? "localhost" : serviceName;
port = StringUtil.isNullOrEmpty(port) ? "50051" : port;
// String target = "localhost:50051";
System.out.println(serviceName + port);
ManagedChannel channel =
ManagedChannelBuilder.forAddress(serviceName , Integer.parseInt(port)).usePlaintext().keepAliveWithoutCalls(true).build();
try {
StreamingClient helloWorldClient = new StreamingClient(channel);
helloWorldClient.go();
} finally {
channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
}
}
private void go() throws InterruptedException {
HelloRequest helloRequest = HelloRequest.newBuilder().setName("goHello").build();
StreamObserver<Empty> responseObserver =
new StreamObserver<Empty>() {
@Override
public void onNext(Empty value) {
System.out.println("value.getMessage()");
}
@Override
public void onError(Throwable t) {
System.out.println("error gohello: " + t.getMessage());
}
@Override
public void onCompleted() {
System.out.println("stream completed");
}
};
StreamObserver<HelloRequest> observer = stub.goHello(responseObserver);
TimeUnit.MILLISECONDS.sleep(5000);
while (true) {
System.out.println("client sent counter:" + counter.incrementAndGet());
observer.onNext(helloRequest);
TimeUnit.MILLISECONDS.sleep(200);
}
//observer.onCompleted();
}
}
streamserver.java文件
import com.google.protobuf.Empty;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.examples.streaming.HelloRequest;
import io.grpc.examples.streaming.StreamingServerGrpc;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public class StreamingServer {
private Server server;
private void start() throws IOException {
int port = 50051;
server = ServerBuilder.forPort(port).addService(new StreamingServerImpl()).build().start();
System.out.println("server up....");
Runtime.getRuntime()
.addShutdownHook(
new Thread() {
@Override
public void run() {
// Use stderr here since the logger may have been reset by its JVM shutdown hook.
System.err.println("***shutting down gRPC server since JVM is shutting down");
try {
StreamingServer.this.stop();
} catch (InterruptedException e) {
e.printStackTrace(System.err);
}
System.err.println("***server shut down");
}
});
}
public static void main(String[] args) throws IOException, InterruptedException {
StreamingServer server = new StreamingServer();
server.start();
server.blockUntilShutdown();
}
private void stop() throws InterruptedException {
if (server != null) {
server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
}
}
private void blockUntilShutdown() throws InterruptedException {
if (server != null) {
server.awaitTermination();
}
}
static class StreamingServerImpl extends StreamingServerGrpc.StreamingServerImplBase {
private AtomicInteger counter = new AtomicInteger(0);
@Override
public StreamObserver<HelloRequest> goHello(StreamObserver<Empty> responseObserver) {
return new StreamObserver<HelloRequest>() {
@Override
public void onNext(HelloRequest value) {
System.out.println("streaming receive counter:" + counter.incrementAndGet());
}
@Override
public void onError(Throwable t) {
System.out.println("responseObserver error:" + t.getMessage());
}
@Override
public void onCompleted() {
responseObserver.onCompleted();
}
};
}
}
}
k8s和istio yamls
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: stream-server-deployment
labels:
app: stream-server
spec:
replicas: 2
selector:
matchLabels:
app: stream-server
template:
metadata:
annotations:
sidecar.istio.io/inject: "true"
labels:
app: stream-server
spec:
containers:
- name: stream-server
image: stream-server-local:latest
imagePullPolicy: Never
ports:
- containerPort: 50051
---
apiVersion: v1
kind: Service
metadata:
name: stream-server-service
spec:
selector:
app: stream-server
ports:
- protocol: TCP
port: 50051
targetPort: 50051
name: grpc
---
apiVersion: v1
kind: ConfigMap
metadata:
name: stream-client-env-configmap
data:
GLAHA_GRPC_SERVICE: "stream-server-service"
GLAHA_GRPC_PORT: "50051"
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: stream-client-deployment
labels:
app: stream-client
spec:
replicas: 1
selector:
matchLabels:
app: stream-client
template:
metadata:
annotations:
sidecar.istio.io/inject: "true"
labels:
app: stream-client
spec:
containers:
- name: stream-client
image: stream-client-local:latest
imagePullPolicy: Never
envFrom:
- configMapRef:
name: stream-client-env-configmap
---
暂无答案!
目前还没有任何答案,快来回答吧!