我有两个后端示例,分别托管在两个不同的centos服务器上。我想使用Akka Cluster Sharding
来划分每个示例所做的工作:
- 我有4个国家的数据,每10秒由两个后端示例从数据库中检索一次,它们更新一个
Redis
示例。 - 使用
Akka Cluster Sharding
,我尝试动态地划分工作,instance1
获取ES和EN的数据,instance2
获取DE和IT的数据。如果示例1出现故障,示例2将接管作业,并获取ES/EN的数据。
我以为这很简单...但不是。
所有的工作都是由Akka Actor完成的,所以使用集群分片,我认为所有声明的Actor(来自两个示例)都将集中在某个地方,以便可以操纵哪个角色完成任何工作。
在localhost上,一切正常,因为我的应用程序有一个示例,端口为9001
,还有两个集群节点,端口分别为2551
和2552
。
application.conf
"clusterRegistration" {
akka {
actor {
allow-java-serialization = on
provider = cluster
}
remote.artery {
enabled = on
transport = aeron-udp
}
cluster {
jmx.multi-mbeans-in-same-jvm = on
seed-nodes = [
"akka://ClusterService@instance1:8083",
"akka://ClusterService@instance1:2551"
]
}
}
}
等级
object ClusterSharding {
def createNode(hostname: String, port: Int, role: String, props: Props, actorName: String) = {
val config = ConfigFactory.parseString(
s"""
|akka.cluster.roles = ["$role"]
|akka.remote.artery.canonical.hostname = $hostname
|akka.remote.artery.canonical.port = $port
|""".stripMargin
).withFallback(ConfigFactory.load
.getConfig("clusterRegistration"))
val system = ActorSystem("ClusterService", config)
system.actorOf(props, actorName)
}
val master = createNode("instance1", 8083, "master", Props[Master], "master")
createNode("instance1", 2551, "worker", Props[Worker], "worker")
createNode("instance2", 8083, "worker", Props[Worker], "worker")
Future {
while (true) {
master ! Proceed // this will fire an Actor Resolver case
Thread.sleep(5000)
}
}
}
主要演员
class Master extends Actor {
var workers: Map[Address, ActorRef] = Map()
val cluster = Cluster(context.system)
override def preStart(): Unit = {
cluster.subscribe(
self,
initialStateMode = InitialStateAsEvents,
classOf[MemberEvent],
classOf[UnreachableMember]
)
}
override def postStop(): Unit = {
cluster.unsubscribe(self)
}
def receive = handleClusterEvents // cluster events
.orElse(handleWorkerRegistration) // worker registered to cluster
.orElse(handleJob) // give jobs to workers
def handleJob: Receive = {
case Proceed => {
// Here I must be able to use all workers from both instances
// (centos1 and centos2) and give work for each dinamically
if (workers.length == 2) {
worker1 ! List("EN", "ES")
worker2 ! List("DE", "IT")
} else if (workers.length == 1) {
worker ! List("EN", "ES", "DE", "IT")
} else {
execQueries() // if no worker is available, each backend instance will exec queries on his own way
}
}
}
}
这两个示例都使用端口8083(centos 1:instance1:8083
,百分之二:instance2:8083
)。如果我只对application.conf和createNode中的一个示例(例如instance 1)使用设置,我可以在日志中看到创建了worker,但与第二个示例没有通信。
我哪里错了?谢谢
1条答案
按热度按时间2w3rbyxf1#
您配置主机名的方法是可行的。有更好的方法来配置主机名(取决于您部署服务的方式:手动部署与ansible/chef/puppet与docker与kubernetes/nomad/mesos将有所不同),但设置主机名可能不是您的实际问题。
您当前的方法将在每个节点上为您提供一个主节点和两个工作节点,而您实际上并没有使用群集分片(您使用的是Cluster,但是Cluster Sharding是您在Cluster之上选择使用的)。从您发布的代码来看,我强烈怀疑使用Cluster Sharding将需要进行重大的重新设计(虽然没有发布
Worker
和更完整的Master
代码,这很难说)。我会采取的主要方法是让一个特定国家的Redis更新过程由一个分片实体拥有集群单例参与者将每10秒触发一次每个国家的更新过程,因为我们使用分片和单例,所以实际上我可能至少有3个服务示例,或者使用一个非常一致外部租赁系统,(其他解决裂脑问题的策略(注意集群分片和集群单例基本上迫使您解决裂脑问题)都可以归结为一个问题,至少有一半的时间,因为分片意味着进程的参与者可以被任意停止(并且可能在不同的节点上恢复),所以您还需要考虑如何以一种对应用程序有意义的方式恢复进程。
在同一个JVM进程中启动多个
ActorSystem
通常只有在相当特定的情况下才是一个好主意。