kotlin React器-仅保留/处理慢速消费者的最新值

atmip9wb  于 2023-01-05  发布在  Kotlin
关注(0)|答案(1)|浏览(95)

在Reactor中,当我有一个快速的生产者和一个缓慢的消费者,并且Reactor流中的值就像一个“快照”时,我希望消费者处理流中的最新值并丢弃其他值。(例如,在GUI中显示交换值的消费者与将交换计数转换为Flux的生产者。)
Flux#onBackpressureLatest()操作符似乎是正确的选择。
我搜索了一下,找到了一些用法示例:

Flux.range(1, 30)
    .delayElements(Duration.ofMillis(500))
    .onBackpressureLatest()
    .delayElements(Duration.ofMillis(3000))
    .subscribe { println("got $it") }

这会在onBackpressureLatest()之后设置一个手动延迟,它更像是一个Flux#sample(Duration),而不是一个缓慢的消费者。
在内部,delayElements(Duration)操作符 Package 了一个concatMap,因此我将其转换为:

Flux.range(1, 30)
    .delayElements(Duration.ofMillis(500))
    .onBackpressureLatest()
    .concatMap { Mono.just(it).subscribeOn(Schedulers.boundedElastic()) }
    .subscribe { item ->
        println("got $item")
        // simulate slow subscriber with sleep
        Thread.sleep(3000)
    }

这就像问题Latest overflow strategy with size 1 or any alternatives中提供的答案一样。但是,它看起来有点连线。我不明白为什么我们需要concatMap(op)flatMap(op, 1, 1)调用来使onBackpressureLatest()工作。
我尝试了以下(简化)版本,但它们没有按预期工作,为什么?

// not working try - 1
Flux.range(1, 30)
    .delayElements(Duration.ofMillis(500))
    .onBackpressureLatest()
    .publishOn(Schedulers.boundedElastic())
    .subscribe { item ->
        println("got $item")
        // simulate slow subscriber with sleep
        Thread.sleep(3000)
    }

// not working try - 2
Flux.range(1, 30)
    .delayElements(Duration.ofMillis(500))
    .onBackpressureLatest()
    .publishOn(Schedulers.boundedElastic())
    .subscribe(object : BaseSubscriber<Int>() {
        override fun hookOnSubscribe(subscription: Subscription) {
            // explicitly request 1
            subscription.request(1)
        }

        override fun hookOnNext(value: Int) {
            // simulate slow subscriber with sleep
            Thread.sleep(3000)
            println("got $value")
            // explicitly request 1
            request(1)
        }
    })
oknwwptz

oknwwptz1#

回答我自己的问题
当消费者慢而生产者快时,它们需要在不同的调度线程上运行,否则,如果它们在同一个线程中运行,整个通量链将处于同步模式。如果在这种情况下,消费者和生产者将在单个线程中以相同的速度运行。因此,以下代码将无法工作

// Not working, producer and consumer runs synchronously in the same thread
Flux.range(1, 30)
    .delayElements(Duration.ofMillis(300))
    .onBackpressureLatest()
    .subscribe { item ->
        println("got $item")
        // simulate slow subscriber with sleep
        Thread.sleep(1000)
    }

因此,我们需要在生产者之后切换调度器线程,以确保消费者在不同的线程中运行

如果我们将.onBackpressureLatest()之前的调度器切换为.publishOn,则运算符链的其余部分将在同一个线程中运行,就像我们刚刚启动了另一个线程,并在那里运行同步通量流,这与上面的情况完全相同,因此下面的操作不起作用。

// Not working
// operator publishOn acts as the producer,
// and it runs synchronously in the same thread with the consumer
Flux.range(1, 30)
    .delayElements(Duration.ofMillis(300))
    .publishOn(Schedulers.boundedElastic()) // the following runs synchronously as before
    .onBackpressureLatest()
    .subscribe { item ->
        println("got $item")
        // simulate slow subscriber with sleep
        Thread.sleep(1000)
    }

如果我们把.publishOn(Schedulers.boundedElastic())放在.onBackpressureLatest()之后,它也不起作用,原因是1-arg publishOn方法采用默认的预取值Queues.SMALL_BUFFER_SIZE = 256,所以订阅时它将request(256),这给.onBackpressureLatest()施加了压力,下游需要256个项目,因此.onBackpressureLatest()将直接向.publishOn提供256个值(如果可用),并且.publishOn之后的链同步使用这些项。因此,以下内容不会按预期工作:

// Not working
// 1-arg publishOn has prefetch=256 by default
// this pushes a pressure of 256 to onBackpressureLatest
Flux.range(1, 30)
    .delayElements(Duration.ofMillis(300))
    .onBackpressureLatest()
    .publishOn(Schedulers.boundedElastic()) // pushes pressure of 256
    .subscribe { item ->
        println("got $item")
        // simulate slow subscriber with sleep
        Thread.sleep(1000)
    }

因此,我们需要确保.onBackpressureLatest()之后的运算符链在准备处理下一项时施加1的压力,即以消费者的速度,我们只需要使用第二个prefetch参数调用.publishOn

// Working
Flux.range(1, 30)
    .delayElements(Duration.ofMillis(300))
    .onBackpressureLatest()
    .publishOn(Schedulers.boundedElastic(), 1) // pushes pressure of 1 when ready to process
    .subscribe { item ->
        println("got $item")
        // simulate slow subscriber with sleep
        Thread.sleep(1000)
    }

问题中描述的以下两个替代项可以放在.publishOn行上,它们的作用相同:1)切换调度程序线程2)确保背压为1

  • .concatMap { Mono.just(it).subscribeOn(Schedulers.boundedElastic()) }
  • .flatMap({ Mono.just(it).subscribeOn(Schedulers.boundedElastic()) }, 1, 1)

相关问题