我有一个需要一个方法处理的任务列表,这个过程需要很长时间,所以我希望列表中的任务以异步方式逐个处理,并以异步流的形式返回结果,所以下游处理不需要等待整个任务列表的完成:
AsyncStream<R> methodA(List<T> tasks){tasks.forEach(t -> {calculation that takes a long time})}
在互联网上进行了简短的搜索之后,我发现rxjava可以处理异步流数据,但是介绍似乎解释了如何创建异步数据流。那么如何在java中创建一个异步生产者/源呢?
1条答案
按热度按时间jjhzyzn01#
您可以创建异步
Observable
它将在给定任务的计算完成后立即发出值。你需要flatMap
接线员。在一个简化的例子中,这看起来像:将任务Map到
Observable
使用subscribeOn
因此,当有东西订阅它们时,订阅发生在不同的线程上。flatMap
操作员订阅所有这些Observables
并在准备就绪后立即释放值。计算是异步的,因为订阅发生在不同的线程中Scedulers.io
游泳池。