在Reactor中,当我有一个快速的生产者和一个缓慢的消费者,并且Reactor流中的值就像一个“快照”时,我希望消费者处理流中的最新值并丢弃其他值。(例如,在GUI中显示交换值的消费者与将交换计数转换为Flux
的生产者。)Flux#onBackpressureLatest()
操作符似乎是正确的选择。
我搜索了一下,找到了一些用法示例:
Flux.range(1, 30)
.delayElements(Duration.ofMillis(500))
.onBackpressureLatest()
.delayElements(Duration.ofMillis(3000))
.subscribe { println("got $it") }
这会在onBackpressureLatest()
之后设置一个手动延迟,它更像是一个Flux#sample(Duration)
,而不是一个缓慢的消费者。
在内部,delayElements(Duration)
操作符 Package 了一个concatMap
,因此我将其转换为:
Flux.range(1, 30)
.delayElements(Duration.ofMillis(500))
.onBackpressureLatest()
.concatMap { Mono.just(it).subscribeOn(Schedulers.boundedElastic()) }
.subscribe { item ->
println("got $item")
// simulate slow subscriber with sleep
Thread.sleep(3000)
}
这就像问题Latest overflow strategy with size 1 or any alternatives中提供的答案一样。但是,它看起来有点连线。我不明白为什么我们需要concatMap(op)
或flatMap(op, 1, 1)
调用来使onBackpressureLatest()
工作。
我尝试了以下(简化)版本,但它们没有按预期工作,为什么?
// not working try - 1
Flux.range(1, 30)
.delayElements(Duration.ofMillis(500))
.onBackpressureLatest()
.publishOn(Schedulers.boundedElastic())
.subscribe { item ->
println("got $item")
// simulate slow subscriber with sleep
Thread.sleep(3000)
}
// not working try - 2
Flux.range(1, 30)
.delayElements(Duration.ofMillis(500))
.onBackpressureLatest()
.publishOn(Schedulers.boundedElastic())
.subscribe(object : BaseSubscriber<Int>() {
override fun hookOnSubscribe(subscription: Subscription) {
// explicitly request 1
subscription.request(1)
}
override fun hookOnNext(value: Int) {
// simulate slow subscriber with sleep
Thread.sleep(3000)
println("got $value")
// explicitly request 1
request(1)
}
})
1条答案
按热度按时间oknwwptz1#
回答我自己的问题
当消费者慢而生产者快时,它们需要在不同的调度线程上运行,否则,如果它们在同一个线程中运行,整个通量链将处于同步模式。如果在这种情况下,消费者和生产者将在单个线程中以相同的速度运行。因此,以下代码将无法工作
因此,我们需要在生产者之后切换调度器线程,以确保消费者在不同的线程中运行。
如果我们将
.onBackpressureLatest()
之前的调度器切换为.publishOn
,则运算符链的其余部分将在同一个线程中运行,就像我们刚刚启动了另一个线程,并在那里运行同步通量流,这与上面的情况完全相同,因此下面的操作不起作用。如果我们把
.publishOn(Schedulers.boundedElastic())
放在.onBackpressureLatest()
之后,它也不起作用,原因是1-argpublishOn
方法采用默认的预取值Queues.SMALL_BUFFER_SIZE = 256
,所以订阅时它将request(256)
,这给.onBackpressureLatest()
施加了压力,下游需要256个项目,因此.onBackpressureLatest()
将直接向.publishOn
提供256个值(如果可用),并且.publishOn
之后的链同步使用这些项。因此,以下内容不会按预期工作:因此,我们需要确保
.onBackpressureLatest()
之后的运算符链在准备处理下一项时施加1
的压力,即以消费者的速度,我们只需要使用第二个prefetch
参数调用.publishOn
:问题中描述的以下两个替代项可以放在
.publishOn
行上,它们的作用相同:1)切换调度程序线程和2)确保背压为1。.concatMap { Mono.just(it).subscribeOn(Schedulers.boundedElastic()) }
.flatMap({ Mono.just(it).subscribeOn(Schedulers.boundedElastic()) }, 1, 1)