Akka集群分片配置

46scxncf  于 2022-11-05  发布在  其他
关注(0)|答案(1)|浏览(188)

我有两个后端示例,分别托管在两个不同的centos服务器上。我想使用Akka Cluster Sharding来划分每个示例所做的工作:

  • 我有4个国家的数据,每10秒由两个后端示例从数据库中检索一次,它们更新一个Redis示例。
  • 使用Akka Cluster Sharding,我尝试动态地划分工作,instance1获取ES和EN的数据,instance2获取DE和IT的数据。如果示例1出现故障,示例2将接管作业,并获取ES/EN的数据。

我以为这很简单...但不是。
所有的工作都是由Akka Actor完成的,所以使用集群分片,我认为所有声明的Actor(来自两个示例)都将集中在某个地方,以便可以操纵哪个角色完成任何工作。
在localhost上,一切正常,因为我的应用程序有一个示例,端口为9001,还有两个集群节点,端口分别为25512552
application.conf

"clusterRegistration" {
    akka {
      actor {
        allow-java-serialization = on
        provider = cluster
      }
      remote.artery {
        enabled = on
        transport = aeron-udp
      }
      cluster {
        jmx.multi-mbeans-in-same-jvm = on
        seed-nodes = [
            "akka://ClusterService@instance1:8083",
            "akka://ClusterService@instance1:2551"
        ]
      }
    }
}

等级

object ClusterSharding {
  def createNode(hostname: String, port: Int, role: String, props: Props, actorName: String) = {
    val config = ConfigFactory.parseString(
      s"""
         |akka.cluster.roles = ["$role"]
         |akka.remote.artery.canonical.hostname = $hostname
         |akka.remote.artery.canonical.port = $port
         |""".stripMargin
    ).withFallback(ConfigFactory.load
      .getConfig("clusterRegistration"))

    val system = ActorSystem("ClusterService", config)
    system.actorOf(props, actorName)
  }

  val master = createNode("instance1", 8083, "master", Props[Master], "master")
  createNode("instance1", 2551, "worker", Props[Worker], "worker")
  createNode("instance2", 8083, "worker", Props[Worker], "worker")

  Future {
    while (true) {
      master ! Proceed // this will fire an Actor Resolver case
      Thread.sleep(5000)
    }
  }
}

主要演员

class Master extends Actor {
  var workers: Map[Address, ActorRef] = Map()

  val cluster = Cluster(context.system)

  override def preStart(): Unit = {
    cluster.subscribe(
      self,
      initialStateMode = InitialStateAsEvents,
      classOf[MemberEvent],
      classOf[UnreachableMember]
    )
  }

  override def postStop(): Unit = {
    cluster.unsubscribe(self)
  }
  def receive = handleClusterEvents // cluster events
    .orElse(handleWorkerRegistration) // worker registered to cluster
    .orElse(handleJob) // give jobs to workers

  def handleJob: Receive = {
    case Proceed => {
       // Here I must be able to use all workers from both instances 
       // (centos1 and centos2) and give work for each dinamically
         if (workers.length == 2) {
           worker1 ! List("EN", "ES")
           worker2 ! List("DE", "IT")
         } else if (workers.length == 1) {
           worker ! List("EN", "ES", "DE", "IT")
         } else {
            execQueries() // if no worker is available, each backend instance will exec queries on his own way
         }
    }
  }
}

这两个示例都使用端口8083(centos 1:instance1:8083,百分之二:instance2:8083)。如果我只对application.conf和createNode中的一个示例(例如instance 1)使用设置,我可以在日志中看到创建了worker,但与第二个示例没有通信。
我哪里错了?谢谢

2w3rbyxf

2w3rbyxf1#

您配置主机名的方法是可行的。有更好的方法来配置主机名(取决于您部署服务的方式:手动部署与ansible/chef/puppet与docker与kubernetes/nomad/mesos将有所不同),但设置主机名可能不是您的实际问题。
您当前的方法将在每个节点上为您提供一个主节点和两个工作节点,而您实际上并没有使用群集分片(您使用的是Cluster,但是Cluster Sharding是您在Cluster之上选择使用的)。从您发布的代码来看,我强烈怀疑使用Cluster Sharding将需要进行重大的重新设计(虽然没有发布Worker和更完整的Master代码,这很难说)。
我会采取的主要方法是让一个特定国家的Redis更新过程由一个分片实体拥有集群单例参与者将每10秒触发一次每个国家的更新过程,因为我们使用分片和单例,所以实际上我可能至少有3个服务示例,或者使用一个非常一致外部租赁系统,(其他解决裂脑问题的策略(注意集群分片和集群单例基本上迫使您解决裂脑问题)都可以归结为一个问题,至少有一半的时间,因为分片意味着进程的参与者可以被任意停止(并且可能在不同的节点上恢复),所以您还需要考虑如何以一种对应用程序有意义的方式恢复进程。
在同一个JVM进程中启动多个ActorSystem通常只有在相当特定的情况下才是一个好主意。

相关问题