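This article collects code examples for the Java method reactor.core.publisher.Mono.thenEmpty() and shows how Mono.thenEmpty() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven. They were extracted from a selection of projects and are intended as a practical reference. Details of Mono.thenEmpty() are as follows: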
Package: reactor.core.publisher
Class: Mono
Method: thenEmpty
Description: Returns a Mono that waits for this Mono to complete and then for a supplied Publisher<Void> to also complete. The second completion signal is replayed, or any error signal that occurs instead.
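The description above can be made concrete with a small sketch. It assumes reactor-core is on the classpath; the class name ThenEmptyDemo and its two helper methods are hypothetical and exist only for illustration. The first method records the order of side effects when both stages complete, and the second shows that an error in the first Mono is propagated without the supplied Publisher being subscribed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

import reactor.core.publisher.Mono;

// Hypothetical demo class; only Mono and its operators come from reactor-core.
public class ThenEmptyDemo {

    // Records the order of side effects when both stages complete normally.
    static List<String> successCase() {
        List<String> events = new ArrayList<>();
        Mono.just(1)
            .doOnNext(v -> events.add("first:" + v))
            // The value 1 is discarded; the supplied publisher is subscribed
            // only after the first Mono has completed.
            .thenEmpty(Mono.fromRunnable(() -> events.add("second")))
            .block(); // the resulting Mono<Void> completes empty, so block() returns null
        events.add("done");
        return events;
    }

    // Shows that an error in the first Mono skips the supplied publisher.
    static String errorCase() {
        AtomicBoolean secondSubscribed = new AtomicBoolean(false);
        try {
            Mono.<Integer>error(new IllegalStateException("boom"))
                .thenEmpty(Mono.<Void>empty()
                               .doOnSubscribe(s -> secondSubscribed.set(true)))
                .block();
            return "completed";
        } catch (IllegalStateException e) {
            return e.getMessage() + ", second subscribed: " + secondSubscribed.get();
        }
    }

    public static void main(String[] args) {
        System.out.println(successCase()); // [first:1, second, done]
        System.out.println(errorCase());   // boom, second subscribed: false
    }
}
```

In this sketch block() is used only to keep the demo synchronous; in reactive application code the returned Mono<Void> would normally be composed further or handed to the framework instead of being blocked on.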
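Code example source: spring-projects/spring-framework
Mono<Void> responseMonoVoid(ServerHttpResponse response) {
    // Wait 100 ms, then write the body; Mono.defer postpones the
    // writeWith(...) call until the delay has completed.
    return Mono.delay(Duration.ofMillis(100))
            .thenEmpty(Mono.defer(() -> response.writeWith(getBody("body"))));
}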
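Code example source: reactor/reactor-core
Publisher<Void> scenario() {
    // Mono.delay(...) emits a Long, so then() converts it to a Mono<Void>
    // before it is passed to thenEmpty.
    return Mono.just(1)
            .thenEmpty(Mono.delay(Duration.ofSeconds(123)).then());
}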
Code example source: spring-projects/spring-framework
Mono<Void> exchangeMonoVoid(ServerWebExchange exchange) {
    // Same pattern as responseMonoVoid, writing through the exchange's response.
    return Mono.delay(Duration.ofMillis(100))
            .thenEmpty(Mono.defer(() -> exchange.getResponse().writeWith(getBody("body"))));
}
Code example source: reactor/reactor-core
@Test
public void normal() {
    // The value 1 is ignored and the sequence completes once Flux.empty() completes.
    StepVerifier.create(Mono.just(1)
                            .thenEmpty(Flux.empty()))
                .verifyComplete();
}
Code example source: poutsma/web-function-sample
@Override
public Mono<Void> savePerson(Mono<Person> personMono) {
    return personMono.doOnNext(person -> {
        int id = people.size() + 1;
        people.put(id, person);
        System.out.format("Saved %s with id %d%n", person, id);
    }).thenEmpty(Mono.empty()); // with an empty publisher this behaves like then()
}
Code example source: reactor/reactor-netty
OutboundThen(NettyOutbound source, Publisher<Void> thenPublisher) {
    this.source = source;
    Mono<Void> parentMono = source.then();
    // Identity check: skip the chaining when the parent is the shared empty Mono.
    if (parentMono == Mono.<Void>empty()) {
        this.thenMono = Mono.from(thenPublisher);
    }
    else {
        this.thenMono = parentMono.thenEmpty(thenPublisher);
    }
}
Code example source: io.projectreactor.ipc/reactor-ipc
OutboundThen(Outbound<OUT> source, Publisher<Void> thenPublisher) {
    Mono<Void> parentMono = source.then();
    this.source = source;
    if (parentMono == Mono.<Void>empty()) {
        this.thenMono = Mono.from(thenPublisher);
    }
    else {
        this.thenMono = parentMono.thenEmpty(thenPublisher);
    }
}
Code example source: reactor/reactor-kafka
KafkaOutboundThen(DefaultKafkaSender<K, V> sender, KafkaOutbound<K, V> kafkaOutbound, Publisher<Void> thenPublisher) {
    super(sender);
    Mono<Void> parentMono = kafkaOutbound.then();
    if (parentMono == Mono.<Void>empty())
        this.thenMono = Mono.from(thenPublisher);
    else
        this.thenMono = parentMono.thenEmpty(thenPublisher);
}
Code example source: io.projectreactor.ipc/reactor-netty
OutboundThen(NettyOutbound source, Publisher<Void> thenPublisher) {
    this.sourceContext = source.context();
    Mono<Void> parentMono = source.then();
    if (parentMono == Mono.<Void>empty()) {
        this.thenMono = Mono.from(thenPublisher);
    }
    else {
        this.thenMono = parentMono.thenEmpty(thenPublisher);
    }
}
Code example source: reactor/reactor-netty
@Override
public Mono<Void> then() {
    ByteBufAllocator alloc = parent.channel().alloc();
    return Flux.from(source)
            .collect(alloc::heapBuffer, ByteBuf::writeBytes)
            .flatMap(agg -> {
                // Set Content-Length from the aggregated body unless the request
                // is chunked or already carries a length.
                if (!HttpUtil.isTransferEncodingChunked(request) && !HttpUtil.isContentLengthSet(request)) {
                    request.headers()
                           .setInt(HttpHeaderNames.CONTENT_LENGTH, agg.readableBytes());
                }
                if (agg.readableBytes() > 0) {
                    // Wait for the parent outbound, then write and flush the buffer.
                    return parent.then().thenEmpty(FutureMono.disposableWriteAndFlush(parent.channel(), Mono.just(agg)));
                }
                // Nothing to write: release the empty buffer.
                agg.release();
                return parent.then();
            });
}
Code example source: io.projectreactor.netty/reactor-netty
@Override
public Mono<Void> then() {
    ByteBufAllocator alloc = parent.channel().alloc();
    return Flux.from(source)
            .collect(alloc::heapBuffer, ByteBuf::writeBytes)
            .flatMap(agg -> {
                if (!HttpUtil.isTransferEncodingChunked(request) && !HttpUtil.isContentLengthSet(request)) {
                    request.headers()
                           .setInt(HttpHeaderNames.CONTENT_LENGTH, agg.readableBytes());
                }
                return parent.then().thenEmpty(FutureMono.disposableWriteAndFlush(parent.channel(), Mono.just(agg)));
            });
}
Code example source: io.projectreactor.ipc/reactor-netty
@Override
public Mono<Void> then() {
    ByteBufAllocator alloc = parent.channel().alloc();
    return Flux.from(source)
            .collect(alloc::heapBuffer, ByteBuf::writeBytes)
            .flatMap(agg -> {
                if (!HttpUtil.isTransferEncodingChunked(request) && !HttpUtil.isContentLengthSet(request)) {
                    request.headers()
                           .setInt(HttpHeaderNames.CONTENT_LENGTH, agg.readableBytes());
                }
                return parent.then().thenEmpty(FutureMono.disposableWriteAndFlush(context().channel(), Mono.just(agg)));
            });
}
Code example source: reactor/reactor-kafka
public Flux<?> flux() {
    return KafkaReceiver.create(receiverOptions(Collections.singletonList(topic)).commitInterval(Duration.ZERO))
            .receive()
            .publishOn(scheduler)
            // Commit each offset only after the record has been stored.
            .concatMap(m -> storeInDB(m.value())
                    .thenEmpty(m.receiverOffset().commit()))
            .retry()
            .doOnCancel(() -> close());
}
public Mono<Void> storeInDB(Person person) { // method body is cut off in the source excerpt
Code example source: reactor/reactor-kafka
// Two separate statements; the expression before each leading .send(...) is cut off in the source excerpt.
.send(outgoing1.append(topic, 10).senderRecords())
        .thenEmpty(Mono.fromRunnable(() -> doneSemaphore1.release()))
        .thenEmpty(sender.send(outgoing1.append(topic, 10).senderRecords()).then())
        .then(Mono.fromRunnable(() -> waitSemaphore1.acquireUninterruptibly()).publishOn(Schedulers.single()))
        .thenEmpty(sender.send(outgoing1.append(topic, 10).senderRecords()).then());

.send(outgoing2.append(topic, 10).senderRecords())
        .thenEmpty(Mono.fromRunnable(() -> doneSemaphore2.release()))
        .thenEmpty(sender.send(outgoing2.append(topic, 10).senderRecords()).then())
        .then(Mono.fromRunnable(() -> waitSemaphore2.acquireUninterruptibly()).publishOn(Schedulers.single()))
        .thenEmpty(sender.send(outgoing2.append(topic, 10).senderRecords()).then());