我正在学习akka框架,我正在尝试关闭一个来自服务器的WebSocket连接。
fun main() {
val system = ActorSystem.create("system")
val materializer = Materializer.createMaterializer(system)
val http = Http.get(system)
val routeFlow = path("ws") {
get {
parameter("as") { sys ->
parameter("id") { id ->
parameter("v") { v ->
handleWebSocketMessages(mainFlow(sys, id, v))
}
}
}
}
}.flow(system, materializer)
http.newServerAt("0.0.0.0", 8080).withMaterializer(materializer).bindFlow(routeFlow).thenRun {
println("Web socket server is running at localhost:8080")
}
}
fun mainFlow(sys: String, id: String, v: String): Flow<Message, Message, NotUsed> {
val source = Source.actorRef<Message>(
{ Optional.empty() },
{ Optional.empty() },
100,
OverflowStrategy.fail())
.mapMaterializedValue {
it.tell(PoisonPill.getInstance(), ActorRef.noSender())
}
val sink = Flow.create<Message>()
.map {
}
.to(Sink.onComplete {
})
return Flow.fromSinkAndSource(sink, source)
}
我在这里做这件事:
.mapMaterializedValue {
it.tell(PoisonPill.getInstance(), ActorRef.noSender())
}
我检查了这个Akka websocket - how to close connection by server?。但是它不工作。我得到一个错误:
PoisonPill message sent to StageActor(akka://system/system/Materializers/StreamSupervisor-1/$$c-actorRefSource) will be ignored, since it is not a real Actor.Use a custom message type to communicate with it instead.
我做错了什么,如何关闭它?
解决方法:
fun mainFlow(connectionsController: ActorRef, sys: String, id: String, v: String, system: ActorSystem): Flow<Message, Message, NotUsed> {
val source = Source.actorRef<Message>(
{
if (it is Done) {
Optional.of(CompletionStrategy.immediately())
} else {
Optional.empty()
}
},
{ Optional.empty() },
16,
OverflowStrategy.fail())
.mapMaterializedValue {
it.tell(Done.done(), ActorRef.noSender())
}
val sink = Flow.create<Message>()
.map {
}
.to(Sink.onComplete {
})
return Flow.fromSinkAndSource(sink, source)
}
1条答案
按热度按时间fkvaft9z1#
服务器将在其上游完成时关闭连接。
前面的答案依赖于旧版本在
PoisonPill
附近的行为,这将触发完成。对于
Source.actorRef
,第一个函数参数是完成匹配器:发送一条消息,函数返回一个非空的Optional
,源代码将完成该消息。该Optional
的内容是完成策略,它控制源代码是立即完成流还是清空缓冲区。我对Kotlin不是很熟悉,所以我不能提供任何代码,但我认为
mapMaterializedValue
不是您想要关闭连接的地方,因为在源代码有机会通过WebSocket发送任何消息之前,该代码将执行。