如何使用Spring WebFlux将sink.asFlux()与服务器发送事件(SSE)结合起来?

zy1mlcev  于 2023-02-03  发布在  Spring
关注(0)|答案(1)|浏览(252)

我正在使用带有WebFlux的Sping Boot 2.7.8。
我班上有一个Flume,像这样:

private final Sinks.Many<TaskEvent> sink = Sinks.many()
                                                    .multicast()
                                                    .onBackpressureBuffer();

这可用于订阅,如下所示:

public Flux<List<TaskEvent>> subscribeToTaskUpdates() {
        return sink.asFlux()
                   .buffer(Duration.ofSeconds(1))
                   .share();
    }

@Controller像这样使用它将更新作为服务器发送事件(SSE)推送到浏览器:

@GetMapping("/transferdatestatuses/updates")
    public Flux<ServerSentEvent<TransferDateStatusesUpdateEvent>> subscribeToTransferDataStatusUpdates() {
        return monitoringSseBroker.subscribeToTaskUpdates()
                                  .map(taskEventList -> ServerSentEvent.<TransferDateStatusesUpdateEvent>builder()
                                                                       .data(TransferDateStatusesUpdateEvent.of(taskEventList))
                                                                       .build())

一开始这很好用,但是如果我在(Thymeleaf)Web应用程序中导航到一个与SSE URL没有连接的页面,然后返回,那么浏览器就无法再连接了。
经过一番调查,我发现问题是删除订阅者关闭了流量,新订阅者无法再连接。
我已经找到了3种方法来修复它,但我不了解内部足以决定哪一个是最好的解决方案,如果有任何事情,我需要考虑决定使用什么。

  • 解决方案1*

通过使用允许设置此参数的onBackpressureBuffer方法重载,禁用接收器上的autoCancel:

private final Sinks.Many<TaskEvent> sink = Sinks.many()
                                                    .multicast()
                                                    .onBackpressureBuffer(Queues.SMALL_BUFFER_SIZE, false);
  • 解决方案2*

使用replay(0).autoConnect()代替share()

public Flux<List<TaskEvent>> subscribeToTaskUpdates() {
        return sink.asFlux()
                   .buffer(Duration.ofSeconds(1))
                .replay(0).autoConnect();
                   
    }
  • 解决方案3*

使用publish().autoConnect()代替share()

public Flux<List<TaskEvent>> subscribeToTaskUpdates() {
        return sink.asFlux()
                   .buffer(Duration.ofSeconds(1))
                   .publish().autoConnect();
    }

建议使用哪种解决方案来确保浏览器可以断开连接并在稍后再次连接而不会出现问题?

pbpqsu0x

pbpqsu0x1#

我不太确定这是否是你的问题的根源,但我没有使用keepAlive通量的问题。

val keepAlive = Flux.interval(Duration.ofSeconds(10)).map {
        ServerSentEvent.builder<Image>()
            .event(":keepalive")
            .build()
    }
    return Flux.merge(
        keepAlive,
        imageUpdateFlux
    )

下面是整个文件:Github

相关问题