我有一个用例,其中我有一个提供服务“地址”的Source。然后,我想将该地址Map到从该服务流式传输数据的源。现在,如果我将获得一个新的地址,我想为该地址创建一个新的源,并继续从第二个源流,同时停止前一个源(因为没有保证最后一个地址/服务/源会发生什么)。
到目前为止,我发现flatMapConcat
是最接近我所需要的,但我想停止以前的来源,并保留最新的。
在某种程度上,我想有:
AddressSource
.flatMatLatest(address => StreamingSource.from(address))
// at this point we should be receiving elements produced by the latest StreamingSource
.to(Sink...)
1条答案
按热度按时间cgvd09ve1#
诚然,这只是心理上的编译,但像这样的东西应该工作:
基本上:
flatMapConcat
的每个源,并预物质化该源,以便您可以访问XNUMX开关flatMapConcat
之前,保存该交换机触发开关将完成发出的源,导致
flatMapConcat
请求另一个源,该源在detach
(基本上是一个缓冲区1)中随时可用(因为它是在触发触发开关后立即发出的)。由于detach
现在是空的,因此通过statefulMap
向sourceOfAddresses
发送需求信号。