Java ForkJoinPool中阻塞IO时线程的行为

fquxozlt  于 2023-05-21  发布在  Java
关注(0)|答案(3)|浏览(275)

如果ForkJoinPool中的线程执行阻塞I/O活动(下面使用Thread.sleep(10000)在代码中模拟),它应该选择另一个不会导致阻塞IO的任务。但是,在代码下面运行ForkJoinPool并行度为1或2的代码,结果与之不匹配。线程不会放弃阻塞任务(即task 1和task 2),并移动到非阻塞任务(task 3)。在池大小=1的情况下,所有任务都按顺序执行,而我希望Task 3首先完成,因为在这两种情况下(池大小=1或2)它都是非阻塞的
在阻塞IO的情况下,ForkJoinPool中的线程的实际行为是什么?

ForkJoinPool pool = new  ForkJoinPool(1);
    
    List<Future<String>> tasks = new ArrayList<Future<String>>();
    

    //Blocking
    tasks.add(pool.submit(() ->  {  Thread.sleep(10000); System.out.println("Task 1 woke up"+ Thread.currentThread().isDaemon()); return "task1"; }));
    //Blocking
    tasks.add(pool.submit(() ->  { Thread.sleep(10000); System.out.println("Task 2 woke up"+Thread.currentThread().isDaemon()); return "task2"; }));
    //Non Blocking
    tasks.add(pool.submit(() ->  {  System.out.println("Task 3 Runs"+Thread.currentThread().isDaemon()); return "task3"; }));
    

    
    int i=0;
    System.out.println("pool size = " + pool.getParallelism() + "Thread count=" +pool.getPoolSize() + "Stealing =" + pool.getStealCount());
    System.out.println("waiting");
    
    String str1 = tasks.get(0).get();
    String str2 = tasks.get(1).get();
    String str3 = tasks.get(2).get();
    
    System.out.println("Results = " +  str1+ str2 +str3);
    
    System.out.println("done");

结果:
//池大小= 2
任务2唤醒真
任务1唤醒真
任务3 Runstrue
//池大小= 1
任务1唤醒真
任务2唤醒真
任务3 Runstrue
另一个观察:
当我们首先提交Task 3(非阻塞)时,只有它首先被调用。

k7fdbhmy

k7fdbhmy1#

如果ForkJoinPool中的Thread执行阻塞I/O活动(下面使用Thread.sleep(10000)在代码中模拟),它应该选择另一个不会导致阻塞IO的任务。
这不是线程的行为方式。线程表示动态执行。它不能任意跳转到另一个Runnable。线程没有办法知道线程是否阻塞I/O或执行密集计算,* 除非 * 线程引擎和特定的I/O是为此设计的(例如:jdbc vs.r2dbc)。
换句话说,如果你使用Reactor/Spring Reactive/其他一些具有传统阻塞I/O的响应式编程框架,如JDBC,你的线程仍然会挂起等待I/O。
阻塞I/O通常在轮询套接字时实现为循环,特定I/O外部的任何东西都无法知道这实际上是阻塞I/O。
据我所知,ForkJoinPool不是React式的,它将Runnables交给线程,就是这样。

qgelzfjb

qgelzfjb2#

ForkJoinPool仅用作织机提案中提到的调度程序。只有虚拟线程能够放弃阻塞任务并切换到另一个任务。根据我在loom邮件列表中阅读到的一封邮件,loom团队目前不打算重新设计java.util.concurrent的ForkJoinPool来使用虚拟线程。

dgsult0t

dgsult0t3#

ForkJoinPool不能任意检测应该启动其他任务以保持并行性的情况,以及不应该启动其他任务的情况。
因此,正在等待某些东西的工作项必须显式地告诉ForkJoinPool它应该启动其他任务以保持并行性。
要实现这一点,您必须使用ForkJoinPool#managedBlock或任何使用它的API。
例如,如果您使用CompletableFuture#joinCompletableFuture#get,那么这些方法在后台使用ForkJoinPool#managedBlock,从而允许ForkJoinPool启动其他任务以保持预期的并行性。
简单的Thread.sleep不适用于此。
如果您在示例中将Thread.sleep(10000);替换为CompletableFuture.runAsync(() -> { try { Thread.sleep(10000); } catch (Exception e) {}; }).join();,那么您将获得预期的行为。

相关问题