rxjava单元素缓冲区

4ktjp1zp  于 2021-07-06  发布在  Java
关注(0)|答案(1)|浏览(406)

我有一个例子,有一个对象在随机时间内发布对象,我想把它以每秒的速度收集到缓冲区中,并通过一些策略(如max score)进行过滤,以确保每秒缓冲区中只有一个对象。

subject
    .buffer(1L, TimeUnit.SECONDS)
    .filter {
        isNotEmpty
    }
    .doOnNext {
        // I get all object in the one second
        // That waste too much memory, the non-max object shouldn't be put into the buffer
        _.asScala.max(byScore)
    }
    .ignoreElements
    .subscribeOn(Schedulers.io)
    .subscribe

此代码将在一秒钟内保存所有对象并返回给我。
这不是我想要的。
有什么解决办法吗?

oxalkeyp

oxalkeyp1#

您可以使用以下版本的 buffer 操作员:

.buffer(long timespan, TimeUnit unit, Scheduler scheduler, int count,
       Callable<U> bufferSupplier,
       boolean restartTimerOnMaxSize)

它允许你定义你的习惯 bufferSupplier -用于存储缓冲值的集合。然后,您可以创建集合的自定义版本,在其中最多存储一个项目,在我们的示例中,如果新的、更大的,则替换现有值:

class SingleItemMaxCollection : ArrayList<Long>() {

    override fun add(element: Long): Boolean {
        return when {
            size == 1 && get(0) < element -> { super.set(0, element); true }
            size == 0 -> { super.add(element); true }
            else -> false
        }
    }
}

演示,如何在一些模拟数据上使用它(每400毫秒发射一次项):

class SO65020891 {

    private fun dataProvider() = Observable.just(1L, 2L, 3L, 4L, 5L, 6L)
        .concatMap { Observable.just(it).delay(400, TimeUnit.MILLISECONDS) }

    private fun getCollection(): () -> SingleItemMaxCollection = { SingleItemMaxCollection() }

    fun getBufferedMax(): Observable<Long> {
        return dataProvider()
            .buffer(1, TimeUnit.SECONDS, Schedulers.computation(), 2, getCollection(), false)
            .filter { it.isNotEmpty() }
            .map { it[0] }
    }
}

最后,一些验证:

class SO65020891Test {

    @Test
    fun maxEmittedValuesReturnedWithinWindows() {
        val tested = SO65020891()

        val values = tested.getBufferedMax().blockingIterable().toList()

        assertEquals(listOf(2L, 4L, 6L), values)
    }
}

相关问题