trait Command {
// all commands are required to have an associated IP Address (here represented in string form)
def ipAddr: String
}
// inside, e.g. the guardian actor, and using the actor context to spawn the router as a child
val serviceKey = ServiceKey[Command]("router")
val router = context.spawn(
Routers.group(serviceKey)
.withConsistentHashingRouting(
virtualNodesFactor = 10,
mapping = { msg: Command => msg.ipAddr }
)
// spawn the workers, who will register themselves with the router
val workerBehavior =
Behaviors.setup[Command] { ctx =>
ctx.system.receptionist ! Receptionist.Register(serviceKey, context.self)
Behaviors.receiveMessage { msg =>
??? // TODO
}
}
(1 to 10).foreach { i =>
context.spawn(workerBehavior, s"worker-$i")
}
1条答案
按热度按时间jdgnovmf1#
您只需要使用要使用的哈希函数初始化路由器。
例如(在Scala中,虽然Java API是类似的):
实际上,对于每个注册的工作者,路由器将生成
10
(virtualNodesFactor
)随机数,并将它们与该工作进程关联。然后,路由器将对每个传入消息执行mapping
函数,以获取该消息的字符串密钥,并对该字符串密钥进行散列。如果存在关联随机数小于或等于该散列的工作进程,工作者选择也小于或等于该散列的最大关联随机数;如果散列值恰好小于与任何工作者相关联的每个随机数,则选择具有最大相关联随机数的工作者。请注意,这意味着给定的工作线程可以处理超过1
ipAddr
的消息。请注意,此算法并不能强有力地保证具有相同
ipAddr
的命令将始终转到相同的工作线程,即使它们被路由到的工作线程仍处于活动状态:如果另一个工作者注册并生成了大于前一个工作者的相关令牌的令牌,并且所生成的令牌小于ipAddr
的散列,则该新工作者将有效地从旧工作者窃取该ipAddr
的消息。缺少这种保证意味着,如果您依赖于给定
ipAddr
的所有消息都发送到同一个工作线程来保证正确性,那么您将需要类似于集群分片的东西,这是更高的开销,但是允许保证没有工作者将看到多个ipAddr
的消息,并且(特别是对于持久性)将保证相同的“逻辑参与者”/实体处理相同ipAddr
的消息。