Kotlin流量:如何退订/停止

cfh9epnr  于 2022-12-13  发布在  Kotlin
关注(0)|答案(9)|浏览(286)

更新协程1.3.0-RC

工作版本:

@FlowPreview
suspend fun streamTest(): Flow<String> = channelFlow {
    listener.onSomeResult { result ->
        if (!isClosedForSend) {
            offer(result)
        }
    }

    awaitClose {
        listener.unsubscribe()
    }
}

也可以查看Roman Elizarov撰写的这篇Medium文章:Callbacks and Kotlin Flows

原始问题

我有一个流发出多个字符串:

@FlowPreview
suspend fun streamTest(): Flow<String> = flowViaChannel { channel ->
    listener.onSomeResult { result ->
            if (!channel.isClosedForSend) {
                channel.sendBlocking(result)
            }
    }
}

一段时间后,我想取消订阅流。目前我执行以下操作:

viewModelScope.launch {
    beaconService.streamTest().collect {
        Timber.i("stream value $it")
        if(it == "someString")
            // Here the coroutine gets canceled, but streamTest is still executed
            this.cancel() 
    }
}

如果协程被取消了,流仍然在执行。只是没有订阅者在监听新的值。我如何取消订阅并停止stream函数?

dgtucam1

dgtucam11#

解决方案不是取消流程,而是取消流程启动的范围。

val job = scope.launch { flow.cancellable().collect { } }
job.cancel()

**注意:**如果您希望收集器在取消Job时停止,则应在collect之前调用cancellable().

ryevplcw

ryevplcw2#

您可以在Flow上使用takeWhile运算符。

flow.takeWhile { it != "someString" }.collect { emittedValue ->
         //Do stuff until predicate is false  
       }
x6492ojm

x6492ojm3#

对于那些愿意在协程范围内退订Flow的人来说,这种方法对我很有效:

viewModelScope.launch {

        beaconService.streamTest().collect {

            //Do something then
            this.coroutineContext.job.cancel()

        }
}
t40tm48m

t40tm48m4#

对于当前版本的协程/Flows(1.2.x),我现在没有一个好的解决方案。对于onCompletion,当流停止时,你会得到通知,但是你在streamTest函数之外,很难停止监听新的事件。

beaconService.streamTest().onCompletion {

}.collect {
    ...
}

在下一个版本的协程(1.3.x)中,这将变得非常简单。函数flowViaChannel被弃用,取而代之的是channelFlow。该函数允许您等待流程关闭,并在此时执行一些操作,例如删除侦听器:

channelFlow<String> {
    println("Subscribe to listener")

    awaitClose {
        println("Unsubscribe from listener")
    }
}
1tuwyuhd

1tuwyuhd5#

当流在couroutin范围内运行时,您可以从中获取一个作业以控制停止订阅。

// Make member variable if you want.
var jobForCancel : Job? = null

// Begin collecting
jobForCancel = viewModelScope.launch {
    beaconService.streamTest().collect {
        Timber.i("stream value $it")
        if(it == "someString")
            // Here the coroutine gets canceled, but streamTest is still executed
            // this.cancel() // Don't
    }
}

// Call whenever to canceled
jobForCancel?.cancel()
k4ymrczo

k4ymrczo6#

根据@罗纳德的回答,当你需要让你的Flow再次发射时,这对测试非常有效。

val flow = MutableStateFlow(initialValue)

flow.take(n).collectIndexed { index, _ ->
    if (index == something) {
        flow.value = update
    }
}

//your assertions

我们必须知道我们预计总共有多少排放n,然后我们可以使用index来知道何时更新Flow,这样我们就可以接收更多的排放。

w80xi6nr

w80xi6nr7#

如果您只想取消其中的订阅,可以按如下方式操作:

viewModelScope.launch {
    testScope.collect {
        return@collect cancel()
    }
}
o2g1uqev

o2g1uqev8#

为了完整起见,我们有一个新版本的可接受的答案,我们可以直接在流中使用launchIn方法,而不是显式地使用launch协程构建器:

val job = flow.cancellable().launchIn(scope)
job.cancel()
sf6xfgos

sf6xfgos9#

Kotlin团队设计了两种方法:

正如@罗纳德在另一篇评论中指出的:

选项1:takeWhile { //predicate }

predicate 为false时取消收集。将不收集最终值***。***

flow.takeWhile { value ->
    value != "finalString" 
}.collect { value ->
    //Do stuff, but "finalString" will never hit this
}

选项2:transformWhile { //predicate }

predicate 为false时,收集该值,然后取消

flow.transformWhile { value ->
    emit(value)
    value != "finalString"
}.collect { value ->
    //Do stuff, but "finalString" will be the last value
}

https://github.com/Kotlin/kotlinx.coroutines/issues/2065

相关问题