从Akka演员列表中异步收集并编写响应

sr4lhrrt  于 2022-11-06  发布在  其他
关注(0)|答案(1)|浏览(151)

我有一个名为Gate的Akka Actor,它用OpenClosed的响应消息来回答Status消息:

"A stateless gate" must {
    "be open" in {
      val parent = TestProbe()
      val gate = parent.childActorOf(
        TestStatelessGate.props(7)
      )
      gate ! 7
      gate ! Gate.Status
      parent.expectMsg(Gate.Open)
    }

我想做的是构造一个逻辑AND门,它查询一个门列表,如果它们都是打开的,则返回Open

"A logical AND gate" must {
    "be open when all children are open" in {
      val parent = TestProbe()
      val parent2 = TestProbe()
      val gate_1 = parent.childActorOf(
        TestStatelessGate.props(7)
      )
      val gate_2 = parent.childActorOf(
        TestStatelessGate.props(5)
      )
      val gate_list = List(gate_1, gate_2)
      val and_gate = parent2.childActorOf(
        LogicalAndGate.props(gate_list)
      )
      gate_1 ! 7
      gate_2 ! 5
      and_gate ! Gate.Status
      parent2.expectMsg(Gate.Open)

Scala文档中有一些关于使用for表达式和pipe的内容。该文档的相关部分是:

final case class Result(x: Int, s: String, d: Double)
case object Request

implicit val timeout = Timeout(5 seconds) // needed for `?` below

val f: Future[Result] =
  for {
    x <- ask(actorA, Request).mapTo[Int] // call pattern directly
    s <- actorB.ask(Request).mapTo[String] // call by implicit conversion
    d <- (actorC ? Request).mapTo[Double] // call by symbolic name
  } yield Result(x, s, d)

f.pipeTo(actorD

我在尝试使用ActorRef列表执行类似的操作时遇到了问题(下面的代码中为gate_list):

override def receive: Receive = {
    case Status => {
      val futures: Seq[Future[Any]] =
        for (g <- gate_list)
          yield ask(g, Status)
      val all_open: Future[Boolean] = Future {
        !futures.contains(Closed)
        }
      pipe(all_open) to parent
    }
  }

当然,这是行不通的,因为futures.contains(Closed)比较的是两种不同类型的东西,一个Future[Any]和我的case对象。

t5fffqht

t5fffqht1#

我假设OpenClosedcase object的值,它们继承了OpenClosed的一些公共特性。
首先,你需要使用mapToask的结果转换为OpenClosed。我也会使用map而不是for

val futures: Seq[Future[OpenClosed]] =
  gate_list.map(g => ask(g, Status).mapTo[OpenClosed])

然后,您需要Future.sequence来等待所有这些操作完成:

Future.sequence(futures).onComplete {
  case Success(res) =>
    parent ! res.forall(_ == Open)
  case Failure(_) =>
    parent ! Closed
}

相关问题