akka 如何从接收器发出消息并将它们传递给另一个函数?

vbopmzt1  于 2022-11-06  发布在  其他
关注(0)|答案(1)|浏览(131)

我目前正在使用Akka-HTTP构建一个客户端WebSockets消费者。我不想尝试在Sink中进行解析,而是想将代码 Package 在一个函数中,该函数从Sink发出结果,然后使用该输出(来自函数)进行进一步处理(更多解析......等)。
我目前能够打印来自Sink的每一条消息;但是,函数的返回类型仍然是Unit。我的目标是从函数中发出一个String,用于接收器中的每个项,然后使用返回的字符串进行进一步的解析。我有目前为止的代码(注意:它主要是锅炉板)。

  1. import java.util.concurrent.atomic.AtomicInteger
  2. import akka.Done
  3. import akka.actor.ActorSystem
  4. import akka.http.scaladsl.Http
  5. import akka.http.scaladsl.model.StatusCodes
  6. import akka.http.scaladsl.model.ws.{Message, TextMessage, WebSocketRequest, WebSocketUpgradeResponse}
  7. import akka.http.scaladsl.settings.ClientConnectionSettings
  8. import akka.stream.Materializer
  9. import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
  10. import akka.util.ByteString
  11. import scala.concurrent.ExecutionContext.Implicits.global
  12. import scala.concurrent.Future
  13. import scala.util.{Failure, Success, Try}
  14. object client extends App {
  15. def parseData(uri: String)(implicit system: ActorSystem, materializer: Materializer): Unit = {
  16. val defaultSettings = ClientConnectionSettings(system)
  17. val pingCounter = new AtomicInteger()
  18. val customWebsocketSettings = defaultSettings.websocketSettings.withPeriodicKeepAliveData(
  19. () => ByteString(s"debug-${pingCounter.incrementAndGet()}")
  20. )
  21. val customSettings = defaultSettings.withWebsocketSettings(customWebsocketSettings)
  22. val outgoing = Source.maybe[Message]
  23. val sink: Sink[Message, Future[Done]] = Sink.foreach[Message] {
  24. case message: TextMessage.Strict => message.text // I Want to emit/stream this message as a String from the function (or get a handle on it from the outside)
  25. case _ => println("Other")
  26. }
  27. val webSocketFlow: Flow[Message, Message, Future[WebSocketUpgradeResponse]] =
  28. Http().webSocketClientFlow(WebSocketRequest(uri), settings = customSettings)
  29. val (upgradeResponse, closed) =
  30. outgoing
  31. .viaMat(webSocketFlow)(Keep.right)
  32. .toMat(sink)(Keep.both)
  33. .run()
  34. val connected = upgradeResponse.flatMap { upgrade =>
  35. if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
  36. Future.successful(Done)
  37. } else {
  38. throw new RuntimeException(
  39. s"Connection failed: ${upgrade.response.status}"
  40. )
  41. }
  42. }
  43. connected.onComplete {
  44. case Success(value) => value
  45. case Failure(exception) => throw exception
  46. }
  47. closed.onComplete { _ =>
  48. println("Retrying...")
  49. parseData(uri)
  50. }
  51. upgradeResponse.onComplete {
  52. case Success(value) => println(value)
  53. case Failure(exception) => throw exception
  54. }
  55. }
  56. }

在一个单独的对象中,我想做解析,比如:

  1. import akka.actor.ActorSystem
  2. import akka.stream.Materializer
  3. import api.client.parseData
  4. object Application extends App {
  5. implicit val system: ActorSystem = ActorSystem()
  6. implicit val materializer: Materializer = Materializer(system)
  7. val uri = "ws://localhost:8080/foobar"
  8. val res = parseData(uri) // I want to handle the function output here
  9. // parse(res)
  10. println(res)

有没有方法可以从函数外部获得接收器的句柄,或者我需要在接收器中进行任何解析。我主要是试图不使接收器过于复杂。
更新:我还在考虑,向流中添加另一个Flow元素(它处理解析)是否比从流外部获取值更好。

unftdfkk

unftdfkk1#

添加一个流元素似乎可以解决您的问题,同时又完全符合习惯。
您必须记住的是,sink语义旨在描述如何“终止”流,因此,尽管它可以描述非常复杂的计算,但它将始终返回一个值,该值仅在流结束时返回。
换句话说,接收器并不是针对每个流元素返回一个值,而是针对每个整个流返回一个值。

相关问题