akka 如何在参与者的同一个线程中获取http.singleRequest(httpRequest)的Http响应?

xsuvu9jc  于 2022-11-06  发布在  其他
关注(0)|答案(2)|浏览(140)

我有一个使用case BidRequest消息中的httpRequest => http.singleRequest(httpRequest).pipeTo(self)发送HTTPPOST请求的参与者。该参与者在另一个case HttpResponse消息中接收回httpResponse。在第二个case HttpResponse消息中,我想更改第一个case BidRequest消息将发送回的变量。由于消息是异步处理的,因此当我编辑第二个消息中的变量时,第一消息已经发回具有旧状态的变量。
我想我需要以某种方式使用akka.pattern.ask,以防止消息到达另一个case HttpResponse,但仍保留在同一个case BidRequest中,以便可以在适当的位置编辑变量。

object AuctionClientActor {
  def props(bidders: List[String]) = { Props(new AuctionClientActor(bidders)) }
}

class AuctionClientActor(bidders: List[String])
  extends Actor with ActorLogging
    with BidJsonProtocol with SprayJsonSupport {

  import context.dispatcher

  implicit val system = context.system
  val http = Http(system)

  var bidOffer: BidOffer = BidOffer("", 0, "")

  def receive = {
    case bidRequest@BidRequest(requestId, bid) =>
      val content = bidRequest.bid.toJson.toString

      val latch = new CountDownLatch(bidders.size)

      val listResponseFuture: List[Future[HttpResponse]] = bidders
        .map(bidder =>
          HttpRequest( // create the request
            HttpMethods.POST,
            uri = Uri(bidder), // uri = Uri("http://localhost:8081"),
            entity = HttpEntity(ContentTypes.`application/json`, content)
          )
        )
        // IF I USE pipeTo HERE THE HttpResponse WILL GO TO ANOTHER CASE
        .map(httpRequest => http.singleRequest(httpRequest).pipeTo(self)) // send the request

      listResponseFuture.foreach { response =>
        Await.result(response, 3 seconds)
        response.onComplete {
          case Success(value) => latch.countDown // println(s"response success: $value")
          case Failure(exception) =>
            println(s"response failure: $exception")
            latch.countDown
        }
      }
      latch.await(3, TimeUnit.SECONDS)
      println("sending response now... BUT bidOffer WAS EDITED IN ANOTHER case thread")
      sender() ! Some(bidOffer.content)
      bidOffer = BidOffer("", 0, "")
    case resp@HttpResponse(StatusCodes.OK, headers, entity, _) =>
      log.info(s"received HttpResponse OK(200): $resp")
      entity.dataBytes.runFold(ByteString(""))(_ ++ _).foreach { body =>
        println("Got response, body: " + body.utf8String)
        val newBidOffer = BidOfferConverter.getBidOffer(body.utf8String)
        // I SHOULD NOT EDIT bidOffer HERE. INSTEAD I NEED TO EDIT bidOffer ON THE case BidRequest
        if (bidOffer.bid == 0) {
          println("new")
          bidOffer = BidOffer(newBidOffer.id, newBidOffer.bid, newBidOffer.content.replace("$price$", newBidOffer.bid.toString))
        } else if (newBidOffer.bid > bidOffer.bid) {
          println("replace new")
          bidOffer = BidOffer(newBidOffer.id, newBidOffer.bid, newBidOffer.content.replace("$price$", newBidOffer.bid.toString))
        } else {
          println("none")
        }
      }
    case resp@HttpResponse(code, _, _, _) =>
      log.info(s"Request failed, response code: $code")
      resp.discardEntityBytes()
  }
}

我在看这个answer,把List[Future]转换成Future[List],但是当我这样做的时候,我创建了一个Future[List[Any]],而不是一个HttpResponse

