更新协程1.3.0-RC
工作版本:
@FlowPreview
suspend fun streamTest(): Flow<String> = channelFlow {
listener.onSomeResult { result ->
if (!isClosedForSend) {
offer(result)
}
}
awaitClose {
listener.unsubscribe()
}
}
也可以查看Roman Elizarov撰写的这篇Medium文章:Callbacks and Kotlin Flows
原始问题
我有一个流发出多个字符串:
@FlowPreview
suspend fun streamTest(): Flow<String> = flowViaChannel { channel ->
listener.onSomeResult { result ->
if (!channel.isClosedForSend) {
channel.sendBlocking(result)
}
}
}
一段时间后,我想取消订阅流。目前我执行以下操作:
viewModelScope.launch {
beaconService.streamTest().collect {
Timber.i("stream value $it")
if(it == "someString")
// Here the coroutine gets canceled, but streamTest is still executed
this.cancel()
}
}
如果协程被取消了,流仍然在执行。只是没有订阅者在监听新的值。我如何取消订阅并停止stream
函数?
9条答案
按热度按时间dgtucam11#
解决方案不是取消流程,而是取消流程启动的范围。
**注意:**如果您希望收集器在取消
Job
时停止,则应在collect
之前调用cancellable()
.ryevplcw2#
您可以在Flow上使用
takeWhile
运算符。x6492ojm3#
对于那些愿意在协程范围内退订Flow的人来说,这种方法对我很有效:
t40tm48m4#
对于当前版本的协程/
Flows
(1.2.x),我现在没有一个好的解决方案。对于onCompletion
,当流停止时,你会得到通知,但是你在streamTest
函数之外,很难停止监听新的事件。在下一个版本的协程(1.3.x)中,这将变得非常简单。函数
flowViaChannel
被弃用,取而代之的是channelFlow
。该函数允许您等待流程关闭,并在此时执行一些操作,例如删除侦听器:1tuwyuhd5#
当流在couroutin范围内运行时,您可以从中获取一个作业以控制停止订阅。
k4ymrczo6#
根据@罗纳德的回答,当你需要让你的
Flow
再次发射时,这对测试非常有效。我们必须知道我们预计总共有多少排放
n
,然后我们可以使用index
来知道何时更新Flow
,这样我们就可以接收更多的排放。w80xi6nr7#
如果您只想取消其中的订阅,可以按如下方式操作:
o2g1uqev8#
为了完整起见,我们有一个新版本的可接受的答案,我们可以直接在流中使用
launchIn
方法,而不是显式地使用launch
协程构建器:sf6xfgos9#
Kotlin团队设计了两种方法:
正如@罗纳德在另一篇评论中指出的:
选项1:
takeWhile { //predicate }
predicate 为false时取消收集。将不收集最终值***。***
选项2:
transformWhile { //predicate }
predicate 为false时,收集该值,然后取消
https://github.com/Kotlin/kotlinx.coroutines/issues/2065