java—如何基于多个rxjava可完成结果的结果执行操作

wnvonmuf  于 2021-07-09  发布在  Java
关注(0)|答案(1)|浏览(357)

我已经为此绞尽脑汁有一段时间了,我在管理一个需求时迷失了方向,我必须在kotlin中使用rx。
让我解释一下。
有一组ID,其等效项需要从服务器中删除,并最终根据服务器成功情况在本地删除。
基本上
进行网络呼叫以删除单个 id (支持的网络调用返回 Completable )
如果 complete (成功)在存储区收到回调 id 在一个 list (内存)
做第一步和第二步 id 删除
每次网络调用完成后,传递要从本地数据库中删除的列表
所以这些函数是可用的,不能修改。
fun deleteId(id: String): Completable { networkCall.deleteId(id) } fun deleteIds(ids: List<String>): Unit { localDb.deleteId(ids) } 这是我尝试过的,但显然是不完整和卡住。。。

val deleted = CopyOnWriteArrayList<String>()
val error = CopyOnWriteArrayList<String>()
items?.filter { it.isChecked }
    ?.map { Pair(it.id, dataManager.deleteId(it.id)) }
    ?.forEach { (Id, deleteOp) ->
        deleteOp.subscribeOn(Schedulers.io())
                .subscribe(object: CompletableObserver {
                    override fun onComplete() { deleted.add(Id) }

                    override fun onSubscribe(d: Disposable) { disposableManager += d }

                    override fun onError(e: Throwable) { error.add(Id) }

                })
    }

所以现在这里有多个问题,其中之一是我无法找到一个地方来知道所有请求都已完成,以便启动localdb delete。
有什么办法可以用吗 Flowable.fromIterable() 或者 zip 或者 merge 以某种方式遵循上述命令链来实现上述场景?

kwvwclae

kwvwclae1#

如果我正确理解了您的用例,那么应该这样做:

// ids of items to delete, for illustration lets have some temp set
val ids = setOf<String>("1", "2", "3", "4")
val deleteIdSingles = mutableListOf<Single<String>>()
ids.forEach { id ->
    deleteIdSingles.add(
        api.deleteId(id)
            // when request successfully completes, return its id wrapped in a Single, instead of Completable
            .toSingle<String> { id }
            // return a flag when this request fails, so that the stream is not closed and other requests would still be executed
            .onErrorReturn { "FAILED" }
    )
}

Single.merge(deleteIdSingles)
    // collect the results of the singles (i.e. the ids of successful deletes), and emit a set of those ids once all the singles has completed
    .collect(
        { mutableListOf() },
        { deletedIds: MutableList<String>, id: String -> if (id != "FAILED") deletedIds.add(id) }
    )
    .observeOn(Schedulers.io())
    .subscribe(
        { deletedIds ->
                db.deleteIds(deletedIds)
        }, { error ->
            // todo: onError
        })

相关问题