我已经和Akka一起工作了一段时间,但是现在正在深入探索它的演员系统。我知道有线程轮询执行器和fork join执行器和affinity执行器。我知道调度器是如何工作的,以及所有其他细节。顺便说一句,这个链接给出了一个很好的解释
https://scalac.io/improving-akka-dispatchers
然而,当我用一个简单的调用参与者进行实验并切换执行上下文时,我总是得到大致相同的性能。我同时运行60个请求,平均执行时间大约为800毫秒,只是向调用者返回一个简单的字符串。
我运行的MAC有8个核心(英特尔i7处理器).
下面是我尝试过的执行上下文:
thread-poll {
type = Dispatcher
executor = "thread-pool-executor"
thread-pool-executor {
fixed-pool-size = 32
}
throughput = 10
}
fork-join {
type = Dispatcher
executor = "fork-join-executor"
fork-join-executor {
parallelism-min = 2
parallelism-factor = 1
parallelism-max = 8
}
throughput = 100
}
pinned {
type = Dispatcher
executor = "affinity-pool-executor"
}
所以,问题是:
1.在这个例子中,是否有机会获得更好的性能?
1.参与者示例是怎么回事?如果我们知道调度程序正在调度线程(使用执行上下文),以便在该线程内对来自参与者邮箱的下一条消息执行参与者的receive方法,这有什么关系呢?参与者的receive方法不只是像回调一样吗?参与者示例的数量何时开始起作用?
1.我有一些执行Future的代码,如果我直接从主文件运行该代码,它的执行速度比我把它放在actors中并从actors执行Future时快100-150毫秒,并将其结果传送到发送方。是什么使它变慢了?
如果你有一些真实的世界的例子来解释这一点,那是非常受欢迎的。我读了一些文章,但都是理论上的。如果我在一个简单的例子上尝试一些东西,我会得到一些意想不到的结果,在性能方面。
这里有一个代码
object RedisService {
case class Get(key: String)
case class GetSC(key: String)
}
class RedisService extends Actor {
private val host = config.getString("redis.host")
private val port = config.getInt("redis.port")
var currentConnection = 0
val redis = Redis()
implicit val ec = context.system.dispatchers.lookup("redis.dispatchers.fork-join")
override def receive: Receive = {
case GetSC(key) => {
val sen = sender()
sen ! ""
}
}
}
来电者:
val as = ActorSystem("test")
implicit val ec = as.dispatchers.lookup("redis.dispatchers.fork-join")
val service = as.actorOf(Props(new RedisService()), "redis_service")
var sumTime = 0L
val futures: Seq[Future[Any]] = (0 until 4).flatMap { index =>
terminalIds.map { terminalId =>
val future = getRedisSymbolsAsyncSCActor(terminalId)
val s = System.currentTimeMillis()
future.onComplete {
case Success(r) => {
val duration = System.currentTimeMillis() - s
logger.info(s"got redis symbols async in ${duration} ms: ${r}")
sumTime = sumTime + duration
}
case Failure(ex) => logger.error(s"Failure on getting Redis symbols: ${ex.getMessage}", ex)
}
future
}
}
val f = Future.sequence(futures)
f.onComplete {
case Success(r) => logger.info(s"Mean time: ${sumTime / (4 * terminalIds.size)}")
case Failure(ex) => logger.error(s"error: ${ex.getMessage}")
}
代码非常基本,只是为了测试它的行为。
1条答案
按热度按时间b1zrtrql1#
我不太清楚你到底要问什么,但我会尝试一下。
如果您的调度员(秒)(如果参与者所做的是CPU/内存与IO限制,则为可用内核的实际数量(请注意,虚拟化程度越高,这一点就越模糊)(谢谢,超额预订的主机CPU...)和容器化(谢谢,基于份额和配额的cgroup限制)开始起作用))允许同时处理 m 个参与者,您很少/永远不会有超过 n 个参与者有消息要处理的参与者(m〉n),尝试通过调度器设置增加并行度不会给您带来任何好处。(注意,在前面的描述中,调度器上调度的任何任务,例如
Future
回调,实际上与参与者是同一个东西)。上一段中的 n 显然是应用程序/调度程序中的最多参与者的数目(取决于我们希望在什么范围内观察事物:我会注意到每个超过两个的调度器(一个用于不阻塞的演员和未来,一个用于阻塞的演员和未来)都有更强的气味(如果在Akka 2.5上,这可能是一个不错的想法,适应2.6中关于默认调度器设置的一些变化,并在自己的调度器中运行像远程/集群这样的东西,这样他们就不会被饿死;还要注意,AlpakkaKafka默认使用自己的调度程序:我不会把它们与两个相对立),所以一般来说,更多的参与者意味着更多的并行性意味着更多的内核利用率。相对于线程,参与者相对便宜,所以大量的参与者并不是一个值得关注的大问题。
单例参与者(无论是在节点级还是集群级(在极端情况下,甚至是实体级),都可能会对整体并行性和吞吐量造成很大的限制:一次一条消息的限制可以是非常有效的节流阀(有时这是您想要的,但通常不是).因此,不要害怕创建执行一个高级操作的短期参与者(它们肯定可以处理多条消息),然后停止(注意,这种情况下的许多简单情况可以通过future以稍微更轻量级的方式完成).如果它们与某个外部服务进行交互,使它们成为路由器执行元的子节点,如果现有的子节点都忙碌(等等),则该路由器执行元产生新的子节点,这可能是值得做的:这个路由器是一个单例路由器,但是只要它不花很多时间处理任何消息,它限制系统的可能性就很低。您的
RedisService
可能是这类事情的一个很好的候选者。还要注意,性能和可伸缩性并不总是一回事,提高一个会降低另一个。Akka通常愿意牺牲小规模的性能来减少大规模的性能下降。