android 如何合并两个输入依赖于其他输出的串行流?

eqqqjvef  于 2023-02-17  发布在  Android
关注(0)|答案(1)|浏览(126)

我有一个复杂的场景,其中一组相互依赖的coroutine flows相互依赖并链接:

viewModelScope.launch {
            repository.cacheAccount(person)
                .flatMapConcat { it->
                    Log.d(App.TAG, "[2] create account call (server)")
                    repository.createAccount(person)
                }
                .flatMapConcat { it ->
                    if (it is Response.Data) {
                        repository.cacheAccount(it.data)
                            .collect { it ->
                                // no op, just execute the command
                                Log.d(App.TAG, "account has been cached")
                            }
                    }
                    flow {
                        emit(it)
                    }
                }
                .catch { e ->
                    Log.d(App.TAG, "[3] get an exception in catch block")
                    Log.e(App.TAG, "Got an exception during network call", e)
                    state.update { state ->
                        val errors = state.errors + getErrorMessage(PersonRepository.Response.Error.Exception(e))
                        state.copy(errors = errors, isLoading = false)
                    }
                }
                .collect { it ->
                    Log.d(App.TAG, "[4] collect the result")
                    updateStateProfile(it)
                }
        }

1.在本地磁盘上缓存帐户
1.在后端创建帐户
1.在肯定的情况下,将新创建的帐户缓存在本地磁盘中
现在我必须添加更多的调用到一个新的API端点,场景变得更加复杂。
4a.在肯定的场景中,放入本地磁盘(高速缓存)启动的事务cacheRepository.createChainTx()
4 b.在否定的场景中,只是从后端进一步发出响应
4a.-〉5.在第二个端点repository.registerUser()上注册用户
1.第二个端点的响应通过更新现有行放入该高速缓存。即使是除异常以外的负情况也应该缓存以更新tx的状态。

viewModelScope.launch {
            lateinit var newTx: ITransaction
            cacheRepository.createChainTxAsFlow(RegisterUserTransaction(userWalletAddress = userWalletAddress))
                .map { it ->
                    newTx= it
                    repository.registerUserOnSwapMarket(userWalletAddress)
                }
                .onEach { it -> preProcessResponse(it, newTx) }
                .flowOn(backgroundDispatcher)
                .collect { it -> processResponse(it) }
        }

这是一个应该集成到第一个Flow chain中的场景。
问题是我不知道如何在Flow chain中做清楚。我可以在没有链接的情况下重写代码,但它也带来了各种if else语句。
你将如何以“人类可读”的方式来完成这个场景?

yfjy0ee7

yfjy0ee71#

我将结束了这个代码的过渡期:

viewModelScope.launch(backgroundDispatcher) {
            try {
                var cachedPersonProfile = repository.cacheAccount(person)
                var createAccountResponse = repository.createAccount(person)
                when(createAccountResponse) {
                    is Response.Data -> {
                        repository.cacheAccount(createAccountResponse.data)
                        val cachedTx = cacheRepository.createChainTx(RegisterUserTransaction(userWalletAddress = person.userWalletAddress))
                        val chainTx = walletRepository.registerUserOnSwapMarket(userWalletAddress = person.userWalletAddress)
                        when(chainTx) {
                            is ru.home.swap.core.network.Response.Data -> {
                                if (chainTx.data.isStatusOK()) {
                                    cachedTx.status = TxStatus.TX_MINED
                                } else {
                                    cachedTx.status = TxStatus.TX_REVERTED
                                }
                            }
                            is ru.home.swap.core.network.Response.Error.Message -> {
                                cachedTx.status = TxStatus.TX_EXCEPTION
                            }
                            is ru.home.swap.core.network.Response.Error.Exception -> {
                                cachedTx.status = TxStatus.TX_EXCEPTION
                            }
                        }
                        cacheRepository.createChainTx(cachedTx)
                        withContext(Dispatchers.Main) {
                            state.update { state ->
                                if (cachedTx.status == TxStatus.TX_MINED) {
                                    state.copy(
                                        isLoading = false,
                                        profile = createAccountResponse.data,
                                        status = StateFlagV2.PROFILE
                                    )
                                } else {
                                    val txError = "Failed register the profile on chain with status ${TxStatus.TX_MINED}"
                                    state.copy(
                                        isLoading = false,
                                        errors = state.errors + txError
                                    )
                                }
                            }
                        }
                    }
                    else -> { updateStateProfile(createAccountResponse) }
                }
            } catch (ex: Exception) {
                withContext(Dispatchers.Main) {
                    state.update { state ->
                        val errors = state.errors + getErrorMessage(PersonRepository.Response.Error.Exception(ex))
                        state.copy(errors = errors, isLoading = false)
                    }
                }
            }
        }

如果你有更好的选择,请在帖子中分享作为答案。

相关问题