akka 流:解压缩和广播之间有什么区别?

icomxhvb  于 2022-11-05  发布在  其他
关注(0)|答案(1)|浏览(135)

我正在努力实现这样的目标:

我正在尝试使用Flow.fromGraph创建此流

  • 我可以使用Zip[B, C]执行join,它接收2个流
  • 我可以用两种方法来执行split
  • 使用Broadcast[A](2)
  • 使用UnZip[(A,A)],前面是.map(a -> (a, a))

map(f1)map(f2)都是我从包含的库中获得的自定义流,所以我不能真正修改它们,所以请不要说.map(a => (f1(a), f2(a)))
这两种情况有什么区别,或者是等价的?我发现唯一不同的是Broadcast的取消能力,只有当 * 所有 * 下游取消(eagerCancel = false)时,这是它的默认行为,而UnZip(它做广播对eagerCancel = true做的事情)
另外,如果两个路径中的任何一个发生故障,会发生什么?即,如果对于特定元素,f1引发错误,会产生什么影响?
另外,假设我们没有f2函数(因此没有map操作),并且我们希望在最后发出(b,a),那么f2应该被一个标识流替换,还是可以一起跳过?(如果是后者,你会 * 永远 * 使用标识流吗?)

val split = builder.add(BroadCast[A](2))
val join = builder.add(Zip[B, A])
val F1: Flow[A, B, NotUsed] = Flow[A].map(f1)
val I = Flow[A].map(identity)

split ~> F1 ~> join.in1
split ~> /* I ~> */ join.in0 // do i need the commented part?

这可能有助于内部缓冲/背压?

mrphzbgm

mrphzbgm1#

它们都是 * 扇出 * 运算符;然而,
Unzip来自文档:
获取两个元素元组的流,并将这两个元素解压缩到两个不同的下游。
Broadcast
发出n个输出中的每个传入元素。
因此,我们可以得出结论,Unzip只是一个具有n = 2的广播;但重要的是如果元素是元组,Broadcast将只创建同一元组的n输出。Unzip将为元素AB各创建2个输出

相关问题