Akka HTTP错误响应实体在1秒后未订阅

nuypyhwy  于 2022-11-06  发布在  其他
关注(0)|答案(2)|浏览(174)

我正在使用Akka HTTP cachedHostConnectionPoolHttps池作为Akka流流的一部分发送请求:

private val requestFlow: Flow[(HttpRequest, HelperClass), Either[Error, String], _] =
Http().cachedHostConnectionPoolHttps(BaseUrl).mapAsync(1) {
  case (Success(HttpResponse(_, _, entity, _)), _) =>
    Unmarshal(entity).to[String].map(response => {
      Right(response)
    })
  case (Failure(ex), _) =>
    Future(Left(Error(ex)))
}

由于某种原因,并非所有请求响应都在处理中。某些响应会导致错误:

a.h.i.e.c.PoolGateway - [0 (WaitingForResponseEntitySubscription)] Response entity was not subscribed after 1 second. Make sure to read the response entity body or call `discardBytes()` on it.

如何订阅我的回复,同时保持上述流程?

rslzwgfq

rslzwgfq1#

虽然这不是最佳解决方案,但您可以按如下方式增加响应订阅超时:

akka.http.host-connection-pool.response-entity-subscription-timeout = 10.seconds

下面是一个更彻底的讨论:https://github.com/akka/akka-http/issues/1836

e3bfsja2

e3bfsja22#

根据文件建议,执行实体处理方式如下:

private val requestFlow: Flow[(HttpRequest, HelperClass), Either[Error, String], _] =
Http().cachedHostConnectionPoolHttps(BaseUrl).mapAsync(1) {
  case (Success(HttpResponse(_, _, entity, _)),    _) =>
      entity.dataBytes
        .runReduce(_ ++ _)
        .map(r => Right(r.toString))
  case (Failure(ex), _) =>
    Future(Left(Error(ex)))
}

相关问题