Akka: messages are not sent to or received from a WebSocket by the web client

Posted by q3aa0525 on 2022-11-06 in Other

In a Play application, I created a WebSocket server:

package controllers

import play.api.mvc._
import play.api.libs.streams.ActorFlow
import javax.inject.Inject
import akka.actor.ActorSystem
import akka.stream.Materializer

class Application @Inject() (cc: ControllerComponents)(implicit system: ActorSystem, mat: Materializer)
  extends AbstractController(cc) {
  def socket = WebSocket.accept[String, String] { request =>
    ActorFlow.actorRef { out =>
      MyWebSocketActor.props(out)
    }
  }
}

import akka.actor._

object MyWebSocketActor {
  def props(out: ActorRef) = Props(new MyWebSocketActor(out))
}

class MyWebSocketActor(out: ActorRef) extends Actor {
  def receive = {
    case msg: String =>
      out ! ("I received your message: " + msg)
  }
}

I updated the routes file:

GET      /ws                                   controllers.Application.socket

Following https://doc.akka.io/docs/akka-http/current/client-side/websocket-support.html
I defined the WebSocket client flow:

import akka.actor.ActorSystem
import akka.Done
import akka.http.scaladsl.Http
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.ws._

import scala.concurrent.Future

object WebSocketClientFlow {

  def main(args: Array[String]) = {
    implicit val system = ActorSystem()
    implicit val materializer = ActorMaterializer()
    import system.dispatcher

    // Future[Done] is the materialized value of Sink.foreach,
    // emitted when the stream completes
    val incoming: Sink[Message, Future[Done]] =
    Sink.foreach[Message] {
      case message: TextMessage.Strict =>
        println(message.text)
    }

    // send this as a message over the WebSocket
    val outgoing = Source.single(TextMessage("hello world!"))

    // flow to use (note: not re-usable!)
    val webSocketFlow = Http().webSocketClientFlow(WebSocketRequest("ws://localhost:9000/ws"))

    // the materialized value is a tuple with
    // upgradeResponse is a Future[WebSocketUpgradeResponse] that
    // completes or fails when the connection succeeds or fails
    // and closed is a Future[Done] with the stream completion from the incoming sink
    val (upgradeResponse, closed) =
    outgoing
      .viaMat(webSocketFlow)(Keep.right) // keep the materialized Future[WebSocketUpgradeResponse]
      .toMat(incoming)(Keep.both) // also keep the Future[Done]
      .run()

    // just like a regular http request we can access response status which is available via upgrade.response.status
    // status code 101 (Switching Protocols) indicates that server support WebSockets
    val connected = upgradeResponse.flatMap { upgrade =>
      if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
        Future.successful(Done)
      } else {
        throw new RuntimeException(s"Connection failed: ${upgrade.response.status}")
      }
    }

    // in a real application you would not side effect here
    connected.onComplete(println)
    closed.foreach(_ => println("closed"))
  }
}

Running WebSocketClientFlow prints the following:

Success(Done)
closed

This corresponds to these lines being executed:

connected.onComplete(println)
closed.foreach(_ => println("closed"))

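So the socket appears to have been created and accessed successfully.
However, MyWebSocketActor should reply with the message it received, and that reply is never printed: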

def receive = {
    case msg: String =>
      out ! ("I received your message: " + msg)
  }

The "hello world!" message is sent in WebSocketClientFlow:

val outgoing = Source.single(TextMessage("hello world!"))

How can I print the sent message in the socket server and print the response in the client?
The output sent to the client is defined in MyWebSocketActor:

out ! ("I received your message: " + msg)

Update:
The receive function is:

def receive = {
    case msg: String =>
      out ! ("I received your message: " + msg)
  }

However, the message passed to out does not appear to be sent to, or received by, the socket client.
I tried changing the response sent to the client from

out ! ("I received your message: " + msg)

to

out ! TextMessage("hello world!")

which should invoke:

Sink.foreach[Message] {
  case message: TextMessage.Strict =>
    println(message.text)
}

But println(message.text) is never called, so the message is apparently not received.

Answer #1 (mrwjdhj3)

Posting an answer in case it is useful to other developers. If anything can be improved, please comment.
Exposing the socket:

package controllers

import play.api.mvc._
import play.api.libs.streams.ActorFlow
import javax.inject.Inject
import akka.actor.ActorSystem
import akka.http.scaladsl.model.ws.TextMessage
import akka.stream.Materializer
import akka.actor._

class Application @Inject()(cc: ControllerComponents)(implicit system: ActorSystem, mat: Materializer)
  extends AbstractController(cc) {
  def socket = WebSocket.accept[String, String] { request =>
    ActorFlow.actorRef { out =>
      MyWebSocketActor.props(out)
    }
  }
}

object MyWebSocketActor {

  var counter = 0

  def props(out: ActorRef) = Props(new MyWebSocketActor(out))
}

class MyWebSocketActor(out: ActorRef) extends Actor {

  def receive = {
    case msg: String =>
      MyWebSocketActor.counter = MyWebSocketActor.counter + 1
      print("received message: " + msg + "," + MyWebSocketActor.counter)
      out ! "hello world! "
  }
}
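
For a simple reply like this one, an actor is not strictly required: Play's WebSocket.accept also takes an Akka Streams Flow directly. The following is a minimal sketch, not part of the original answer. The names EchoController, echo and EchoReply are hypothetical, and the class would need its own route entry and should live in whatever package that route references.

```scala
import javax.inject.Inject
import akka.stream.scaladsl.Flow
import play.api.mvc._

// Hypothetical helper: keeps the reply format in a pure function
// so it can be checked without a running server.
object EchoReply {
  def reply(msg: String): String = "I received your message: " + msg
}

// Hypothetical controller: answers every incoming text frame
// with the reply text used in the question.
class EchoController @Inject() (cc: ControllerComponents)
  extends AbstractController(cc) {

  def echo: WebSocket = WebSocket.accept[String, String] { _ =>
    Flow[String].map(EchoReply.reply)
  }
}
```

The actor-based version remains the better fit when the server needs per-connection state or has to push messages that are not direct replies.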

Testing the socket:

import akka.actor.{ActorRef, ActorSystem}
import akka.{Done, NotUsed}
import akka.http.scaladsl.Http
import akka.stream.{ActorMaterializer, OverflowStrategy}
import akka.stream.scaladsl._
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.ws._

import scala.concurrent.Future

object WebSocketClientFlow {
  def main(args: Array[String]) = {

    implicit val system = ActorSystem()
    implicit val materializer = ActorMaterializer()
    import system.dispatcher

    val req = WebSocketRequest(uri = "ws://localhost:9000/ws")
    val webSocketFlow = Http().webSocketClientFlow(req)

    val messageSource: Source[Message, ActorRef] =
      Source.actorRef[TextMessage.Strict](bufferSize = 10, OverflowStrategy.fail)

    val messageSink: Sink[Message, NotUsed] =
      Flow[Message]
        .map(message => println(s"Received text message: [$message]"))
        .to(Sink.ignore)

    val ((ws, upgradeResponse), closed) =
      messageSource
        .viaMat(webSocketFlow)(Keep.both)
        .toMat(messageSink)(Keep.both)
        .run()

    val connected = upgradeResponse.flatMap { upgrade =>
      if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
        Future.successful(Done)
      } else {
        throw new RuntimeException(s"Connection failed: ${upgrade.response.status}")
      }
    }

    ws ! TextMessage.Strict("Hello World")
    ws ! TextMessage.Strict("Hi")
    ws ! TextMessage.Strict("Yay!")

  }
}
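
One detail about the sinks: the sink in the question matches only TextMessage.Strict, so any other message type would fail the stream with a MatchError, while the sink above prints the whole Message object rather than its text. A minimal sketch of a sink that extracts the text from both strict and streamed text messages is given below. MessageText, textOf and printingSink are hypothetical names, not part of the original code.

```scala
import akka.NotUsed
import akka.http.scaladsl.model.ws._
import akka.stream.Materializer
import akka.stream.scaladsl._

import scala.concurrent.Future

object MessageText {

  // Returns the full text of a text message (strict or streamed).
  // Binary messages are drained and yield None.
  def textOf(message: Message)(implicit mat: Materializer): Future[Option[String]] =
    message match {
      case TextMessage.Strict(text) =>
        Future.successful(Some(text))
      case tm: TextMessage =>
        // Concatenate all chunks of a streamed text message.
        tm.textStream.runFold("")(_ + _).map(Some(_))(mat.executionContext)
      case bm: BinaryMessage =>
        // Drain the data so the connection is not back-pressured.
        bm.dataStream.runWith(Sink.ignore)
        Future.successful(None)
    }

  // Prints the text of each incoming text message, one per line.
  def printingSink(implicit mat: Materializer): Sink[Message, NotUsed] =
    Flow[Message]
      .mapAsync(1)(m => textOf(m))
      .collect { case Some(text) => text }
      .to(Sink.foreach[String](println))
}
```

With this in place, messageSink in the client above could be replaced by MessageText.printingSink, assuming the implicit materializer is in scope.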

On the Play2Run console, the following is printed:

[info] play.api.Play - Application started (Dev) (no global state)
received message: Hello World,1received message: Hi,2received message: Yay!,3

On the WebSocketClientFlow console, the following is printed:

Received text message: [TextMessage.Strict(hello world! )]
Received text message: [TextMessage.Strict(hello world! )]
Received text message: [TextMessage.Strict(hello world! )]
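
A note on why the client in the question printed "closed" without ever printing a reply: the Akka HTTP client-side WebSocket documentation states that half-closed connections are not supported, so when either direction of the stream completes, the whole connection is closed. Source.single completes right after emitting its one element, which is most likely why the connection was torn down before the reply arrived. Source.actorRef, used above, does not complete on its own, so the connection stays open.

If an actor-backed source is not needed, the same documentation suggests concatenating Source.maybe to keep the outgoing side open. A minimal sketch under that assumption, using the same ws://localhost:9000/ws endpoint (the object name KeepOpenClient and the 5 second delay are illustrative only):

```scala
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.ws._
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._

import scala.concurrent.Promise
import scala.concurrent.duration._

object KeepOpenClient {

  // Emits one message, then stays open until the materialized
  // promise is completed.
  val outgoing: Source[Message, Promise[Option[Message]]] =
    Source.single[Message](TextMessage("hello world!"))
      .concatMat(Source.maybe[Message])(Keep.right)

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem()
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    import system.dispatcher

    val incoming = Sink.foreach[Message] {
      case message: TextMessage.Strict => println(message.text)
      case _ => // other message types are ignored in this sketch
    }

    val webSocketFlow =
      Http().webSocketClientFlow(WebSocketRequest("ws://localhost:9000/ws"))

    // Keep the promise (to close later) and the sink's completion future.
    val (keepOpen, closed) =
      outgoing
        .viaMat(webSocketFlow)(Keep.left)
        .toMat(incoming)(Keep.both)
        .run()

    // Illustrative only: complete the outgoing side after 5 seconds,
    // which closes the connection.
    system.scheduler.scheduleOnce(5.seconds) {
      keepOpen.trySuccess(None)
      ()
    }
    closed.onComplete(_ => system.terminate())
  }
}
```

Completing the promise with None ends the outgoing stream without sending a further message, at which point the connection closes and closed completes.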
