文章0 | 阅读 9577 | 点赞0
无论是使用webflux还是reactor都有访问文件获取数据的需求,这里简单介绍reactor读写文件的方法:
项目目录下准备一个文件命名info.txt,随便写点什么东西:
@Test
public void monoRead() {
Mono.fromCallable(()-> Files.readAllLines(Paths.get("info.txt")))
.log()
.subscribe();
}
运行结果如下
两种方法:
@Test
public void FluxRead() throws IOException{
log.info("--------------------from iterable--------------------------");
Flux.fromIterable(Files.readAllLines(Paths.get("info.txt")))
.log()
.subscribe();
log.info("--------------------from stream--------------------------");
Flux.fromStream(Files.lines(Paths.get("info.txt")))
.log()
.subscribe();
}
两种方法运行结果都一样,不过第一种方法是全部读取完成之后再处理,第二种方法是流式处理,文件大的话第一种方法会内存消耗过大,推荐使用第二种方法:
文件有读就有写,写的处理要在subscribe时进行
@Test
public void baseWrite() throws IOException {
Flux<String> flux = Flux.fromStream(Files.lines(Paths.get("info.txt")))
.map(String::toUpperCase)
.log();
flux.subscribe(new BaseSubscriber<String>() {
BufferedWriter bw = Files.newBufferedWriter(Paths.get("newInfo.txt"));
@SneakyThrows
@Override
protected void hookOnNext(String value) {
bw.write(value + "\n");
}
@SneakyThrows
@Override
protected void hookOnComplete() {
bw.flush();
bw.write("**** do flush **** \n");
bw.close();
}
});
}
@Test
public void flushWrite() throws IOException {
Flux<String> flux = Flux.fromStream(Files.lines(Paths.get("info.txt")))
.map(String::toUpperCase)
.log();
flux.subscribe(new BaseSubscriber<String>() {
BufferedWriter bw = Files.newBufferedWriter(Paths.get("newInfo.txt"));
private int count = 0;
@SneakyThrows
@Override
protected void hookOnNext(String value) {
count++;
bw.write(value + "\n");
if (count % 2 == 0) { // 设定行数进行清理缓存写入文件
bw.write("**** do flush **** \n");
bw.flush();
}
}
@SneakyThrows
@Override
protected void hookOnComplete() {
bw.close();
}
});
}
上面设置每两行进行一次flush:
版权说明 : 本文为转载文章, 版权归原作者所有 版权申明
原文链接 : https://fanfanzhisu.blog.csdn.net/article/details/107855328
内容来源于网络,如有侵权,请联系作者删除!