没有Akka,如何实现演员模型?

weylhg0b  于 2022-12-04  发布在  其他
关注(0)|答案(2)|浏览(157)

如何实现没有Akka的简单演员?我不需要高性能的许多(非固定计数)演员示例,绿色线程,IoC(生命周期,基于属性的工厂,ActorRef的),监督,背压等。只需要顺序性(队列)+处理程序+状态+消息传递。
作为一个副作用,我实际上需要一个基于演员的小管道(使用递归链接)+一些并行操作器来优化DSP算法计算。它将位于库内,没有传递依赖关系,因此我不希望(因为它是一个jar插件,所以不能)来推动用户创建和传递akkaSystem,库应该具有尽可能简单和轻量级接口。(功能集),而不是一个框架--所以它的算法复杂度比结构复杂度高。然而,我认为actor是描述协议的一个很好的工具,我实际上可以将算法分解为少量的异步交互实体,因此它符合我的需要。

  • 为什么不是 akka *

akka 很重,这意味着:

  • 它是一种外部依赖;
  • 接口和实现复杂;
  • 对库用户不透明,例如-所有示例都由akka的IoC管理,因此无法保证同一示例始终维护一个逻辑角色,重启将创建一个新的;
  • 这与Scala的迁移支持本身相当。
  • 使用jstack/jconsole/jvisualvm调试akka的绿色线程也可能更难,因为一个参与者可能会对任何线程进行操作。

当然, akka 的jar(1.9Mb)和内存消耗(每GB 250万个演员)一点都不重,所以你甚至可以在Android上运行它。但大家也知道,你应该使用专门的工具来观察和分析演员(如类型安全激活器/控制台),用户可能不熟悉(我不会强迫他们去学习它)对于企业项目来说,这一切都很好,因为它几乎总是有IoC,一些专门的工具集和持续的迁移,但对于简单库来说,这不是一种好方法。
关于依赖关系。我没有依赖关系,也不想添加任何依赖关系(我甚至避免使用scalaz,它实际上有点适合这里),因为它会导致繁重的维护--我必须让我的简单库与Akka保持最新。

km0tfn4u

km0tfn4u1#

下面是JVM世界中最小和最高效的参与者,其API基于Viktor Klang的最小Scala参与者:https://github.com/plokhotnyuk/actors/blob/41eea0277530f86e4f9557b451c7e34345557ce3/src/test/scala/com/github/gist/viktorklang/Actor.scala
它在使用上方便且安全,但在消息接收方面不是类型安全的,并且不能在进程或主机之间发送消息。
主要特点:

  • 最简单的类似FSM的API,只有3种状态(StayBecomeDie):https://github.com/plokhotnyuk/actors/blob/41eea0277530f86e4f9557b451c7e34345557ce3/src/test/scala/com/github/gist/viktorklang/Actor.scala#L28-L30
  • 最低限度的错误处理-正确地转发到执行器线程的默认异常处理程序:https://github.com/plokhotnyuk/actors/blob/41eea0277530f86e4f9557b451c7e34345557ce3/src/test/scala/com/github/gist/viktorklang/Actor.scala#L52-L53
  • 快速异步初始化,需要约200 ns完成,因此不需要额外的future/actors来完成耗时的actors初始化:https://github.com/plokhotnyuk/actors/blob/41eea0277530f86e4f9557b451c7e34345557ce3/out0.txt#L447
  • 最小的内存占用,即被动状态下约40字节(顺便说一句,new String()在JVM堆中占用相同数量的字节):https://github.com/plokhotnyuk/actors/blob/41eea0277530f86e4f9557b451c7e34345557ce3/out0.txt#L449
  • 消息处理效率非常高,4核CPU的吞吐量约为90 M msg/sec:https://github.com/plokhotnyuk/actors/blob/41eea0277530f86e4f9557b451c7e34345557ce3/out0.txt#L466
  • 消息发送/接收效率非常高,延迟约为100 ns:https://github.com/plokhotnyuk/actors/blob/41eea0277530f86e4f9557b451c7e34345557ce3/out0.txt#L472
  • 每个参与者通过批处理参数调整公平性:https://github.com/plokhotnyuk/actors/blob/41eea0277530f86e4f9557b451c7e34345557ce3/src/test/scala/com/github/gist/viktorklang/Actor.scala#L32

状态计数器示例:

def process(self: Address, msg: Any, state: Int): Effect = if (state > 0) { 
     println(msg + " " + state)
     self ! msg
     Become { msg => 
        process(self, msg, state - 1)
     }
  } else Die

  val actor = Actor(self => msg => process(self, msg, 5))

结果:

scala> actor ! "a"
a 5

scala> a 4
a 3
a 2
a 1
wmomyfyw

wmomyfyw2#

这将使用FixedThreadPool(以及其内部任务队列):

import scala.concurrent._

trait Actor[T] {
  implicit val context = ExecutionContext.fromExecutor(java.util.concurrent.Executors.newFixedThreadPool(1))
  def receive: T => Unit
  def !(m: T) = Future { receive(m) }
}

这里,大小为1的FixedThreadPool保证了顺序性。当然,如果你需要100500个动态创建的参与者,这不是管理线程的最佳方式,但如果你需要每个应用程序有一些固定数量的参与者来实现你的协议,这是可以的。
用法:

class Ping(pong: => Actor[Int])  extends Actor[Int] {     
      def receive = {
          case m: Int => 
             println(m)
             if (m > 0) pong ! (m - 1)
      }    
}

object System { 
      lazy val ping: Actor[Int] = new Ping(pong) //be careful with lazy vals mutual links between different systems (objects); that's why people prefer ActorRef
      lazy val pong: Actor[Int] = new Ping(ping)
}

System.ping ! 5

结果:

import scala.concurrent._
defined trait Actor
defined class Ping
defined object System
res17: scala.concurrent.Future[Unit] = scala.concurrent.impl.Promise$DefaultPromise@6be61f2c
5
4
3
2
1
0

scala> System.ping ! 5; System.ping ! 7
5
7
4
6
3
5
2
res19: scala.concurrent.Future[Unit] = scala.concurrent.impl.Promise$DefaultPromise@54b053b1
4
1
3
0
2
1
0

此实现使用两个Java线程,因此比不使用并行化的计数快“两倍”。

相关问题