文章0 | 阅读 9580 | 点赞0
Stream中的并行处理非常简单,只要加上parallel(),就可以将stream并行化:
@Test
public void streamParallel () {
Stream.of(1,2,3,4,5,6,7,8).parallel().map(String::valueOf).forEach(log::info);
}
}
根据结果中线程使用情况可知这种操作就完成了stream的并行化:
Reactor的并行化跟stream一样简单,不同于stream对并行的不可控,Reator还提供了可以对并行运行的调度器Schedulers
在Reactor中,并行执行以及执行的位置由所Scheduler
确定 。
Schedulers 类有如下几种对上下文操作的静态方法:
Reactor 提供了两种通过Scheduler切换上下文执行的方法:publishOn和
subscribeOn。
@Test
public void publishOnTest() {
Flux.range(1,2)
.map(i -> {
log.info("Map 1, the value map to: {}", i*i);
return i*i;
})
.publishOn(Schedulers.single())
.map(i -> {
log.info("Map 2, the value map to: {}", -i);
return -i;
})
.publishOn(Schedulers.newParallel("parallel", 4))
.map(i -> {
log.info("Map 3, the value map to: {}", i+2);
return (i+2) + "";
})
.subscribe();
}
有图可见subscribeOn使用之后会全方位覆盖,因此如果出现多个subscribeOn(),回执行后触发的
@Test
public void subscribeOnTest() throws InterruptedException {
Flux.range(1,2)
.map(i -> {
log.info("Map 1, the value map to: {}", i*i);
return i*i;
})
.subscribeOn(Schedulers.single())
.map(i -> {
log.info("Map 2, the value map to: {}", -i);
return -i;
})
.subscribeOn(Schedulers.newParallel("parallel", 4))
.map(i -> {
log.info("Map 3, the value map to {}", i+2);
return (i+2) + "";
})
.subscribe();
Thread.sleep(100);
}
由结果可见,subscribe是反向处理,因此先触发parallel,后触发single,因此都是使用的single
看个例子代码:
@Test
public void subscribeOnTest() throws InterruptedException {
Flux.range(1,2)
.map(i -> {
log.info("Map 1, the value map to: {}", i*i);
return i*i;
})
.publishOn(Schedulers.single())
.map(i -> {
log.info("Map 2, the value map to: {}", -i);
return -i;
})
.subscribeOn(Schedulers.newParallel("parallel", 4))
.map(i -> {
log.info("Map 3, the value map to {}", i+2);
return (i+2) + "";
})
.subscribe();
Thread.sleep(100);
}
版权说明 : 本文为转载文章, 版权归原作者所有 版权申明
原文链接 : https://fanfanzhisu.blog.csdn.net/article/details/107861620
内容来源于网络,如有侵权,请联系作者删除!