Akka流flatMapConcat,在创建新源时停止以前的源

njthzxwz  于 2023-10-18  发布在  其他
关注(0)|答案(1)|浏览(151)

我有一个用例,其中我有一个提供服务“地址”的Source。然后,我想将该地址Map到从该服务流式传输数据的源。现在,如果我将获得一个新的地址,我想为该地址创建一个新的源,并继续从第二个源流,同时停止前一个源(因为没有保证最后一个地址/服务/源会发生什么)。
到目前为止,我发现flatMapConcat是最接近我所需要的,但我想停止以前的来源,并保留最新的。
在某种程度上,我想有:

AddressSource
  .flatMatLatest(address => StreamingSource.from(address))
  // at this point we should be receiving elements produced by the latest StreamingSource
  .to(Sink...)
cgvd09ve

cgvd09ve1#

诚然,这只是心理上的编译,但像这样的东西应该工作:

sourceOfAddresses
  .statefulMap(() => Option.empty[KillSwitch])(
    { (lastEmittedKillswitch, address) =>
      lastEmittedKillswitch.foreach(_.shutdown)

      // this just happens to match the desired ordering of the tuple, woohoo!
      buildSourceForAddress(address)
        .viaMat(Killswitches.single)(Keep.right)
        .preMaterialize()  // will need the materializer in implicit scope for this to work
    },
    _ => None
  )
  .detach // in combination with the shutdown above, ensure perpetual demand
  .flatMapConcat(identity)

基本上:

  • 用一个XNUMX开关构建您传递给flatMapConcat的每个源,并预物质化该源,以便您可以访问XNUMX开关
  • 在传递到flatMapConcat之前,保存该交换机
  • 当你得到一个新的地址,如果有一个保存的交换机,激活它

触发开关将完成发出的源,导致flatMapConcat请求另一个源,该源在detach(基本上是一个缓冲区1)中随时可用(因为它是在触发触发开关后立即发出的)。由于detach现在是空的,因此通过statefulMapsourceOfAddresses发送需求信号。

相关问题