我有这个:
Stream<CompletableFuture<List<Item>>>
我怎么把它转换成
Stream<CompletableFuture<Item>>
其中:第二流由第一流中的每个列表内的每个和所有项目组成。
我研究了thenCompose
,但它解决了一个完全不同的问题,也被称为“扁平化”。
如何以流的方式有效地完成这一点,而不会阻塞或过早地消耗比必要的更多的流项目?
以下是我迄今为止最好的尝试:
ExecutorService pool = Executors.newFixedThreadPool(PARALLELISM);
Stream<CompletableFuture<List<IncomingItem>>> reload = ... ;
@SuppressWarnings("unchecked")
CompletableFuture<List<IncomingItem>> allFutures[] = reload.toArray(CompletableFuture[]::new);
CompletionService<List<IncomingItem>> queue = new ExecutorCompletionService<>(pool);
for(CompletableFuture<List<IncomingItem>> item: allFutures) {
queue.submit(item::get);
}
List<IncomingItem> THE_END = new ArrayList<IncomingItem>();
CompletableFuture<List<IncomingItem>> ender = CompletableFuture.allOf(allFutures).thenApply(whatever -> {
queue.submit(() -> THE_END);
return THE_END;
});
queue.submit(() -> ender.get());
Iterable<List<IncomingItem>> iter = () -> new Iterator<List<IncomingItem>>() {
boolean checkNext = true;
List<IncomingItem> next = null;
@Override
public boolean hasNext() {
if(checkNext) {
try {
next = queue.take().get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
checkNext = false;
}
if(next == THE_END || next == null) {
return false;
}
else {
return true;
}
}
@Override
public List<IncomingItem> next() {
if(checkNext) {
hasNext();
}
if(!hasNext()) {
throw new IllegalStateException();
}
checkNext = true;
return next;
}
};
Stream<IncomingItem> flat = StreamSupport.stream(iter.spliterator(), false).flatMap(List::stream);
这在第一次工作,不幸的是,它有一个致命的bug:结果流似乎在检索所有项目之前过早地终止。
2023年更新
4年过去了,我仍然没有选择一个答案,因为目前两个答案都在说这是“不可能的”。听起来不太对劲我只是要求一种有效的方式来采取分批完成的项目,并能够单独监控其完成情况。也许这在Java 21中的虚拟线程中更容易实现?我不再维护这个代码库,但这个问题仍然是一个未解决的大问题。
更新#2:澄清假设CompletableFuture
* 表示根据https://www.geeksforgeeks.org/completablefuture-in-java/…有趣的是,JavaDocs不太清楚。
3条答案
按热度按时间y1aodyip1#
正如我在评论中所说,这是不可能的。
考虑一个任意的服务,它将返回一个
CompletableFuture<Integer>
:我现在可以毫无问题地将这个
CompletableFuture<Integer>
转换为Stream<CompletableFuture<List<Object>>>
:让我们假设有一种通用的方法可以将
Stream<CompletableFuture<List<T>>>
转换为Stream<CompletableFuture<T>>
:然后我可以执行以下操作:
等等,什么?
我现在可以从
CompletableFuture
中得到任意整数。这意味着,通过一些工程努力,我可以从CompletableFuture中获得所有信息-毕竟,内存只是一些数字。因此,我们必须得出结论,像
magic
这样的方法不可能存在,而不会违反时间结构。答案是:没有这样的方法,因为它不可能存在。
t8e9dugd2#
同意Johannes Kuhn的观点。当Futures仍在执行时,您无法知道它的状态,因此无法从Stream<CompletableFuture >转换为Stream。
虽然流的输出可以使用以下代码合并-
java Stream<CompletableFuture<List<Item>>>
到java List<Item>
或java List<CompletableFuture<List<AuditRecord>>>
至java List<Item>
v8wbuo2f3#
我有这个:
Stream<CompletableFuture<List<Item>>>
我怎么把它转换成
Stream<CompletableFuture<Item>>
我研究了thenCompose,但它解决了一个完全不同的问题,也被称为“扁平化”。
如何以流的方式有效地完成这一点,而不会阻塞或过早地消耗比必要的更多的流项目?
这是完全可能的,但长话短说,它不会为您节省任何工作,它会以不给您带来任何好处的方式增加开销,并且实际上会使您的代码在长期运行中变慢。你这样做将一事无成。
原因很简单--
CompletableFuture
在push-down design上运行。这意味着,您尝试附加链接调用(thenApply
,thenCompose
等)或完整调用(get
,join
,complete
)的 INSTANT,CompletableFuture
将立即开始计算。当您将一个后续函数链接到CompletableFuture
时,可能更容易将其视为急切评估。不是每个后续函数都会触发计算,但所有有用的函数都会触发计算。这意味着,当你试图触摸(比喻)一个
CompletableFuture
时,你已经触发了它开始计算。而且这个计算是“必要的”,所以它仍然符合你的要求。我很乐意详细解释为什么你想在
CompletableFuture<Item>
上调用的函数要么会和从CompletableFuture<List<Item>>
上获取一样,要么会更昂贵,更慢,没有任何好处。请告诉我,然后我会修改这个答案,并解释为什么你想要的功能不会以任何有用的方式帮助你,除了你喜欢的形式,无论出于什么原因。无论如何,如果这真的是你想要的,那么这就是方法。
这是输出。
正如您所看到的,
Stream
“提前停止”,只做了查找所需的Item
所需的工作。如果它做了比需要更多的工作,我们将看到
COMPUTATION WAS PERFORMED
的所有54行,但我们只看到31行,因为这是找到我们的值所需的全部内容。剩下的批次仍然没有打开,因为我们找到了我们需要的。这将满足您的两个要求。这些是以下内容。
我有这个:
Stream<CompletableFuture<List<Item>>>
我怎么把它转换成
Stream<CompletableFuture<Item>>
如何以流的方式有效地完成这一点,而不会阻塞或过早地消耗比必要的更多的流项目?