我正在努力实现这样的目标:
我正在尝试使用Flow.fromGraph
创建此流
- 我可以使用
Zip[B, C]
执行join
,它接收2个流 - 我可以用两种方法来执行
split
: - 使用
Broadcast[A](2)
- 使用
UnZip[(A,A)]
,前面是.map(a -> (a, a))
map(f1)
和map(f2)
都是我从包含的库中获得的自定义流,所以我不能真正修改它们,所以请不要说.map(a => (f1(a), f2(a)))
这两种情况有什么区别,或者是等价的?我发现唯一不同的是Broadcast
的取消能力,只有当 * 所有 * 下游取消(eagerCancel = false
)时,这是它的默认行为,而UnZip
(它做广播对eagerCancel = true
做的事情)
另外,如果两个路径中的任何一个发生故障,会发生什么?即,如果对于特定元素,f1引发错误,会产生什么影响?
另外,假设我们没有f2
函数(因此没有map操作),并且我们希望在最后发出(b,a)
,那么f2应该被一个标识流替换,还是可以一起跳过?(如果是后者,你会 * 永远 * 使用标识流吗?)
val split = builder.add(BroadCast[A](2))
val join = builder.add(Zip[B, A])
val F1: Flow[A, B, NotUsed] = Flow[A].map(f1)
val I = Flow[A].map(identity)
split ~> F1 ~> join.in1
split ~> /* I ~> */ join.in0 // do i need the commented part?
这可能有助于内部缓冲/背压?
1条答案
按热度按时间mrphzbgm1#
它们都是 * 扇出 * 运算符;然而,
Unzip
来自文档:获取两个元素元组的流,并将这两个元素解压缩到两个不同的下游。
而
Broadcast
发出n个输出中的每个传入元素。
因此,我们可以得出结论,Unzip只是一个具有
n = 2
的广播;但重要的是如果元素是元组,Broadcast将只创建同一元组的n
输出。Unzip将为元素A
和B
各创建2个输出