java SubscribeOn仅使用池中的1个线程

ih99xse1  于 2023-03-11  发布在  Java
关注(0)|答案(2)|浏览(177)

subscribeOn上有很多问题,理论上很简单:它使订阅者进程与特定的调度程序一起运行。但是,当我运行此命令时:

@Test
void fluxWithSubscribeOnTest() {
    final Scheduler s = Schedulers.newParallel("parallel", 10);
    final Publisher<Integer> integerFlux =
            Flux
                    .range(1, 1000)
                    .doOnNext(integer -> log.info("executed"))
                    .subscribeOn(s);
    StepVerifier.create(integerFlux)
            .expectNextCount(1000)
            .verifyComplete();
}

我看到所有1000个日志都在同一个并行线程中写入

18:39:47.612 [parallel-1] INFO com.performance.TestClass - executed
18:39:47.613 [parallel-1] INFO com.performance.TestClass - executed
18:39:47.613 [parallel-1] INFO com.performance.TestClass - executed
18:39:47.613 [parallel-1] INFO com.performance.TestClass - executed
18:39:47.613 [parallel-1] INFO com.performance.TestClass - executed
18:39:47.613 [parallel-1] INFO com.performance.TestClass - executed
18:39:47.613 [parallel-1] INFO com.performance.TestClass - executed
18:39:47.613 [parallel-1] INFO com.performance.TestClass - executed

我希望上面的代码记录(所以执行onNext)subscribeOn提供的池中的多个线程。ParallelFlux就是这样工作的,正如预期的那样,它记录了不同的线程:

@Test
void parallelFluxTest() {
    final Scheduler s = Schedulers.newParallel("parallel", 10);
    final Publisher<Integer> integerFlux =
            Flux
                    .range(1, 1000)
                    .parallel(10)
                    .runOn(s)
                    .doOnNext(integer -> log.info("executed"));
    StepVerifier.create(integerFlux)
            .expectNextCount(1000)
            .verifyComplete();
}

18:43:42.377 [parallel-1] INFO com.performance.TestClass - executed
18:43:42.377 [parallel-2] INFO com.performance.TestClass - executed
18:43:42.377 [parallel-3] INFO com.performance.TestClass - executed
18:43:42.377 [parallel-4] INFO com.performance.TestClass - executed
18:43:42.378 [parallel-5] INFO com.performance.TestClass - executed

有人能解释一下为什么subscribeOn示例只记录一个线程,而池大小是10吗?

ugmeyewa

ugmeyewa1#

我想您是因为使用Scheduler.parallel()Scheduler.newParallel()时使用了“并行”一词而感到困惑

这里的“并行”并不意味着默认情况下它会神奇地使您的处理并行。

这意味着调度程序具有固定的线程池,用于执行某些CPU密集型操作,适合并行工作。
建议使用此调度程序执行CPU密集型操作。
关于Flux.range():这个方法的目的是从start开始发出一个计数递增的整数序列。2在你的第一个例子中,它是一个顺序操作,它在你订阅通量的线程上发出数字。3除非你把parallel()runOn()一起使用,否则没有任何并行工作

r55awzrz

r55awzrz2#

React堆文件中指出了您所看到的行为,其中指出:
要获得ParallelFlux,可以在任何Flux上使用parallel()操作符。该方法本身不会并行化工作,而是将工作负载划分为“rails”(默认情况下,rails的数量与CPU内核的数量相同)。
为了告诉生成的ParallelFlux在哪里运行每个rail(并且,通过扩展,并行运行rail),您必须使用runOn(Scheduler)。注意,对于并行工作,建议使用专用的Scheduler:计划程序.parallel()。
所以你需要rails才能分割工作,而要把分割的工作分配到不同的核心上,你需要一个parallel scheduler
这意味着您的第一个示例由于调度程序而在多个核上有线程,但是Flux本身没有能力将工作分割并分配给不同的核。
基本上,第一个示例是在多个未使用的内核上分配线程。
请记住,I/O工作如日志记录、发出请求、阅读/写入文件,通常不会从并行工作中受益。您基本上是将工作放在多个核心上,这些核心仍然必须等待其他事情,如文件句柄被释放、其他服务响应等。
因此,你基本上是分配了大量的资源,很少的收益。
I/O工作更像是一种编排,而不是原始的计算能力,如果您需要执行繁重的cpu限制计算,则并行工作通常会大放异彩。
所以不是伐木。

相关问题