Java 8并行运行多个方法

z5btuh9x  于 2023-01-24  发布在  Java
关注(0)|答案(4)|浏览(182)

我想同时运行两个具有不同返回类型的方法。下面是我的代码:

public void method(int id) {
    final CompletableFuture<List<FooA>> fooACF = CompletableFuture.supplyAsync(() -> generateFooA(id));
    final CompletableFuture<List<FooB>> fooBCF = CompletableFuture.supplyAsync(() -> generateFooB(id));
    List<FooA> fooAs = fooACF.get();
    List<FooB> fooBs = fooBCF.get();
    //Do more processesing
}

public List<FooA> generateFooA(int id) {
    //code
}

public List<FooB> generateFooB(int id) {
    //code
}

但我不知道这两个方法是否会与上面的代码并行运行,或者我最好还是说:

List<FooA> fooAs = generateFooA(id);
List<FooB> fooBs = generateFooB(id);

如何正确地使用可完成future来并行运行这两种方法?

w1e3prcc

w1e3prcc1#

您的代码使用ForkJoinPool.commonPool()提供的线程运行良好,正如JavaDoc for CompletableFuture.supplyAsync(Supplier<U> supplier)所承诺的那样。您可以通过添加一些sleep()println()语句来快速地证明这一点。我使用String而不是List<Foo>来稍微简化您的代码:

public void method(int id) throws InterruptedException, ExecutionException {
    CompletableFuture<String> cfa = CompletableFuture.supplyAsync(() -> generateA(id));
    CompletableFuture<String> cfb = CompletableFuture.supplyAsync(() -> generateB(id));
    String fooA = cfa.get();
    String fooB = cfb.get();
    System.out.println("Final fooA " + fooA);
    System.out.println("Final fooB " + fooB);
}

public String generateA(int id) {
    System.out.println("Entering generateA " + Thread.currentThread());
    sleep(2000);
    System.out.println("Leaving generateA");
    return "A" + id;
}

public String generateB(int id) {
    System.out.println("Entering generateB " + Thread.currentThread());
    sleep(1000);
    System.out.println("Leaving generateB");
    return "B" + id;
}

private void sleep(int n) {
    try {
        Thread.sleep(n);
    } catch (InterruptedException ex) {
        // never mind
    }
}

输出为:

Entering generateFooA Thread[ForkJoinPool.commonPool-worker-1,5,main]
Entering generateFooB Thread[ForkJoinPool.commonPool-worker-2,5,main]
Leaving generateFooB
Leaving generateFooA
Final fooA A1
Final fooB B1

您可以手动观察到"离开"输出行在1秒和2秒后出现。为了获得更多证据,您可以向输出添加时间戳。如果更改睡眠的相对长度,您将看到"离开"输出以不同的顺序出现。
如果忽略sleep() s,那么第一个线程很可能会很快完成,以至于在第二个线程开始之前就完成了:

Entering generateA Thread[ForkJoinPool.commonPool-worker-1,5,main]
Leaving generateA
Entering generateB Thread[ForkJoinPool.commonPool-worker-1,5,main]
Leaving generateB
Final fooA A1
Final fooB B1

请注意,这一切发生得如此之快,以至于在运行时请求第二个线程时,该线程已经返回到池中,因此原始线程被重用。
这也可能发生在很短的睡眠中,尽管在我的系统上,每次运行它时1ms的睡眠就足够了。当然,sleep()是一个占位符,代表一个需要时间才能完成的"真正的"操作。如果你的真正操作是如此便宜,以至于它在其他线程启动之前就完成了,这是一个很好的暗示,表明在这种情况下多线程是没有好处的。

    • 然而,**如果你需要问如何证明事情是并发发生的,我想知道你为什么要它们首先并发发生。如果当你的程序并发地或顺序地执行这些任务时,在你的程序之间没有"现实世界"可观察到的差异,那么为什么不让它顺序地运行呢?顺序操作更容易推理;有许多与并发性相关的隐蔽的错误。

