java—如何在SpringWebFlux中正确地从多个通量(websocketsession::receive)向sink发出值?

wfauudbj  于 2021-06-30  发布在  Java
关注(0)|答案(2)|浏览(440)

在我的简化示例中,我希望将websocket客户端发送的消息广播给所有其他客户端。该应用程序是使用带有Spring的React式WebSocket构建的。
我的想法是用单曲 Sink 如果从客户端接收到消息,则在该接收器上发出该消息。 WebsocketSession::send 只是转发由它发出的事件 Sink 连接到已连接的客户端。

@Component
class ReactiveWebSocketHandler(private val sink: Sinks.Many<Message>,
                               private val objectMapper : ObjectMapper) : WebSocketHandler {

    override fun handle(session: WebSocketSession): Mono<Void> {

        val input = session.receive()
                .doOnNext {
                    sink.emitNext(fromJson(it.payloadAsText, Message::class.java), Sinks.EmitFailureHandler.FAIL_FAST)
                }
                .then()
        val output = session.send(sink.asFlux().map { message -> session.textMessage(toJson(message)) })

        return Mono.zip(input, output).then()
    }

    fun toJson(obj : Any) : String = objectMapper.writeValueAsString(obj)

    fun <T> fromJson(json : String, clazz : Class<T>) : T{
        return objectMapper.readValue(json, clazz)
    }

}

这种实现不像以前那样安全 Sink.emitNext 可以从不同的线程调用。
我的尝试是 publishOn 传递一个单线程 Scheduler 以便 onNext 对所有人 WebSocketSession 从单个线程调用。然而,这不起作用。一个项目从websocket客户端发出,然后所有后续websocket客户端在连接后立即接收onclose事件:

@Component
class ReactiveWebSocketHandler(private val sink: Sinks.Many<Message>,
                               private val objectMapper : ObjectMapper) : WebSocketHandler {

    private val scheduler = Schedulers.newSingle("sink-scheduler")

    override fun handle(session: WebSocketSession): Mono<Void> {

        val input = session.receive()
                .publishOn(scheduler) // publish on single threaded scheduler
                .doOnNext {
                    sink.emitNext(fromJson(it.payloadAsText, Message::class.java), Sinks.EmitFailureHandler.FAIL_FAST)
                }
                .then()
        ...
    }

}

我能看到的另一个选择是 synchronize 在一些普通锁上,以便发射是线程安全的:

@Component
class ReactiveWebSocketHandler(private val sink: Sinks.Many<Message>,
                               private val objectMapper : ObjectMapper) : WebSocketHandler {

    private val lock = Any()

    override fun handle(session: WebSocketSession): Mono<Void> {

        val input = session.receive()
                .doOnNext {
                    synchronized(lock) {
                        sink.emitNext(fromJson(it.payloadAsText, Message::class.java), Sinks.EmitFailureHandler.FAIL_FAST)
                    }
                }
                .then()
        ...
    }

}

然而,我不确定是否应该这样做。

问题是

是否可以使用 publishOn 在这种情况下,发射是线程安全的,如果不是的话,这个问题的其他解决方案是什么(除了像我所做的那样使用同步之外) synchronized 关键字)。

jvidinwx

jvidinwx1#

而不是悲观地锁定 synchronized 选项,您可以创建 EmitFailureHandler 相当于 FAIL_FAST 除非它回来了 true 为了 EmitResult.NON_SERIALIZED_ACCESS .
这将导致并发的emit尝试被立即重试,就像在繁忙的循环中一样。
乐观地说,这最终会成功。您甚至可以让自定义处理程序引入延迟或限制其返回的次数 true 如果你想对无限循环有额外的防御。

ffscu2ro

ffscu2ro2#

发布单线程调度程序的方法应该可以工作,但是您需要为每个应用程序使用相同的调度程序示例 ReactiveWebSocketHandler .
你能用平面图而不是Flume来组合所有receive()通量吗?
我自己对这个问题的解决方法采用了simon建议的busyspin方法。
看看我对一个类似问题的回答。

相关问题