RetryPolicy does not work with coroutines

vatpfxk5  posted on 2021-09-13  in Java

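I built a simple gRPC server in Kotlin using coroutines, and a client in Java. On the client I enabled and configured a retry policy, but it does not work. I spent a lot of time looking for a solution, convinced that my client was broken, but the problem is in the server. Here is the code.
This is my proto file: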

    syntax = "proto3";
    option java_multiple_files = true;
    option java_package = "br.com.will.protoclasses";
    option java_outer_classname = "NotificationProto";
    package notification;
    service Notification {
        rpc SendPush (SendPushNotificationRequest) returns (SendPushNotificationResponse);
    }
    message SendPushNotificationRequest {
        string title = 1;
        string message = 2;
        string customer_id = 3;
    }
    message SendPushNotificationResponse {
        string message = 1;
    }

This is the client:

    import br.com.will.protoclasses.NotificationGrpcKt
    import br.com.will.protoclasses.SendPushNotificationRequest
    import io.grpc.ManagedChannel
    import io.grpc.ManagedChannelBuilder
    import java.util.UUID

    open class NotificationClient(private val channel: ManagedChannel) {
        private val stub: NotificationGrpcKt.NotificationCoroutineStub =
            NotificationGrpcKt.NotificationCoroutineStub(channel)

        suspend fun send() {
            val request = SendPushNotificationRequest.newBuilder()
                .setCustomerId(UUID.randomUUID().toString())
                .setMessage("test")
                .setTitle("test")
                .build()
            val response = stub.sendPush(request)
            println("Received: ${response.message}")
        }
    }

    suspend fun main(args: Array<String>) {
        val port = System.getenv("PORT")?.toInt() ?: 50051

        // Retry policy: numeric values must be Doubles in a gRPC Java service config map.
        val retryPolicy: MutableMap<String, Any> = HashMap()
        retryPolicy["maxAttempts"] = 5.0
        retryPolicy["initialBackoff"] = "10s"
        retryPolicy["maxBackoff"] = "30s"
        retryPolicy["backoffMultiplier"] = 2.0
        retryPolicy["retryableStatusCodes"] = listOf<Any>("INTERNAL")

        // Apply the policy to notification.Notification/SendPush only.
        val methodConfig: MutableMap<String, Any> = HashMap()
        val name: MutableMap<String, Any> = HashMap()
        name["service"] = "notification.Notification"
        name["method"] = "SendPush"
        methodConfig["name"] = listOf<Any>(name)
        methodConfig["retryPolicy"] = retryPolicy

        val serviceConfig: MutableMap<String, Any> = HashMap()
        serviceConfig["methodConfig"] = listOf<Any>(methodConfig)
        print(serviceConfig)

        val channel = ManagedChannelBuilder.forAddress("localhost", port)
            .usePlaintext()
            .defaultServiceConfig(serviceConfig)
            .enableRetry()
            .build()
        val client = NotificationClient(channel)
        client.send()
    }
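When checking whether retries happen at all, it helps to know how long to wait for them. The sketch below computes the nominal backoff before each retry from the four values in the retry policy above, following the exponential rule described in the gRPC retry design (gRFC A6). The function name `backoffUpperBounds` is made up for this illustration and is not part of any gRPC API. gRPC also randomizes the actual delay, and the exact jitter rule depends on the library version, so treat these numbers as rough bounds rather than precise timings.

```kotlin
import kotlin.math.min
import kotlin.math.pow

// Hypothetical helper (not a gRPC API): nominal backoff, in seconds,
// before each retry, prior to any randomization applied by gRPC.
fun backoffUpperBounds(
    maxAttempts: Int,
    initialBackoff: Double,
    maxBackoff: Double,
    backoffMultiplier: Double
): List<Double> =
    // maxAttempts includes the original call, so there are maxAttempts - 1 retries.
    (1 until maxAttempts).map { n ->
        min(initialBackoff * backoffMultiplier.pow(n - 1), maxBackoff)
    }

fun main() {
    // Values taken from the retryPolicy map in the client above.
    println(backoffUpperBounds(5, 10.0, 30.0, 2.0)) // prints [10.0, 20.0, 30.0, 30.0]
}
```

With a 10s initial backoff, a retry may only show up in the server log several seconds after the failed attempt, so a short test run can look as if no retry was attempted.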

This is the part of my gRPC service where I test the retry policy (the client's retry policy does not work with this implementation):

    override suspend fun sendPush(request: SendPushNotificationRequest): SendPushNotificationResponse {
        val count: Int = retryCounter.incrementAndGet()
        log.info("Received a call on method sendPushNotification with payload -> $request")
        if (random.nextFloat() < UNAVAILABLE_PERCENTAGE) {
            log.info("Returning stubbed INTERNAL error. count: $count")
            throw Status.INTERNAL.withDescription("error").asRuntimeException()
        }
        log.info("Returning successful Hello response, count: $count")
        return SendPushNotificationResponse.newBuilder().setMessage("success").build()
    }

Another implementation, this time using StreamObserver (this one works fine):

    override fun sendPush(
        request: SendPushNotificationRequest?,
        responseObserver: StreamObserver<SendPushNotificationResponse>?
    ) {
        log.info("Received a call on method sendPushNotification with payload -> $request")
        val count: Int = retryCounter.incrementAndGet()
        if (random.nextFloat() < UNAVAILABLE_PERCENTAGE) {
            log.info("Returning stubbed UNAVAILABLE error. count: $count")
            responseObserver!!.onError(
                Status.UNAVAILABLE.withDescription("error").asRuntimeException()
            )
        } else {
            log.info("Returning successful Hello response, count: $count")
            responseObserver!!.onNext(SendPushNotificationResponse.newBuilder().setMessage("success").build())
            responseObserver.onCompleted()
        }
    }

So what is going wrong? Can anyone help me?

xzlaal3s  #1

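Is this code generated by gRPC:

    sendPush(request: SendPushNotificationRequest): SendPushNotificationResponse

gRPC relies on StreamObserver to send the response to the client once you call responseObserver.onCompleted() or responseObserver.onError(), so please make sure your code works correctly in that respect.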
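One way to check what the server actually sends, offered as a diagnostic sketch rather than a confirmed explanation: under the gRPC retry design (gRFC A6), a call is committed once the client receives response headers, and a committed call is not retried. If the coroutine-based handler causes headers to go out before the error status, the client would see INTERNAL but never retry; whether that happens here depends on the grpc-kotlin version and is an assumption to verify. The interceptor below only logs when headers are sent and which status closes the call. `HeaderLoggingInterceptor` is a hypothetical name; the `io.grpc` types it uses are the standard server interceptor API.

```kotlin
import io.grpc.ForwardingServerCall.SimpleForwardingServerCall
import io.grpc.Metadata
import io.grpc.ServerCall
import io.grpc.ServerCallHandler
import io.grpc.ServerInterceptor
import io.grpc.Status

// Hypothetical diagnostic interceptor (not from the original post):
// logs when response headers are sent and which status closes each call.
class HeaderLoggingInterceptor : ServerInterceptor {
    override fun <ReqT, RespT> interceptCall(
        call: ServerCall<ReqT, RespT>,
        headers: Metadata,
        next: ServerCallHandler<ReqT, RespT>
    ): ServerCall.Listener<ReqT> {
        val method = call.methodDescriptor.fullMethodName
        val loggingCall = object : SimpleForwardingServerCall<ReqT, RespT>(call) {
            override fun sendHeaders(responseHeaders: Metadata) {
                // If this line appears before the error status, the client
                // has likely committed the call and will not retry it.
                println("$method: response headers sent")
                super.sendHeaders(responseHeaders)
            }

            override fun close(status: Status, trailers: Metadata) {
                println("$method: closed with ${status.code}")
                super.close(status, trailers)
            }
        }
        return next.startCall(loggingCall, headers)
    }
}
```

The interceptor can be registered when building the server, for example with `ServerBuilder.intercept(HeaderLoggingInterceptor())`. Comparing the log output of the coroutine implementation with the StreamObserver one should show whether the two differ in when headers are sent.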
