Akka集群接收不到消息

qco9c6ql  于 2023-10-18  发布在  其他
关注(0)|答案(1)|浏览(182)

我尝试使用“集群中的经典分布式发布订阅”的概念从微服务A中的senderActor发送消息我尝试将消息发布到content-topic,但我没有在位于微服务B中的ReceiverActor中接收到它。您是否了解可能导致此问题的原因?
谢谢你,谢谢!
A中的Conf-file

  1. akka {
  2. actor {
  3. provider = "cluster"
  4. }
  5. remote {
  6. artery {
  7. enabled = on
  8. canonical {
  9. hostname = "localhost"
  10. port = 2551
  11. }
  12. }
  13. }
  14. cluster {
  15. seed-nodes = [
  16. "akka://cluster-system@localhost:2551"
  17. ]
  18. }
  19. }

SenderActor:

  1. public class SenderActor extends AbstractActor {
  2. ActorRef mediator = DistributedPubSub.get(getContext().system()).mediator();
  3. @Override
  4. public Receive createReceive() {
  5. return receiveBuilder()
  6. .match(
  7. String.class,
  8. in -> {
  9. String out = in.toUpperCase();
  10. mediator.tell(new DistributedPubSubMediator.Publish("content", out), getSelf());
  11. })
  12. .build();
  13. }
  14. }

在微服务A中:

  1. public static void main(String[] args) {
  2. SpringApplication.run(MicroserviceAaApplication.class, args);
  3. Config config = ConfigFactory.load("application.conf");
  4. ActorSystem system = ActorSystem.create("cluster-system", config);
  5. Cluster cluster = Cluster.get(system);
  6. ActorRef publisher = system.actorOf(Props.create(SenderActor.class), "publisher");
  7. publisher.tell("hello from microservice A ", null);
  8. }

B中Conf-file

  1. akka {
  2. actor {
  3. provider = "cluster"
  4. }
  5. remote {
  6. artery {
  7. enabled = on
  8. canonical {
  9. hostname = "localhost"
  10. port = 2552
  11. }
  12. }
  13. }
  14. cluster {
  15. seed-nodes = [
  16. "akka://cluster-system@localhost:2551"
  17. ]
  18. }
  19. }

ReceiverActor类:

  1. public class ReceiverActor extends AbstractActor {
  2. LoggingAdapter log = Logging.getLogger(getContext().system(), this);
  3. public ReceiverActor() {
  4. ActorRef mediator = DistributedPubSub.get(getContext().system()).mediator();
  5. mediator.tell(new DistributedPubSubMediator.Subscribe("content", getSelf()), getSelf());
  6. }
  7. @Override
  8. public Receive createReceive() {
  9. return receiveBuilder()
  10. .match(String.class, msg -> {
  11. log.info("Received message in receiver-actor-b: {}", msg);
  12. })
  13. .match(DistributedPubSubMediator.SubscribeAck.class,
  14. msg -> log.info("SubscribeAck: {}", msg))
  15. .build();
  16. }
  17. }

微服务B中的主服务:

  1. public static void main(String[] args) {
  2. SpringApplication.run(MicroserviceBbApplication.class, args);
  3. Config config = ConfigFactory.load("application.conf");
  4. ActorSystem system = ActorSystem.create("cluster-system", config);
  5. Cluster cluster = Cluster.get(system);
  6. ActorRef subscriber = system.actorOf(Props.create(ReceiverActor.class), "subscriber");
  7. }

我尝试获取消息“来自微服务A的hello“,但我只得到B中的流日志:

  1. 2023-08-20T15:53:14.067+02:00 INFO 7788 --- [ main] c.e.M.MicroserviceBbApplication : Started MicroserviceBbApplication in 0.909 seconds (process running for 1.723)
  2. 2023-08-20T15:53:14.552+02:00 INFO 7788 --- [lt-dispatcher-5] akka.event.slf4j.Slf4jLogger : Slf4jLogger started
  3. 2023-08-20T15:53:14.900+02:00 INFO 7788 --- [lt-dispatcher-5] akka.remote.artery.ArteryTransport : Remoting started with transport [Artery tcp]; listening on address [akka://cluster-system@localhost:2552] with UID [6730186213375081245]
  4. 2023-08-20T15:53:14.925+02:00 INFO 7788 --- [lt-dispatcher-5] akka.cluster.Cluster : Cluster Node [akka://cluster-system@localhost:2552] - Starting up, Akka version [2.8.3] ...
  5. 2023-08-20T15:53:14.980+02:00 INFO 7788 --- [lt-dispatcher-5] akka.cluster.Cluster : Cluster Node [akka://cluster-system@localhost:2552] - Registered cluster JMX MBean [akka:type=Cluster]
  6. 2023-08-20T15:53:14.981+02:00 INFO 7788 --- [lt-dispatcher-5] akka.cluster.Cluster : Cluster Node [akka://cluster-system@localhost:2552] - Started up successfully
  7. 2023-08-20T15:53:15.006+02:00 INFO 7788 --- [t-dispatcher-13] akka.cluster.Cluster : Cluster Node [akka://cluster-system@localhost:2552] - No downing-provider-class configured, manual cluster downing required, see https://doc.akka.io/docs/akka/current/typed/cluster.html#downing
  8. 2023-08-20T15:53:15.624+02:00 INFO 7788 --- [t-dispatcher-13] akka.cluster.Cluster : Cluster Node [akka://cluster-system@localhost:2552] - Received InitJoinAck message from [Actor[akka://cluster-system@localhost:2551/system/cluster/core/daemon#1745370250]] to [akka://cluster-system@localhost:2552]
  9. 2023-08-20T15:53:15.675+02:00 INFO 7788 --- [t-dispatcher-13] akka.cluster.Cluster : Cluster Node [akka://cluster-system@localhost:2552] - Welcome from [akka://cluster-system@localhost:2551]
  10. 2023-08-20T15:53:16.077+02:00 INFO 7788 --- [lt-dispatcher-5] c.example.MicroserviceBB.ReceiverActor : SubscribeAck: SubscribeAck(Subscribe(content,None,Actor[akka://cluster-system/user/subscriber1#-339648796]))
  11. 2023-08-20T16:24:19.549+02:00 INFO 7788 --- [t-dispatcher-40] akka.actor.CoordinatedShutdown : Running CoordinatedShutdown with reason [JvmExitReason]
  12. Process finished with exit code 130
ulmd4ohb

ulmd4ohb1#

重要的是要记住,分布式PubSub只会将消息传递给在中介器接收到发布消息时碰巧订阅的订阅者(我简化了一点,但这是直觉)。
在微服务A的main中,您开始集群形成(形成集群的过程发生在后台),然后立即生成SenderActor并向其发送消息,该消息被转发到中介器。还要注意的是,集群可以只由一个节点组成(因为微服务A将自己定义为种子节点),所以在微服务B中的ReceiverActor订阅之前,您将消息发送到中介器;结果,该消息被丢弃。
在这种情况下,在微服务A中有一个订阅不同pubsub主题的参与者可能会很有用; ReceiverActor发布到它订阅了"content"主题的主题,这会导致微服务A中的订阅参与者tellSenderActor,它可以向该主题发布消息。
顺便说一下,目前还不清楚微服务A和B是同一个微服务的示例,还是真正不同的微服务。如果是后者,则不建议使用Akka Cluster进行微服务之间的通信。

相关问题