文章0 | 阅读 9585 | 点赞0
上一篇介绍了Mono,mono表示0~1
的序列,flux用来表示0~N
个元素序列,mono是flux的简化版,flux可以用来表示流
因为是表示连续序列Flux和Mono的创建方法,有些不同,下面是flux的一些创建方法:
如下为创建flux示例:
@Test
public void flux () throws InterruptedException {
Flux<Integer> intFlux = Flux.just(1, 2, 3, 4, 5);
Flux<Integer> rangeFlux = Flux.range(6, 4); // 以6开始,取4个值:6,7,8,9
Flux.fromArray(new Integer[]{1,3,4,5,6,12}).subscribe(System.out::println); // 通过fromArray构建
Flux<String> strFluxFromStream = Flux.fromStream(Stream.of("just", "test", "reactor", "Flux", "and", "Mono"));
Flux<String> strFluxFromList = Flux.fromIterable(Arrays.asList("just", "test", "reactor", "Flux", "and", "Mono"));
// 通过merge合并
Flux<String> strMerge = Flux.merge(strFluxFromStream, strFluxFromList);
Flux<Integer> intFluxMerged = Flux.merge(intFlux, rangeFlux);
strMerge.subscribe(log::info);
intFluxMerged.subscribe(i -> log.info("{}", i));
// 通过interval创建流数据
Flux.interval(Duration.ofMillis(100)).map(String::valueOf)
.subscribe(log::info);
Thread.sleep(2000);
}
subscribe方法最多可以传入四个参数:
@Test
public void subscribe () throws InterruptedException {
Flux
.interval(Duration.ofMillis(100))
.map(i -> {
if (i == 3) throw new RuntimeException("fake a mistake");
else return String.valueOf(i);
})
.subscribe(info -> log.info("info: {}", info), // 参数1, 接受内容
err -> log.error("error: {}", err.getMessage()), // 参数2, 对err处理的lambda函数
() -> log.info("Done"), // 参数3, 完成subscribe之后执行的lambda函数u
sub -> sub.request(10)); // 参数4, Subscription操作, 设定从源头获取元素的个数
Thread.sleep(2000);
}
版权说明 : 本文为转载文章, 版权归原作者所有 版权申明
原文链接 : https://fanfanzhisu.blog.csdn.net/article/details/107825919
内容来源于网络,如有侵权,请联系作者删除!