java Vertx CompositeFuture:完成所有期货

lbsnaicq  于 2023-04-28  发布在  Java
关注(0)|答案(4)|浏览(124)

在Vert.x Web服务器中,我有一组Futures,每个Futures都可能失败或成功,并保持一个结果。我对每一个未来的结果(可能是结果)都感兴趣,这意味着我需要处理每个未来的结果。
我在想那个维尔特。x的CompositeFuture是要走的路,这是我的代码片段:

List<Future> futures = dataProviders.stream()
    .filter(dp -> dp.isActive(requester))
    .map(DataProvider::getData)
    .collect(Collectors.toList());

CompositeFuture.all(futures)
        .onComplete(ar -> {
            if(ar.failed()) {
                routingContext.response()
                    .end(ar.cause());
                return;
            }
            
            CompositeFuture cf = ar.result();
            JsonArray data = new JsonArray();
            for(int i = 0; i < cf.size(); i++) {
                if(cf.failed(i)) {
                    final JsonObject errorJson = new JsonObject();
                    errorJson.put("error", cf.cause(i).getMessage());
                    data.add(errorJson);
                } else {
                    data.add(((Data) cf.resultAt(i)).toJson());
                }
            }

            JsonObject res = new JsonObject()
                .put("data", data);

            routingContext.response()
                    .putHeader("Content-Type", "application/json")
                    .end(res.toString());
        });

但是我会遇到以下问题:

  • 使用CompositeFuture.all(futures).onComplete(),只要futures中的任何Future失败,我就不会得到成功的Future的结果(因为那时ar.result()为null)。
  • 使用CompositeFuture.any(futures).onComplete(),我将获得所有结果,但CompositeFuture在futures的所有Future完成之前完成。也就是说,它不会等待每个未来完成,而是在任何未来完成时完成。(-〉cf.resultAt(i)返回null)
  • 使用CompositeFuture.join(futures).onComplete()与使用all()相同:只要任何Future失败,ar.result()就为null。

什么是正确/最好的方法来等待一个Futures列表完成,同时能够单独处理每个结果和结果?

xoefb8l8

xoefb8l81#

最简单的是如果你自己处理结果。您可以将onSuccess处理程序注册到您的future。这样,结果将被放入某种列表中,e。g. JsonArray

List<Future> futures = //...list of futures

JsonArray results = new JsonArray();
futures.forEach(e -> e.onSuccess(h -> results.add(h)));

CompositeFuture.all(futures)
    .onComplete(ar -> {
        if(ar.failed()) {
            // successful elements are present in "results"
            routingContext.response().end(results.encode());
            return;
        }
        //... rest of your code
     });

您还可以查看rx-java库。这样的用例通常使用它可以更好地实现。

kq0g1dla

kq0g1dla2#

我会使用join操作而不是all,所以CompositeFuture将等待所有future完成,即使其中一个失败。否则,可能会发生与CompositeFuture的求值绑定将在所有异步调用完成之前发生。
然后,您可以在CompositeFuture中加入futures之前定义恢复步骤。

public static void main(String[] args) {
    List<String> inputs = List.of("1", "Terrible error", "2");

    List<Future> futures = inputs.stream()
        .map(i -> asyncAction(i)
            .recover(thr -> {
              // real recovery or just logging
              System.out.println("Bad thing happen: " + thr.getMessage());
              return Future.succeededFuture();
            }))
        .collect(Collectors.toList());

    CompositeFuture.join(futures)
        .map(CompositeFuture::list)
        .map(results -> results.stream()
            // filter out empty recovered future            
            .filter(Objects::nonNull)
            .collect(Collectors.toList()))
        .onSuccess(System.out::println);
  }

  static Future<String> asyncAction(final String input) {
    if ("Terrible error".equals(input)) {
      return Future.failedFuture(input);
    }
    return Future.succeededFuture(input);
  }

它将打印:

Bad thing happen: Terrible error
[1, 2]
qhhrdooz

qhhrdooz3#

使用all时,您可以简单地戳原始future以获得其结果:

List<Future> futures = //...list of futures

CompositeFuture.all(futures).onComplete(ar -> {
  if(ar.succeeded()){
    futures.forEach(fut -> log.info( fut.succeded() +" / " +_fut.result() ));
  }
} );
px9o7tmv

px9o7tmv4#

将成功存储到自己的数据存储中的问题是,数据存储需要是线程安全的,以防止竞争条件。因此,仅仅使用Java List或JsonArray并不是一个健壮的解决方案。解决这个问题的最简单和最少干扰的方法是在你的future周围创建一个 Package 器Future,它总是“成功”的,但是如果它“失败”,则用Throwable cause完成。通过这种方式,您可以按预期使用Vertx库的其余部分。
示例:

public Future<Object> wrapFuture(Future<Object> future) {
  Promise<Object> promise = Promise.promise();
  future.onComplete(asyncResult -> {
    if (asyncResult.succeeded()) {
      promise.complete(asyncResult.result());
    } else {
      promise.complete(cause);
    }
  });
}

public void myFunction() {
  List<Future> futures = new ArrayList<Future>();
  for (operationReturningAFuture: operations) {
    futures.add(wrapFuture(operationReturningAFuture()));
  }
  ComposititeFuture.join(futures).onComplete(asyncResult -> {
    List<Object> output = asyncResult.result().list();
    for (Object response : output) {
      if (response instanceof Throwable) {
        // this future failed
      } else {
        // this future succeeded
      }
    }
  });
}

相关问题