def normalizeToTotal(source: Source[Int, Any]): Source[Double, NotUsed] =
source.map(i => Option(i)) // map everything to Some...
.concat(Source.single(None)) // so we can use None to signal upstream completion
.statefulMapConcat { () =>
var elems: List[Int] = Nil
{ elem: Option[Int] =>
elem.foreach { e => elems = e :: elems } // only when not yet completed
if (elem.isEmpty) {
// upstream is completing (None is the last element)
val des = elems.map(_.toDouble)
val sum = des.sum
val toEmit = des.reverse.map(_ / sum)
elems = Nil // preserve our invariant even in death...
toEmit
} else {
// not yet completed, don't emit
Nil
}
}
}
1条答案
按热度按时间2nc8po8w1#
既然您说数据是有限的(暗示:上游源将完成),那么类似的东西(在scala中)将起作用。
免责声明:我心目中的编译器传递了这个。
需要注意的是,这将消耗与流中元素数量成比例的内存(由于在所有元素都已知之前不发射的要求):这不是流算法,而是实现到流api的批处理算法。
(再说一次,如果一个流可以被看作是一个小批量的流(我看到你了,spark…),那么批处理同样可以被看作是一个最常“干”的流)
还可以注意到
statefulMapConcat
stage(只要它保持不变)将与无限的流一起工作Option[Int]
s、 口译None
作为批处理结束时发出的指示器。它可能仍然有用concat(Source.single(None))
当然,如果将其修改为使用这样的流,则确保批处理终止。