computablefuture并行调用

8ulbf1ek  于 2021-06-29  发布在  Java
关注(0)|答案(1)|浏览(332)

**结束。**此问题需要详细的调试信息。它目前不接受答案。
**想改进这个问题吗?**更新问题,使其成为堆栈溢出的主题。

14天前关门了。
改进这个问题
我想并行运行10个api。每个api都返回一些值。当值的总计数等于100时我想停止,也就是说,如果我在得到所有api的结果之前得到100个结果,我不想等待所有10个api。所以我想和你一起玩 CompletableFuture.anyOf() 在循环和返回中,但我无法找出相同的正确语法。如果还有其他有效的方法呢?
请回复。
提前谢谢!

k3bvogb1

k3bvogb11#

你可以使用倒计时锁

ExecutorService executor = Executors.newFixedThreadPool(10);

    List<Integer> results = new ArrayList<>();
    int maxResults = 100;
    CountDownLatch latch = new CountDownLatch(maxResults);

    for (int i = 0; i < 10; ++i) {
        int apiNumber = i;
        executor.execute(() -> {
            while (results.size() < maxResults) {
                try {
                    Thread.sleep(new Random().nextInt(1000));
                    System.out.println("API-" + apiNumber + " call");
                    int[] newValues = new Random().ints(0, 10).limit(new Random().nextInt(10)).toArray(); // API call
                    for (int value : newValues) {
                        synchronized (results) {
                            if (results.size() < maxResults) {
                                results.add(value);
                                latch.countDown();
                            }
                        }
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
    }

    latch.await();

    System.out.println("The results have been calculated (" + results.size() + ")");

    executor.shutdown();

或相位器

ExecutorService executor = Executors.newFixedThreadPool(10);

    List<Integer> results = new ArrayList<>();
    int maxResults = 100;
    Phaser phaser = new Phaser(1);
    phaser.register();

    for (int i = 0; i < 10; ++i) {
        int apiNumber = i;
        executor.execute(() -> {
            while (results.size() < maxResults) {
                try {
                    Thread.sleep(new Random().nextInt(1000));
                    System.out.println("API-" + apiNumber + " call");
                    int[] newValues = new Random().ints(0, 10).limit(new Random().nextInt(10)).toArray(); // API call
                    for (int value : newValues) {
                        synchronized (results) {
                            results.add(value);
                            if (results.size() >= maxResults) {
                                phaser.arrive();
                                break;
                            }
                        }
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
    }

    phaser.arriveAndAwaitAdvance();

    System.out.println("The results have been calculated (" + results.size() + ")");

    executor.shutdown();

相关问题