我尝试在一个返回SharedFlow
的suspend函数上实现一个超时机制。我使用CoroutineScope(coroutineContext).launch { }
启动一个协程,但我有read,这不是一个好的做法。建议将function改为CoroutineScope
扩展。但我有必要暂停这个功能。如何为需要返回流的挂起函数实现超时机制?
下面是我的实现的简化版本:
class Client {
sealed interface State {
object Loading: State
data class Success(val message: String): State
object Error: State
}
interface MyCallListener {
suspend fun onResult(result: State)
}
val methodListeners: MutableMap<String, MyCallListener> = mutableMapOf()
suspend inline fun call(): SharedFlow<State> {
val id = randomUUID()
val flow = MutableSharedFlow<State>()
//onResult is called elsewhere. this way, it's possible to emit a result to the flow
methodListeners[id] = object : MyCallListener {
override suspend fun onResult(result: State) {
flow.emit(result)
}
}
flow.tryEmit(State.Loading)
send(methodMessage) //this is a suspend function which uses a suspend function of ktors websocket session
CoroutineScope(coroutineContext).launch {
var latestValue: State? = null
launch {
flow.collect {
latestValue = it
}
}
delay(5000)
if (latestValue is State.Loading) {
flow.emit(State.Error)
}
}
return flow
}
fun onReceiveMessage(id: String, message: String) {
methodListeners[id]?.onResult(State.Success(message))
}
}
我需要从另一个suspend函数调用该函数:
class MyUseCase(
val client: Client
) {
suspend fun foo() = client.call()
}
这目前是可行的,但由于不建议使用CoroutineScope(coroutineContext),我不能使其成为CoroutineScope扩展,有更好的方法吗?
编辑:这里有更多关于我想完成的事情的背景。客户端是一个WebSocket消息处理程序。我用一个id发送名为“方法”的WebSocket消息,并用这个id保存一个侦听器示例。当接收到具有相同id的结果消息时,我将获取具有该消息的侦听器示例,并调用Reconner.onResult。我实现了这个调用函数,这样我就可以通过流接收方法结果。但有时,服务器不会发送带有该id的结果消息。因此,该状态被卡为加载状态。我想为此实现一个超时机制。
2条答案
按热度按时间dw1jzc5e1#
https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/with-timeout-or-null.html
尝试使用withTimeoutOrbit
voase2hg2#
我有点猜到你想干什么了。我认为SharedFlow应该是一个
val
属性,因为它是共享的。您不希望多次调用此函数会静默地终止它返回的先前流(您的代码每次都通过替换侦听器来执行此操作)。然后听众也不应该公开改变。这里的行为是,当您开始收集流时,它会发出用
updateState()
设置的状态,如果一个状态变得陈旧(5秒内没有更新),它会发出错误状态。