kotlin 如何实现挂起函数的超时机制

oxalkeyp  于 2023-10-23  发布在  Kotlin
关注(0)|答案(2)|浏览(134)

我尝试在一个返回SharedFlow的suspend函数上实现一个超时机制。我使用CoroutineScope(coroutineContext).launch { }启动一个协程,但我有read,这不是一个好的做法。建议将function改为CoroutineScope扩展。但我有必要暂停这个功能。如何为需要返回流的挂起函数实现超时机制?
下面是我的实现的简化版本:

class Client {
    sealed interface State {
        object Loading: State
        data class Success(val message: String): State
        object Error: State
    }

    interface MyCallListener {
        suspend fun onResult(result: State)
    }

    val methodListeners: MutableMap<String, MyCallListener> = mutableMapOf()

    suspend inline fun call(): SharedFlow<State> {
        val id = randomUUID()
        val flow = MutableSharedFlow<State>()

        //onResult is called elsewhere. this way, it's possible to emit a result to the flow
        methodListeners[id] = object : MyCallListener {
            override suspend fun onResult(result: State) {
                flow.emit(result)
            }
        }

        flow.tryEmit(State.Loading)

        send(methodMessage) //this is a suspend function which uses a suspend function of ktors websocket session

        CoroutineScope(coroutineContext).launch {
            var latestValue: State? = null
            launch {
                flow.collect {
                    latestValue = it
                }
            }
            delay(5000)
            if (latestValue is State.Loading) {
                flow.emit(State.Error)
            }
        }

        return flow
    }

    fun onReceiveMessage(id: String, message: String) {
        methodListeners[id]?.onResult(State.Success(message))
    }

}

我需要从另一个suspend函数调用该函数:

class MyUseCase(
    val client: Client
) {
    suspend fun foo() = client.call()
}

这目前是可行的,但由于不建议使用CoroutineScope(coroutineContext),我不能使其成为CoroutineScope扩展,有更好的方法吗?
编辑:这里有更多关于我想完成的事情的背景。客户端是一个WebSocket消息处理程序。我用一个id发送名为“方法”的WebSocket消息,并用这个id保存一个侦听器示例。当接收到具有相同id的结果消息时,我将获取具有该消息的侦听器示例,并调用Reconner.onResult。我实现了这个调用函数,这样我就可以通过流接收方法结果。但有时,服务器不会发送带有该id的结果消息。因此,该状态被卡为加载状态。我想为此实现一个超时机制。

dw1jzc5e

dw1jzc5e1#

https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/with-timeout-or-null.html
尝试使用withTimeoutOrbit

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

    class Client {
        sealed interface State {
            object Loading: State
            object Success: State
            object Error: State
        }
    
        interface MyCallListener {
            suspend fun onResult(result: State)
        }
    
        var methodListener: MyCallListener? = null
    
        suspend fun call(): SharedFlow<State> {
            val flow = MutableSharedFlow<State>()
    
            // onResult is called elsewhere. This way, it's possible to emit a result to the flow
            methodListener = object : MyCallListener {
                override suspend fun onResult(result: State) {
                    flow.emit(result)
                }
            }
    
            flow.tryEmit(State.Loading)
    
            try {
                val latestValue = withTimeoutOrNull(5000) {
                    flow.firstOrNull { it != State.Loading }
                }
    
                if (latestValue == null) {
                    flow.emit(State.Error)
                }
            } catch (e: TimeoutCancellationException) {
                flow.emit(State.Error)
            }
    
            return flow
        }
    }
voase2hg

voase2hg2#

我有点猜到你想干什么了。我认为SharedFlow应该是一个val属性,因为它是共享的。您不希望多次调用此函数会静默地终止它返回的先前流(您的代码每次都通过替换侦听器来执行此操作)。然后听众也不应该公开改变。

private val backingCallStateFlow = MutableSharedFlow<State>(replay = 1)
    .apply { tryEmit(State.Loading) }

// use this to emit new State instead of the listener
fun updateCallState(state: State) {
    backingCallStateFlow.tryEmit(state)
}
    
val callStateFlow = methodListenerFlow.transformLatest {
    emit(it)
    delay(5000L)
    emit(State.Error)
}
// you can tag this with shareIn if appropriate

这里的行为是,当您开始收集流时,它会发出用updateState()设置的状态,如果一个状态变得陈旧(5秒内没有更新),它会发出错误状态。

相关问题