我有一个List<Flow<T>>
,想生成一个Flow<List<T>>
。这几乎就是combine
所做的--除了合并等待每个Flow
发出一个初始值,这不是我想要的。以下面的代码为例:
val a = flow {
repeat(3) {
emit("a$it")
delay(100)
}
}
val b = flow {
repeat(3) {
delay(150)
emit("b$it")
}
}
val c = flow {
delay(400)
emit("c")
}
val flows = listOf(a, b, c)
runBlocking {
combine(flows) {
it.toList()
}.collect { println(it) }
}
对于combine
(因此按原样),输出如下:
[a2, b1, c]
[a2, b2, c]
而我对所有的中间步骤也很感兴趣。这就是我想从这三个流程中得到的:
[]
[a0]
[a1]
[a1, b0]
[a2, b0]
[a2, b1]
[a2, b1, c]
[a2, b2, c]
现在我有两个解决方法,但是没有一个是好的...第一个是非常难看的,并且不适用于可空类型:
val flows = listOf(a, b, c).map {
flow {
emit(null)
it.collect { emit(it) }
}
}
runBlocking {
combine(flows) {
it.filterNotNull()
}.collect { println(it) }
}
通过强制所有流发出第一个不相关的值,combine
转换器实际上被调用了,并且让我删除了我知道不是实际值的空值。
sealed class FlowValueHolder {
object None : FlowValueHolder()
data class Some<T>(val value: T) : FlowValueHolder()
}
val flows = listOf(a, b, c).map {
flow {
emit(FlowValueHolder.None)
it.collect { emit(FlowValueHolder.Some(it)) }
}
}
runBlocking {
combine(flows) {
it.filterIsInstance(FlowValueHolder.Some::class.java)
.map { it.value }
}.collect { println(it) }
}
现在这个方法工作得很好,但是仍然感觉我做得太多了。在协程库中是否有我缺少的方法?
3条答案
按热度按时间iqih9akk1#
不如这样吧:
它解决了几个问题:
[]
不在结果流中因此,您不会注意到任何特定于实现的变通方法,因为您不必在收集期间处理它:
输出量:
[a0]
[a1]
[a1,b0]中的值
[a2,b0]中的值
[a2,b1]
[a2、b1、c]
[a2、b2、c]
Try it out here!
**编辑:**更新了答案以处理也发出空值的流。
c3frrgcw2#
我仍然希望避免Map到一个中间 Package 类型,正如有人在评论中提到的,这种行为有点不对(如果没有参数发出任何东西,那么一开始会发出一个空列表),但是这比我在写这个问题时想到的解决方案要好一些(仍然非常相似),并且可以处理可空类型:
下面是一个通过此实现的测试套件:
hzbexzde3#
我想您可能在寻找
.merge()
:将给定的流合并为单个流,而不保留元素的顺序。所有流都并发合并,对同时收集的流的数量没有限制。
默认的
.merge()
实现的工作方式如下https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/merge.html