Akka.如何从服务器关闭WebSocket连接?

xpszyzbs  于 2022-11-05  发布在  其他
关注(0)|答案(1)|浏览(128)

我正在学习akka框架,我正在尝试关闭一个来自服务器的WebSocket连接。

fun main() {

    val system = ActorSystem.create("system")
    val materializer = Materializer.createMaterializer(system)

    val http = Http.get(system)

    val routeFlow = path("ws") {
        get {
            parameter("as") { sys ->
                parameter("id") { id ->
                    parameter("v") { v ->
                        handleWebSocketMessages(mainFlow(sys, id, v))
                    }
                }
            }
        }
    }.flow(system, materializer)

    http.newServerAt("0.0.0.0", 8080).withMaterializer(materializer).bindFlow(routeFlow).thenRun {
        println("Web socket server is running at localhost:8080")
    }
}

fun mainFlow(sys: String, id: String, v: String): Flow<Message, Message, NotUsed> {
    val source = Source.actorRef<Message>(
        { Optional.empty() },
        { Optional.empty() },
        100,
        OverflowStrategy.fail())
        .mapMaterializedValue {
            it.tell(PoisonPill.getInstance(), ActorRef.noSender())
        }

    val sink = Flow.create<Message>()
        .map {

        }
        .to(Sink.onComplete {

        })

    return Flow.fromSinkAndSource(sink, source)
}

我在这里做这件事:

.mapMaterializedValue {
    it.tell(PoisonPill.getInstance(), ActorRef.noSender())
}

我检查了这个Akka websocket - how to close connection by server?。但是它不工作。我得到一个错误:

PoisonPill message sent to StageActor(akka://system/system/Materializers/StreamSupervisor-1/$$c-actorRefSource) will be ignored, since it is not a real Actor.Use a custom message type to communicate with it instead.

我做错了什么,如何关闭它?
解决方法:

fun mainFlow(connectionsController: ActorRef, sys: String, id: String, v: String, system: ActorSystem): Flow<Message, Message, NotUsed> {
    val source = Source.actorRef<Message>(
        {
            if (it is Done) {
                Optional.of(CompletionStrategy.immediately())
            } else {
                Optional.empty()
            }
        },
        { Optional.empty() },
        16,
        OverflowStrategy.fail())
        .mapMaterializedValue {
            it.tell(Done.done(), ActorRef.noSender())
        }

    val sink = Flow.create<Message>()
        .map {

        }
        .to(Sink.onComplete {

        })

    return Flow.fromSinkAndSource(sink, source)
}
fkvaft9z

fkvaft9z1#

服务器将在其上游完成时关闭连接。
前面的答案依赖于旧版本在PoisonPill附近的行为,这将触发完成。
对于Source.actorRef,第一个函数参数是完成匹配器:发送一条消息,函数返回一个非空的Optional,源代码将完成该消息。该Optional的内容是完成策略,它控制源代码是立即完成流还是清空缓冲区。
我对Kotlin不是很熟悉,所以我不能提供任何代码,但我认为mapMaterializedValue不是您想要关闭连接的地方,因为在源代码有机会通过WebSocket发送任何消息之前,该代码将执行。

相关问题