基于条件多路复用Akka源/流

ujv3wf0j  于 2022-11-06  发布在  其他
关注(0)|答案(2)|浏览(173)

是否有一种方法可以根据外部条件复用两个或更多Akka源或流?

def cond: Boolean = ???

val src1 = Source.fromIterator(i1)
val src2 = Source.fromIterator(i2)
val src3 = Source.mux(src1, src2, cond)

根据cond,结果src3应包含来自src1的项或来自src2的项,但不能同时包含两者。
我发现了似乎是相反的操作divertTo。同时,似乎没有一个扇入操作支持条件合并。

bakd9h0s

bakd9h0s1#

我的建议沿着:

def mux[T](a: Source[T, Any], b: Source[T, Any])(cond: Int => Boolean): Source[T, Any] = {
  a.map((1, _)).merge(b.map((2, _)))
    .filter(t => cond(t._1))
    .map(_._2)
}

简单地说,它为每个源发出的元素附加一个标识符(这里是1、2,但可以是任何值),然后使用提供的cond函数进行过滤,只保留来自当前选定源的元素,然后Map回该元素。
我认为zip不是一个好主意,因为它 “当所有输入都有一个元素可用时发出”,也就是说,即使源A中有一个元素可用,而您实际上想切换到源A,zip也会等到B中有一个元素可用时才发出(ref)。
另一方面,merge将在任何源具有可用项时立即发出。

qnakjoqk

qnakjoqk2#

我不确定这是否是您所需要的,但您可以尝试以下操作:

def cond: Boolean = Random.nextBoolean()

val src1 = Source.fromIterator(() => LazyList.from(1).iterator)
val src2 = Source.fromIterator(() => LazyList.from(-1, -1).iterator)
val src3 = src1.zip(src2).map(pair => if (cond) pair._1 else pair._2)

src3.runForeach(println)

在一个run in Scastie中,输出开始于:

1
-2
3
4
-5
-6
7
-8
-9
10
...

如您所见,在本例中,我在两个流之间随机选择。

相关问题