也许你希望通过多线程来提高速度--如果是这样的话,速度的提高应该是你所衡量的,而不是事情是否实际上是并发的。记住,对于大量的任务,CPU不能并行地比顺序地执行它们更快。

xxe27gdn

xxe27gdn2#

你可以用Java 8+和流API来实现。
例如,我们有一个计算流程如下:
结果=(a + B)+(a-c)+(c * b).
因此,我们将此计算分为以下几种方法:

public class Calculator {

    public static int add(int a, int b) {
        sleep(); //Imagine this calculation take several seconds
        return a + b;
    }

    public static int minus(int a, int b) {
        sleep(); //Imagine this calculation take several seconds
        return a - b;
    }

    public static int divide(int a, int b) {
        sleep(); //Imagine this calculation take several seconds
        return a * b;
    }

    private static void sleep() {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}

**注意:**您可以看到,对于每种方法,计算出的过程持续时间为2秒。
Lagacy代码样式:

int a = 12;
  int b = 4;
  int c = 1;
  LocalTime startDateTime = LocalTime.now();
  int legacyCalculate = Calculator.add(a, b) + Calculator.minus(a, c) + Calculator.divide(c, b);
  System.out.println("Result : " + legacyCalculate);
  LocalTime endDateTime = LocalTime.now();

  System.out.println("Process time : " + startDateTime.until(endDateTime, ChronoUnit.SECONDS) + " seconds");

  // Result : 
  Result : 31
  Process time : 6 seconds

使用(ExecutorService或Thread)的并行计算:

ExecutorService es = Executors.newCachedThreadPool();   
LocalTime startDateTime = LocalTime.now();
Future<Integer> r1 = es.submit(() -> Calculator.add(a, b));
Future<Integer> r2 = es.submit(() -> Calculator.minus(a, c));
Future<Integer> r3 = es.submit(() -> Calculator.divide(c, b));
System.out.println("Result : " + (r1.get() + r2.get() + r3.get()));
LocalTime endDateTime = LocalTime.now();
System.out.println("Process time : " + startDateTime.until(endDateTime, ChronoUnit.SECONDS) + " seconds");
es.shutdown();     

// Result: 
Result : 31
Process time : 2 seconds

使用Java8+流进行并行计算:

Supplier<Integer> r1 = () -> Calculator.add(a, b);
Supplier<Integer> r2 = () -> Calculator.minus(a, c);
Supplier<Integer> r3 = () -> Calculator.divide(c, b);

LocalTime startDateTime = LocalTime.now();
int result = Stream.of(r1, r2, r3) 
        .parallel() // Please pay attention to this line
        .mapToInt(Supplier::get) 
        .sum();
LocalTime endDateTime = LocalTime.now();
System.out.println("Result : " + result);
System.out.println("Process time : " + startDateTime.until(endDateTime, ChronoUnit.SECONDS) + " seconds");

// Result: 
Result : 31
Process time : 2 seconds
vql8enpb

vql8enpb3#

您缺少一个Executor

ExecutorService executor = Executors.newCachedThreadPool();
List<Future<?>> = Stream.<Runnable>of(() -> generateFooA(id), () -> generateFooA(id))
        .map(executor::submit)
        .collect(Collectors.toList());
for (Future<?> future : futures) {
    future.get(); // do whatever you need here
}

Runnables在你submit它们的时候开始执行。get()尽快返回。例如,如果你get()的第一个未来调用是最慢的,所有其他的get()调用将立即返回。

58wvjzkj

58wvjzkj4#

正如我在我的评论中所说,看看How to start two threads at "exactly" the same time,但这应该是你要找的

final CyclicBarrier gate = new CyclicBarrier(3);
public void method(int id) {
    Thread one = new Thread (()->{
        gate.await();
        List<FooA> fooAs = generateFooA(id);
    });
    Thread two = new Thread (()->{
        gate.await();
        List<FooB> fooBs = generateFooB(id);
    });
    one.start();
    two.start();
    gate.await();
    //Do more processesing
}

public List<FooA> generateFooA(int id) {
    //code
}

public List<FooB> generateFooB(int id) {
    //code
}

相关问题