subscribeOn上有很多问题,理论上很简单:它使订阅者进程与特定的调度程序一起运行。但是,当我运行此命令时:
@Test
void fluxWithSubscribeOnTest() {
final Scheduler s = Schedulers.newParallel("parallel", 10);
final Publisher<Integer> integerFlux =
Flux
.range(1, 1000)
.doOnNext(integer -> log.info("executed"))
.subscribeOn(s);
StepVerifier.create(integerFlux)
.expectNextCount(1000)
.verifyComplete();
}
我看到所有1000个日志都在同一个并行线程中写入
18:39:47.612 [parallel-1] INFO com.performance.TestClass - executed
18:39:47.613 [parallel-1] INFO com.performance.TestClass - executed
18:39:47.613 [parallel-1] INFO com.performance.TestClass - executed
18:39:47.613 [parallel-1] INFO com.performance.TestClass - executed
18:39:47.613 [parallel-1] INFO com.performance.TestClass - executed
18:39:47.613 [parallel-1] INFO com.performance.TestClass - executed
18:39:47.613 [parallel-1] INFO com.performance.TestClass - executed
18:39:47.613 [parallel-1] INFO com.performance.TestClass - executed
我希望上面的代码记录(所以执行onNext)subscribeOn提供的池中的多个线程。ParallelFlux就是这样工作的,正如预期的那样,它记录了不同的线程:
@Test
void parallelFluxTest() {
final Scheduler s = Schedulers.newParallel("parallel", 10);
final Publisher<Integer> integerFlux =
Flux
.range(1, 1000)
.parallel(10)
.runOn(s)
.doOnNext(integer -> log.info("executed"));
StepVerifier.create(integerFlux)
.expectNextCount(1000)
.verifyComplete();
}
18:43:42.377 [parallel-1] INFO com.performance.TestClass - executed
18:43:42.377 [parallel-2] INFO com.performance.TestClass - executed
18:43:42.377 [parallel-3] INFO com.performance.TestClass - executed
18:43:42.377 [parallel-4] INFO com.performance.TestClass - executed
18:43:42.378 [parallel-5] INFO com.performance.TestClass - executed
有人能解释一下为什么subscribeOn示例只记录一个线程,而池大小是10吗?
2条答案
按热度按时间ugmeyewa1#
我想您是因为使用
Scheduler.parallel()
或Scheduler.newParallel()
时使用了“并行”一词而感到困惑这里的“并行”并不意味着默认情况下它会神奇地使您的处理并行。
这意味着调度程序具有固定的线程池,用于执行某些CPU密集型操作,适合并行工作。
建议使用此调度程序执行CPU密集型操作。
关于
Flux.range()
:这个方法的目的是从start开始发出一个计数递增的整数序列。2在你的第一个例子中,它是一个顺序操作,它在你订阅通量的线程上发出数字。3除非你把parallel()
和runOn()
一起使用,否则没有任何并行工作r55awzrz2#
React堆文件中指出了您所看到的行为,其中指出:
要获得ParallelFlux,可以在任何Flux上使用parallel()操作符。该方法本身不会并行化工作,而是将工作负载划分为“rails”(默认情况下,rails的数量与CPU内核的数量相同)。
为了告诉生成的ParallelFlux在哪里运行每个rail(并且,通过扩展,并行运行rail),您必须使用runOn(Scheduler)。注意,对于并行工作,建议使用专用的Scheduler:计划程序.parallel()。
所以你需要
rails
才能分割工作,而要把分割的工作分配到不同的核心上,你需要一个parallel scheduler
。这意味着您的第一个示例由于调度程序而在多个核上有线程,但是
Flux
本身没有能力将工作分割并分配给不同的核。基本上,第一个示例是在多个未使用的内核上分配线程。
请记住,I/O工作如日志记录、发出请求、阅读/写入文件,通常不会从并行工作中受益。您基本上是将工作放在多个核心上,这些核心仍然必须等待其他事情,如文件句柄被释放、其他服务响应等。
因此,你基本上是分配了大量的资源,很少的收益。
I/O工作更像是一种编排,而不是原始的计算能力,如果您需要执行繁重的cpu限制计算,则并行工作通常会大放异彩。
所以不是伐木。