文章0 | 阅读 9583 | 点赞0
由于业务需求有的时候需要将多个数据源进行合并,Reactor提供了concat方法和merge方法:
concat方法示意图:
merge方法示意图:
从图中可以很清楚的看出这两种合并方法的不同:
下面对concat和merge相关的方法进行测试,先准备测试数据:
private Flux<Integer> flux1() {
return Flux.range(1,4);
}
private Flux<Integer> flux2() {
return Flux.range(5,8);
}
private Flux<String> hotFlux1() {
return flux1().map(i-> "[1]"+i).delayElements(Duration.ofMillis(10));
}
private Flux<String> hotFlux2() {
return flux2().map(i-> "[2]"+i).delayElements(Duration.ofMillis(4));
}
@Test
public void concatTest() throws InterruptedException {
Flux.concat(hotFlux1(), hotFlux2())
.subscribe(i -> System.out.print("->"+i));
Thread.sleep(200);
}
用法和concat基本相同,写法略有不同:
@Test
public void concatWithTest () {
flux1().concatWith(flux2())
.log()
.subscribe();
}
@Test
public void mergeTest() throws InterruptedException {
Flux.merge(hotFlux1(), hotFlux2())
.subscribe(i -> System.out.print("->"+i));
Thread.sleep(200);
}
@Test
public void mergeWithTest() throws InterruptedException {
hotFlux1().mergeWith(hotFlux2())
.subscribe(i -> System.out.print("->"+i));
Thread.sleep(200);
}
@Test
public void mergeSequentialTest() throws InterruptedException {
Flux.mergeSequential(hotFlux1(), hotFlux2())
.subscribe(i -> System.out.print("->"+i));
Thread.sleep(200);
}
结果和concat的一样都是
@Test
public void mergeOrderedTest() throws InterruptedException {
Flux.mergeOrdered(Comparator.reverseOrder(), hotFlux1(), hotFlux2())
.subscribe(i -> System.out.print("->"+i));
Thread.sleep(200);
}
@Test
public void combineLatestTest() throws InterruptedException {
Flux.combineLatest(hotFlux1(), hotFlux2(), (v1, v2) -> v1 + ":" + v2)
.subscribe(i -> System.out.print("->"+i));
Thread.sleep(200);
}
版权说明 : 本文为转载文章, 版权归原作者所有 版权申明
原文链接 : https://fanfanzhisu.blog.csdn.net/article/details/107869733
内容来源于网络,如有侵权,请联系作者删除!