Akka类变量范围问题

r7xajy2e  于 2022-11-06  发布在  其他
关注(0)|答案(2)|浏览(109)

如何在MasterActor类中的receive函数之外将mapActors变量设置为全局变量?
f1f2函数从MapReduceApp发送到MastorActor。
另一种方法是在MastorActor中的receive函数之前创建mapActor。然后当从MapReduceApp接收f1/f2函数时,可以将f1f2分别发送到mapActor和reduceActor。但如何实现这一点呢?将f1f2函数传递到mapActor和reduceActor似乎是一个令人头痛的问题。

class MasterActor extends Actor {
  var reduceActors = List[ActorRef]()
  var mapActors: ActorRef = ???
  def receive = {
    case SetMapperReducer(f1, f2, data) =>
      for (i <- 0 until 4) {
        reduceActors = context.actorOf(
          Props(classOf[ReduceActor], f2), name = "reduce" + i
        ) :: reduceActors
      }
      mapActors = context.actorOf(
        RoundRobinPool(4).props(
          Props(classOf[MapActor], reduceActors, f1)
        )
      )
    case Flush =>
      mapActors ! Broadcast(Flush)
}

Map和归约两个类。
Map执行元:

class MapActor(
  reduceActors: List[ActorRef],
  mf: Any => List[Tuple2[Any,Any]]
) extends Actor {
  def receive = {
    case MapData(data: Any) =>
      println(data)
    case Flush =>
      println("Got flush")
  }
}

class ReduceActor(rf: Any => List[Tuple2[Any,Any]]) extends Actor {
  def receive = {
    case Data(data: Any) =>
      println(data)
    case Flush =>
      println("Got flush")
}

还有一个主程序:

object MapReduceApp extends App {
  val system = ActorSystem("TestApp")
  val master = system.actorOf(Props[MasterActor](), name = "master")
  val data = List(
    ("Episode 1", "Once upon a time, in a land far, far away...")
  )
  master ! SetMapperReducer(mapProcess1 _, reduceProcess1 _, data)
}
krcsximq

krcsximq1#

在Actors中不要使用var,使用context.become来处理状态。

class MasterActor extends Actor {
  def receive = receiveMain(Nil, None)

  def receiveMain(
      reduceActors: List[ActorRef],
      mapActor: Option[ActorRef],
  ): Receive = {
    case SetMapperReducer(f1, f2, data) =>
      val reduceActors =
        (0 until 4).map { i =>
          context.actorOf(
            Props(classOf[ReduceActor], f2),
            name = "reduce" + i
          )
        }.toList
      val mapActors = context.actorOf(
        RoundRobinPool(4).props(
          Props(classOf[MapActor], reduceActors, f1)
        )
      )

      context.become(reduceActors, Option(mapActors))

    case Flush =>
      mapActors.foreach(_ ! Broadcast(Flush))
  }
}
c0vxltue

c0vxltue2#

首先为mapActors变量创建一个TestActor类似乎可以达到这个目的,然后可以在case中对其进行修改。
主要演员:

...
  var mapActors = context.actorOf(
      RoundRobinPool(numberMappers).props(
        Props(classOf[TestActor])
      )
    )
  def receive = {
    case SetMapperReducer(f1, f2, data) =>
    ...
  }
  ...

然后就创建临时TestActor类:

class TestActor() extends Actor {
  def receive = {
    case Flush =>
      println("hi")
  }
}

相关问题