在我的简化示例中,我希望将websocket客户端发送的消息广播给所有其他客户端。该应用程序是使用带有Spring的React式WebSocket构建的。
我的想法是用单曲 Sink
如果从客户端接收到消息,则在该接收器上发出该消息。 WebsocketSession::send
只是转发由它发出的事件 Sink
连接到已连接的客户端。
@Component
class ReactiveWebSocketHandler(private val sink: Sinks.Many<Message>,
private val objectMapper : ObjectMapper) : WebSocketHandler {
override fun handle(session: WebSocketSession): Mono<Void> {
val input = session.receive()
.doOnNext {
sink.emitNext(fromJson(it.payloadAsText, Message::class.java), Sinks.EmitFailureHandler.FAIL_FAST)
}
.then()
val output = session.send(sink.asFlux().map { message -> session.textMessage(toJson(message)) })
return Mono.zip(input, output).then()
}
fun toJson(obj : Any) : String = objectMapper.writeValueAsString(obj)
fun <T> fromJson(json : String, clazz : Class<T>) : T{
return objectMapper.readValue(json, clazz)
}
}
这种实现不像以前那样安全 Sink.emitNext
可以从不同的线程调用。
我的尝试是 publishOn
传递一个单线程 Scheduler
以便 onNext
对所有人 WebSocketSession
从单个线程调用。然而,这不起作用。一个项目从websocket客户端发出,然后所有后续websocket客户端在连接后立即接收onclose事件:
@Component
class ReactiveWebSocketHandler(private val sink: Sinks.Many<Message>,
private val objectMapper : ObjectMapper) : WebSocketHandler {
private val scheduler = Schedulers.newSingle("sink-scheduler")
override fun handle(session: WebSocketSession): Mono<Void> {
val input = session.receive()
.publishOn(scheduler) // publish on single threaded scheduler
.doOnNext {
sink.emitNext(fromJson(it.payloadAsText, Message::class.java), Sinks.EmitFailureHandler.FAIL_FAST)
}
.then()
...
}
}
我能看到的另一个选择是 synchronize
在一些普通锁上,以便发射是线程安全的:
@Component
class ReactiveWebSocketHandler(private val sink: Sinks.Many<Message>,
private val objectMapper : ObjectMapper) : WebSocketHandler {
private val lock = Any()
override fun handle(session: WebSocketSession): Mono<Void> {
val input = session.receive()
.doOnNext {
synchronized(lock) {
sink.emitNext(fromJson(it.payloadAsText, Message::class.java), Sinks.EmitFailureHandler.FAIL_FAST)
}
}
.then()
...
}
}
然而,我不确定是否应该这样做。
问题是
是否可以使用 publishOn
在这种情况下,发射是线程安全的,如果不是的话,这个问题的其他解决方案是什么(除了像我所做的那样使用同步之外) synchronized
关键字)。
2条答案
按热度按时间jvidinwx1#
而不是悲观地锁定
synchronized
选项,您可以创建EmitFailureHandler
相当于FAIL_FAST
除非它回来了true
为了EmitResult.NON_SERIALIZED_ACCESS
.这将导致并发的emit尝试被立即重试,就像在繁忙的循环中一样。
乐观地说,这最终会成功。您甚至可以让自定义处理程序引入延迟或限制其返回的次数
true
如果你想对无限循环有额外的防御。ffscu2ro2#
发布单线程调度程序的方法应该可以工作,但是您需要为每个应用程序使用相同的调度程序示例
ReactiveWebSocketHandler
.你能用平面图而不是Flume来组合所有receive()通量吗?
我自己对这个问题的解决方法采用了simon建议的busyspin方法。
看看我对一个类似问题的回答。