Spring Boot 种子节点无法加入akka群集

cvxl0en2  于 2024-01-06  发布在  Spring
关注(0)|答案(1)|浏览(156)

我尝试为我的 Akka Typed ActorSystem 引导(bootstrap)一个 Akka 集群,但由于某种原因,种子节点无法加入集群。
第一个月
我尝试在我的 OCR 管道中使用 Akka 的 actor 模型。我有三个 actor 和一个守护(guardian)actor,它们都定义在同一个 ActorSystem 中。下面是我的 ActorSystem。

  1. package io.tajji.kycpipeline.actor
  2. import akka.actor.typed.ActorSystem
  3. import akka.actor.typed.javadsl.Behaviors
  4. import akka.cluster.typed.Cluster
  5. import io.tajji.apis.kyc.events.*
  6. import io.tajji.kycpipeline.model.Command.*
  7. import io.tajji.kycpipeline.model.Message
  8. import io.tajji.kycpipeline.model.Message.*
  9. import io.tajji.kycpipeline.service.KYCValidationService
  10. import io.tajji.kycpipeline.service.TextExtractionService
  11. import io.tajji.kycpipeline.service.TextParsingService
  12. import org.axonframework.eventhandling.EventBus
  13. import org.axonframework.eventhandling.GenericEventMessage
  14. import org.springframework.stereotype.Component
  15. import reactor.core.publisher.Mono
  /**
   * Spring component that owns the typed Akka [ActorSystem] named `KYCSystem`.
   *
   * The guardian behavior spawns three child actors (validator, text extractor,
   * details parser) and routes every [Message] it receives: requests and
   * intermediate results are forwarded to the next child, while terminal
   * results are published on the Axon [EventBus].
   */
  @Component
  class KYCSystem(
      private val eventBus: EventBus,
      private val textExtractor: TextExtractionService,
      private val detailsParser: TextParsingService,
      private val kycValidator: KYCValidationService
  ) {

      private val system: ActorSystem<Message> = ActorSystem.create(
          Behaviors.setup<Message> { ctx ->
              // Children are spawned once, when the guardian starts.
              val validatorRef = ctx.spawn(KYCValidator.create(kycValidator), "DocumentValidatorActor")
              val extractorRef = ctx.spawn(DocumentExtractor.create(textExtractor), "TextExtractorActor")
              val parserRef = ctx.spawn(DocumentParser.create(detailsParser), "DocumentParserActor")

              Behaviors.receiveMessage { msg ->
                  when (msg) {
                      // Incoming requests: hand the documents to the validator.
                      is LandlordKYC ->
                          validatorRef.tell(
                              ValidateLandlordDocuments(msg.event.accountId, msg.event, ctx.self)
                          )
                      is ResidentKYC ->
                          validatorRef.tell(
                              ValidateResidentDocuments(msg.event.accountId, msg.event, ctx.self)
                          )

                      // Validated documents: hand them to the text extractor.
                      is ValidLandlordDocuments ->
                          extractorRef.tell(
                              ExtractLandlordDetails(
                                  msg.accountId,
                                  msg.idFront,
                                  msg.idBack,
                                  msg.pinCertificateData,
                                  ctx.self
                              )
                          )
                      is ValidResidentDocuments ->
                          extractorRef.tell(
                              ExtractResidentDetails(msg.accountId, msg.idFront, msg.idBack, ctx.self)
                          )

                      // Extracted text: hand it to the parser.
                      is ExtractedLandlordDetails ->
                          parserRef.tell(ParseLandlordDetails(msg.accountId, msg, ctx.self))
                      is ExtractedResidentDetails ->
                          parserRef.tell(ParseResidentDetails(msg.accountId, msg, ctx.self))

                      // Failures reported by the children become KYCFailed events.
                      is NationalIdInvalid ->
                          publishEvent(KYCFailed(msg.accountId, msg.reason))
                      is PinCertificateInvalid ->
                          publishEvent(KYCFailed(msg.accountId, msg.reason))

                      // Parsed details are delivered through subscribe(); the
                      // "passed" event is published from the subscriber callback.
                      is ParsedResidentDetails ->
                          msg.nationalIDData.subscribe { idData ->
                              publishEvent(ResidentKYCPassed(msg.accountId, idData))
                          }
                      is ParsedLandlordDetails -> {
                          val accountId = msg.accountId
                          Mono.zip(msg.nationalIDData, msg.taxData).subscribe { zipped ->
                              publishEvent(LandlordKYCPassed(accountId, zipped.t1, zipped.t2))
                          }
                      }

                      is Invalid ->
                          publishEvent(KYCFailed(msg.accountId, msg.failureReason))
                  }
                  // The guardian keeps the same behavior for every message.
                  Behaviors.same()
              }
          },
          "KYCSystem"
      )

      // Looks up the cluster extension for this system; the reference itself is
      // not used elsewhere in this class. NOTE(review): presumably kept so the
      // extension is initialized at startup - confirm.
      private val kycCluster = Cluster.get(system)

      /** Sends a landlord KYC request into the actor system. */
      fun processLandlordKYC(event: LandlordKYCRequested) = system.tell(LandlordKYC(event))

      /** Sends a resident KYC request into the actor system. */
      fun processResidentKYC(event: ResidentKYCRequested) = system.tell(ResidentKYC(event))

      /** Wraps [event] as an Axon event message and publishes it on [eventBus]. */
      private fun <T : Any> publishEvent(event: T) {
          eventBus.publish(GenericEventMessage.asEventMessage<T>(event))
      }
  }

