将多个Kotlin流合并到一个列表中,而无需等待第一个值

41zrol4v  于 2022-12-13  发布在  Kotlin
关注(0)|答案(3)|浏览(231)

我有一个List<Flow<T>>,想生成一个Flow<List<T>>。这几乎就是combine所做的--除了合并等待每个Flow发出一个初始值,这不是我想要的。以下面的代码为例:

val a = flow {
  repeat(3) {
    emit("a$it")
    delay(100)
  }
}
val b = flow {
  repeat(3) {
    delay(150)
    emit("b$it")
  }
}
val c = flow {
  delay(400)
  emit("c")
}
val flows = listOf(a, b, c)
runBlocking {
  combine(flows) {
    it.toList()
  }.collect { println(it) }
}

对于combine(因此按原样),输出如下:

[a2, b1, c]
[a2, b2, c]

而我对所有的中间步骤也很感兴趣。这就是我想从这三个流程中得到的:

[]
[a0]
[a1]
[a1, b0]
[a2, b0]
[a2, b1]
[a2, b1, c]
[a2, b2, c]

现在我有两个解决方法,但是没有一个是好的...第一个是非常难看的,并且不适用于可空类型:

val flows = listOf(a, b, c).map {
  flow {
    emit(null)
    it.collect { emit(it) }
  }
}
runBlocking {
  combine(flows) {
    it.filterNotNull()
  }.collect { println(it) }
}

通过强制所有流发出第一个不相关的值,combine转换器实际上被调用了,并且让我删除了我知道不是实际值的空值。

sealed class FlowValueHolder {
  object None : FlowValueHolder()
  data class Some<T>(val value: T) : FlowValueHolder()
}
val flows = listOf(a, b, c).map {
  flow {
    emit(FlowValueHolder.None)
    it.collect { emit(FlowValueHolder.Some(it)) }
  }
}
runBlocking {
  combine(flows) {
    it.filterIsInstance(FlowValueHolder.Some::class.java)
      .map { it.value }
  }.collect { println(it) }
}

现在这个方法工作得很好,但是仍然感觉我做得太多了。在协程库中是否有我缺少的方法?

iqih9akk

iqih9akk1#

不如这样吧:

inline fun <reified T> instantCombine(vararg flows: Flow<T>) = channelFlow {
    val array= Array(flows.size) {
        false to (null as T?) // first element stands for "present"
    }

    flows.forEachIndexed { index, flow ->
        launch {
            flow.collect { emittedElement ->
                array[index] = true to emittedElement
                send(array.filter { it.first }.map { it.second })
            }
        }
    }
}

它解决了几个问题:

  • 无需引入新类型
  • []不在结果流中
  • 从调用点抽象出空值处理(或者不管它是如何解决的),产生的Flow自己处理它

因此,您不会注意到任何特定于实现的变通方法,因为您不必在收集期间处理它:

runBlocking {
    instantCombine(a, b, c).collect {
        println(it)
    }
}

输出量:
[a0]
[a1]
[a1,b0]中的值
[a2,b0]中的值
[a2,b1]
[a2、b1、c]
[a2、b2、c]
Try it out here!

**编辑:**更新了答案以处理也发出空值的流。

  • 所使用的低级数组是线程安全的,就像处理单个变量一样。
c3frrgcw

c3frrgcw2#

我仍然希望避免Map到一个中间 Package 类型,正如有人在评论中提到的,这种行为有点不对(如果没有参数发出任何东西,那么一开始会发出一个空列表),但是这比我在写这个问题时想到的解决方案要好一些(仍然非常相似),并且可以处理可空类型:

inline fun <reified T> instantCombine(
  flows: Iterable<Flow<T>>
): Flow<List<T>> = combine(flows.map { flow ->
  flow.map {
    @Suppress("USELESS_CAST") // Required for onStart(null)
    Holder(it) as Holder<T>?
  }
    .onStart { emit(null) }
}) {
  it.filterNotNull()
    .map { holder -> holder.value }
}

下面是一个通过此实现的测试套件:

class InstantCombineTest {
  @Test
  fun `when no flows are merged, nothing is emitted`() = runBlockingTest {
    assertThat(instantCombine(emptyList<Flow<String>>()).toList())
      .isEmpty()
  }

  @Test
  fun `intermediate steps are emitted`() = runBlockingTest {
    val a = flow {
      delay(20)
      repeat(3) {
        emit("a$it")
        delay(100)
      }
    }
    val b = flow {
      repeat(3) {
        delay(150)
        emit("b$it")
      }
    }
    val c = flow {
      delay(400)
      emit("c")
    }

    assertThat(instantCombine(a, b, c).toList())
      .containsExactly(
        emptyList<String>(),
        listOf("a0"),
        listOf("a1"),
        listOf("a1", "b0"),
        listOf("a2", "b0"),
        listOf("a2", "b1"),
        listOf("a2", "b1", "c"),
        listOf("a2", "b2", "c")
      )
      .inOrder()
  }

  @Test
  fun `a single flow is mirrored`() = runBlockingTest {
    val a = flow {
      delay(20)
      repeat(3) {
        emit("a$it")
        delay(100)
      }
    }

    assertThat(instantCombine(a).toList())
      .containsExactly(
        emptyList<String>(),
        listOf("a0"),
        listOf("a1"),
        listOf("a2")
      )
      .inOrder()
  }

  @Test
  fun `null values are kept`() = runBlockingTest {
    val a = flow {
      emit("a")
      emit(null)
      emit("b")
    }

    assertThat(instantCombine(a).toList())
      .containsExactly(
        emptyList<String?>(),
        listOf("a"),
        listOf(null),
        listOf("b")
      )
      .inOrder()
  }
}
hzbexzde

hzbexzde3#

我想您可能在寻找.merge()

fun <T> Iterable<Flow<T>>.merge(): Flow<T>
fun <T> merge(vararg flows: Flow<T>): Flow<T>

将给定的流合并为单个流,而不保留元素的顺序。所有流都并发合并,对同时收集的流的数量没有限制。
默认的.merge()实现的工作方式如下

public fun <T> Iterable<Flow<T>>.merge(): Flow<T> =
  channelFlow {
    forEach { flow ->
      launch {
        flow.collect { send(it) }
      }
    }
  }

https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/merge.html

相关问题