我正在努力改变 List<CompletableFuture<X>>
至 CompletableFuture<List<T>>
. 当您有许多异步任务并且需要获得所有这些任务的结果时,这非常有用。
如果他们中的任何一个失败了,那么最后的未来就失败了。我就是这样实现的:
public static <T> CompletableFuture<List<T>> sequence2(List<CompletableFuture<T>> com, ExecutorService exec) {
if(com.isEmpty()){
throw new IllegalArgumentException();
}
Stream<? extends CompletableFuture<T>> stream = com.stream();
CompletableFuture<List<T>> init = CompletableFuture.completedFuture(new ArrayList<T>());
return stream.reduce(init, (ls, fut) -> ls.thenComposeAsync(x -> fut.thenApplyAsync(y -> {
x.add(y);
return x;
},exec),exec), (a, b) -> a.thenCombineAsync(b,(ls1,ls2)-> {
ls1.addAll(ls2);
return ls1;
},exec));
}
要运行它:
ExecutorService executorService = Executors.newCachedThreadPool();
Stream<CompletableFuture<Integer>> que = IntStream.range(0,100000).boxed().map(x -> CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep((long) (Math.random() * 10));
} catch (InterruptedException e) {
e.printStackTrace();
}
return x;
}, executorService));
CompletableFuture<List<Integer>> sequence = sequence2(que.collect(Collectors.toList()), executorService);
如果其中任何一个失败了,那么它就失败了。即使有一百万个期货,它也能如期产出。我的问题是:假设有超过5000个期货,如果其中任何一个失败了,我会得到一个 StackOverflowError
:
java.util.concurrent.completablefuture.internalcomplete(completablefuture)处的线程“pool-1-thread-2611”java.lang.StackOverflower出现异常。java:210)在java.util.concurrent.completablefuture$thencose.run(completablefuture。java:1487)在java.util.concurrent.completablefuture.postcomplete(completablefuture。java:193)在java.util.concurrent.completablefuture.internalcomplete(completablefuture。java:210)在java.util.concurrent.completablefuture$thencose.run(completablefuture。java:1487)
我做错什么了?
注:以上返回的future在任意future失效时立即失效。公认的答案也应该是这一点。
9条答案
按热度按时间4ngedf3f1#
javaslang有一个非常方便的
Future
应用程序编程接口。它还允许从未来的集合中创造未来的集合。看到了吗http://static.javadoc.io/io.javaslang/javaslang/2.0.5/javaslang/concurrent/future.html#sequence-java.lang.iterable语言-
fcy6dtqo2#
你的任务可以很容易地完成,比如,
tpgth1q73#
免责声明:这不会完全回答最初的问题。它将缺少“如果一个人失败了,那就全失败”的部分。但是,我无法回答实际的、更一般的问题,因为它是作为这个问题的副本关闭的:java8 completablefuture.allof(…)with collection or list。所以我在这里回答:
如何转换
List<CompletableFuture<V>>
至CompletableFuture<List<V>>
使用Java8的流api?摘要:请使用以下命令:
用法示例:
完整示例:
iyfamqjs4#
在completablefuture上使用thencombine的一个序列操作示例
如果你不介意使用第三方库的话,cyclops react(我是作者)有一套用于completablefutures(以及optionals、streams等)的实用方法
pdsfdshx5#
你可以得到spotify的
CompletableFutures
图书馆与使用allAsList
方法。我想它的灵感来自GuavaFutures.allAsList
方法。如果您不想使用库,这里有一个简单的实现:
xriantvc6#
使用
CompletableFuture.allOf(...)
:关于您的实现的一些评论:
你使用
.thenComposeAsync
,.thenApplyAsync
以及.thenCombineAsync
很可能做不到你所期望的。这些...Async
方法在单独的线程中运行提供给它们的函数。因此,在您的示例中,您正在使添加到列表中的新项在提供的执行器中运行。无需将轻量级操作填充到缓存的线程执行器中。不要使用thenXXXXAsync
没有充分理由的方法。另外,
reduce
不应用于积累到可变容器中。即使当流是连续的时它可以正常工作,但是如果流是并行的,它也会失败。要执行可变缩减,请使用.collect
相反。如果要在第一次失败后立即完成整个计算,请在
sequence
方法:此外,如果要在第一次失败时取消其余操作,请添加
exec.shutdownNow();
刚好在…之后result.completeExceptionally(ex);
. 当然,这是假设exec
只存在于这一个计算中。如果没有,你将不得不循环并取消每一个剩余的Future
个别地。ezykj2lf7#
除了spotify futures库,您还可以尝试我的代码,请点击此处:https://github.com/vsilaev/java-async-await/blob/master/net.tascalate.async.examples/src/main/java/net/tascalate/concurrent/completionstages.java (与同一包中的其他类有依赖关系)
它实现了一个逻辑,用一个策略返回“至少n个(共m个)”completionstage-s,其中包含允许容忍的错误数量。对于所有/任何情况都有方便的方法,另外还有针对剩余未来的取消策略,以及处理completionstage-s(接口)而不是completablefuture(具体类)的代码。
4ktjp1zp8#
加上@misha接受的答案,可以进一步扩展为收集器:
现在您可以:
ddhy6vgd9#
正如米莎所指出的,你过度使用了
…Async
操作。此外,您正在构建一个复杂的操作链,对一个不反映您的程序逻辑的依赖项进行建模:创建一个作业x,它取决于列表中的第一个和第二个作业
创建一个job x+1,它取决于job x和列表中的第三个job
创建一个job x+2,它取决于job x+1和列表中的第4个job
…
创建一个作业x+5000,它取决于作业x+4999和列表中的最后一个作业
然后,取消(显式地或由于异常)这个递归组合的作业可能会递归地执行,并且可能会失败
StackOverflowError
. 这取决于实现。正如米莎所说,有一种方法,
allOf
它允许你对你的初衷进行建模,定义一个依赖于你列表中所有作业的作业。然而,值得注意的是,即使这样也没有必要。因为您使用的是一个无限线程池执行器,所以您只需发布一个异步作业,将结果收集到一个列表中即可。不管怎样,要求每个作业的结果意味着等待完成。
当线程数量有限且作业可能产生额外的异步作业时,使用组合依赖操作的方法非常重要,以避免等待作业从必须首先完成的作业中窃取线程,但这里的情况都不是这样。
在这种特定情况下,一个作业简单地迭代大量的先决条件作业并在必要时等待,可能比建模大量依赖项并让每个作业通知依赖作业完成情况更有效。