字符串
下面是我的集群配置文件

  # Cluster configuration as posted in the question.
  akka {
    cluster {
      # NOTE(review): the scraped page showed an e-mail-obfuscation placeholder
      # here; the address is reconstructed from the ActorSystem name ("KYCSystem")
      # and the hostname/port configured below - confirm against the original post.
      seed-nodes = ["akka://KYCSystem@127.0.0.1:25520"]
      shutdown-after-unsuccessful-join-seed-nodes = 20s
      seed-node-timeout = 15s
      log-info-verbose = off
      downing-provider-class = "akka.cluster.sbr.SplitBrainResolverProvider"
    }
    actor {
      provider = cluster
    }
    remote {
      # NOTE(review): these are classic-remoting (netty.tcp) keys. The akka://
      # scheme in seed-nodes suggests Artery is the active transport, in which
      # case these values are presumably not what determines the node's own
      # address - verify against the Akka version in use.
      netty.tcp {
        hostname = "127.0.0.1"
        port = 25520
      }
      artery {
      }
    }
    coordinated-shutdown {
      exit-jvm = on
    }
  }

xj3cbfub

xj3cbfub1#

根据给出的错误日志,

  1. Cluster Node [akka://KYCSystem@127.0.1.1:25520] - Joining of seed-nodes [akka://KYCSystem@127.0.0.1:25520]

字符串
从日志可以看出,你启动的 ActorSystem 的地址是 akka://KYCSystem@127.0.1.1:25520。
因此种子节点配置中必须使用与之完全相同的地址,主机和端口都必须一致。

  # Cluster configuration in which the seed-node entry and the node's own
  # remoting address are identical (same system name, host and port).
  akka {
    cluster {
      # Must match the address the node reports for itself in the
      # "Cluster Node [...]" log line, character for character.
      # NOTE(review): address reconstructed from the ActorSystem name and the
      # hostname/port below; the scraped page showed an obfuscation placeholder.
      seed-nodes = ["akka://KYCSystem@127.0.1.1:25520"]
      shutdown-after-unsuccessful-join-seed-nodes = 20s
      seed-node-timeout = 15s
      log-info-verbose = off
      downing-provider-class = "akka.cluster.sbr.SplitBrainResolverProvider"
    }
    actor {
      provider = cluster
    }
    remote {
      # The akka:// scheme means Artery remoting, which takes its address from
      # artery.canonical.*; the classic netty.tcp keys are not read by Artery.
      # Pinning the canonical address keeps it from falling back to whatever
      # the local hostname happens to resolve to.
      artery {
        canonical {
          hostname = "127.0.1.1"
          port = 25520
        }
      }
    }
    coordinated-shutdown {
      exit-jvm = on
    }
  }

展开查看全部

相关问题