**下一段代码:**所以我试着按照你说的方法做,但是我创建了一个List[Future[Future[String]]]。如果我只有一个主机来处理请求,这很容易。但是因为我可以有1个、2个或3个请求,所以我创建了一个列表,代码变得很复杂。加上akka-streamrunFold创建了另一个Future。你能给予一个提示,如何按照你说的方法实现它吗?

val responseListFuture: List[Future[Future[String]]] = bidders.map { bidder =>
        HttpRequest( // create the request
          HttpMethods.POST,
          uri = Uri(bidder), // uri = Uri("http://localhost:8081 | 8082 | 8083"),
          entity = HttpEntity(ContentTypes.`application/json`, content)
        )
      }
        .map { httpRequest =>
          http.singleRequest(httpRequest).pipeTo(self) // this creates the first Future
            .map { httpResponse =>
              println(s"response: $httpResponse")
              // this creates the second Future
              httpResponse.entity.dataBytes.runFold(ByteString(""))(_ ++ _).map { body =>
                println("Got response, body: " + body.utf8String)
                // BidOfferConverter.getBidOffer(body.utf8String)
                body.utf8String
              }
            }
        }
zd287kbt

zd287kbt1#

简短的回答是,你不能,除非在receive中阻塞,这是一个主要的禁忌。
这就像是一个X:Y问题。这里的实际目标是什么?是不是你不希望在所有请求都完成之前发送响应?
如果这就是你想要的,那么你可以将map a future转换成一条消息,其中包含了你构建回应所需的信息。这样做的话,你甚至不需要bidOffer变量。
Future.sequence会将Seq[Future[A]](以及其他集合类型)折叠为Future[Seq[A]](如果任何future失败,则失败:在这种情况下,Future伴随对象中的其他组合子可能更符合您的要求)。

5vf7fwbs

5vf7fwbs2#

我必须让它运行。我也使用TryOptiongetOrElse以防服务器停机。所以我仍然发送一个HttpResponse回来。我会把答案留在这里只是为了完整性。如果有人有更好的方法来做,我很乐意重新考虑它。

class AuctionClientActor(bidders: List[String])
  extends Actor with ActorLogging
    with BidJsonProtocol with SprayJsonSupport {

  import context.dispatcher

  implicit val system = context.system
  val http = Http(system)

  def receive = {
    case bidRequest@BidRequest(requestId, bid) =>
      log.info(s"received bid request: $bidRequest")
      val content = bidRequest.bid.toJson.toString
        .replace("[[", "{")
        .replace("]]", "}")
        .replace("\",\"", "\": \"")
        .replace("[", "")
        .replace("]", "")

      val responseListFuture = bidders.map { bidder =>
        HttpRequest( // create the request
          HttpMethods.POST,
          uri = Uri(bidder),
          entity = HttpEntity(ContentTypes.`application/json`, content)
        )
      }
        .map { httpRequest =>
          val httpResponseFuture = http.singleRequest(httpRequest).pipeTo(self) // this creates the first Future[HttpResponse]
          Await.ready(httpResponseFuture, 5 seconds)
          httpResponseFuture.value.get.getOrElse(HttpResponse(StatusCodes.NotFound))
        }.filter(httpResponse => httpResponse.status == StatusCodes.OK)
        .map { httpResponse =>
          println(s"response: $httpResponse")
          val bidOfferFuture = httpResponse.entity.dataBytes.runFold(ByteString(""))(_ ++ _).map { body =>
            println("Got response, body: " + body.utf8String)
            BidOfferConverter.getBidOffer(body.utf8String)
          }
          Await.ready(bidOfferFuture, 5 seconds)
          bidOfferFuture.value.get.getOrElse(BidOffer("", 0, ""))
        }
      responseListFuture.foreach { bidOffer =>
        println(s"bidOffer: ${bidOffer.id}, ${bidOffer.bid}, ${bidOffer.content}")
      }
      val bidOfferWinner = responseListFuture.maxBy(_.bid)
      println(s"winner: $bidOfferWinner")
      sender() ! Some(bidOfferWinner.content)
  }
}

相关问题