在Vert.x Web服务器中,我有一组Futures,每个Futures都可能失败或成功,并保持一个结果。我对每一个未来的结果(可能是结果)都感兴趣,这意味着我需要处理每个未来的结果。
我在想那个维尔特。x的CompositeFuture
是要走的路,这是我的代码片段:
List<Future> futures = dataProviders.stream()
.filter(dp -> dp.isActive(requester))
.map(DataProvider::getData)
.collect(Collectors.toList());
CompositeFuture.all(futures)
.onComplete(ar -> {
if(ar.failed()) {
routingContext.response()
.end(ar.cause());
return;
}
CompositeFuture cf = ar.result();
JsonArray data = new JsonArray();
for(int i = 0; i < cf.size(); i++) {
if(cf.failed(i)) {
final JsonObject errorJson = new JsonObject();
errorJson.put("error", cf.cause(i).getMessage());
data.add(errorJson);
} else {
data.add(((Data) cf.resultAt(i)).toJson());
}
}
JsonObject res = new JsonObject()
.put("data", data);
routingContext.response()
.putHeader("Content-Type", "application/json")
.end(res.toString());
});
但是我会遇到以下问题:
- 使用
CompositeFuture.all(futures).onComplete()
,只要futures
中的任何Future失败,我就不会得到成功的Future的结果(因为那时ar.result()
为null)。 - 使用
CompositeFuture.any(futures).onComplete()
,我将获得所有结果,但CompositeFuture在futures
的所有Future完成之前完成。也就是说,它不会等待每个未来完成,而是在任何未来完成时完成。(-〉cf.resultAt(i)
返回null) - 使用
CompositeFuture.join(futures).onComplete()
与使用all()
相同:只要任何Future失败,ar.result()
就为null。
什么是正确/最好的方法来等待一个Futures列表完成,同时能够单独处理每个结果和结果?
4条答案
按热度按时间xoefb8l81#
最简单的是如果你自己处理结果。您可以将
onSuccess
处理程序注册到您的future。这样,结果将被放入某种列表中,e。g.JsonArray
。您还可以查看
rx-java
库。这样的用例通常使用它可以更好地实现。kq0g1dla2#
我会使用join操作而不是all,所以CompositeFuture将等待所有future完成,即使其中一个失败。否则,可能会发生与CompositeFuture的求值绑定将在所有异步调用完成之前发生。
然后,您可以在CompositeFuture中加入futures之前定义恢复步骤。
它将打印:
qhhrdooz3#
使用
all
时,您可以简单地戳原始future以获得其结果:px9o7tmv4#
将成功存储到自己的数据存储中的问题是,数据存储需要是线程安全的,以防止竞争条件。因此,仅仅使用Java List或JsonArray并不是一个健壮的解决方案。解决这个问题的最简单和最少干扰的方法是在你的future周围创建一个 Package 器Future,它总是“成功”的,但是如果它“失败”,则用Throwable cause完成。通过这种方式,您可以按预期使用Vertx库的其余部分。
示例: