如何通过akka流计算比率

ve7v8dk2  于 2021-06-29  发布在  Java
关注(0)|答案(1)|浏览(442)

查看如何计算流中每个元素的比率(百分比)。 (10,20,30,40,50)->(10/150, 20/150, 30/150, 40/150, 50/150) 150是一个流中元素的总和
图形应该将流缩减为一个元素,然后将该元素应用于流中的每个元素
我在考虑广播(2)流,然后在(1)做一个减少(计算总和),(2)应该是相同的,然后压缩它不知何故。zip是1:1组合的问题。

2nc8po8w

2nc8po8w1#

既然您说数据是有限的(暗示:上游源将完成),那么类似的东西(在scala中)将起作用。

def normalizeToTotal(source: Source[Int, Any]): Source[Double, NotUsed] =
  source.map(i => Option(i))  // map everything to Some...
    .concat(Source.single(None)) // so we can use None to signal upstream completion
    .statefulMapConcat { () =>
      var elems: List[Int] = Nil
      { elem: Option[Int] =>
        elem.foreach { e => elems = e :: elems }  // only when not yet completed
        if (elem.isEmpty) {
          // upstream is completing (None is the last element)
          val des = elems.map(_.toDouble)
          val sum = des.sum
          val toEmit = des.reverse.map(_ / sum)
          elems = Nil  // preserve our invariant even in death...
          toEmit
        } else {
          // not yet completed, don't emit
          Nil
        }
      }
    }

免责声明:我心目中的编译器传递了这个。
需要注意的是,这将消耗与流中元素数量成比例的内存(由于在所有元素都已知之前不发射的要求):这不是流算法,而是实现到流api的批处理算法。
(再说一次,如果一个流可以被看作是一个小批量的流(我看到你了,spark…),那么批处理同样可以被看作是一个最常“干”的流)
还可以注意到 statefulMapConcat stage(只要它保持不变)将与无限的流一起工作 Option[Int] s、 口译 None 作为批处理结束时发出的指示器。它可能仍然有用 concat(Source.single(None)) 当然,如果将其修改为使用这样的流,则确保批处理终止。

相关问题