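This article collects code examples for the Java method reactor.core.publisher.Mono.when() and shows how Mono.when() is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, were extracted from selected projects, and are intended as a practical reference. Details of Mono.when() are as follows: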
Package: reactor.core.publisher
Class name: Mono
Method name: when
Description: Aggregates the given publishers into a new Mono that completes once all of the given Publishers have completed. An error from any source cancels the still-pending sources and is emitted immediately by the returned Mono.
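The behavior described above can be tried out with a small sketch. It assumes reactor-core is on the classpath; the class MonoWhenSketch and its method names are hypothetical and exist only for illustration.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

// Hypothetical demo class; not part of reactor-core.
class MonoWhenSketch {

    // Waits for a Mono and a Flux to complete and records what happened.
    static List<String> awaitAll() {
        List<String> log = new CopyOnWriteArrayList<>();
        Mono<String> first = Mono.just("a").doOnNext(v -> log.add("first"));
        Flux<Integer> second = Flux.just(1, 2, 3).doOnComplete(() -> log.add("second"));

        // Emitted values are discarded: the Mono<Void> only signals completion,
        // so block() returns null.
        Void result = Mono.when(first, second).block();
        log.add("result=" + result);
        return log;
    }

    // Shows a source error reaching the subscriber of the combined Mono.
    static String firstError() {
        Mono<Void> failing = Mono.error(new IllegalStateException("boom"));
        return Mono.when(Mono.just(1), failing)
                .thenReturn("completed")
                .onErrorResume(e -> Mono.just("failed: " + e.getMessage()))
                .block();
    }

    public static void main(String[] args) {
        System.out.println(awaitAll());
        System.out.println(firstError());
    }
}
```

Because Mono.when() returns Mono<Void>, it suits cases where only completion matters, such as shutting down several resources. If the values of the sources are needed, an operator such as Mono.zip() is probably a better fit.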
Code example source: reactor/reactor-core
static Mono<Void> when(Publisher<?>[] sources) {
    return Mono.when(sources);
}
Code example source: reactor/reactor-core
/**
 * Join the termination signals from this mono and another source into the returned
 * void mono
 *
 * <p>
 * <img class="marble" src="doc-files/marbles/and.svg" alt="">
 * <p>
 * @param other the {@link Publisher} to wait for
 * complete
 * @return a new combined Mono
 * @see #when
 */
public final Mono<Void> and(Publisher<?> other) {
    if (this instanceof MonoWhen) {
        @SuppressWarnings("unchecked") MonoWhen o = (MonoWhen) this;
        Mono<Void> result = o.whenAdditionalSource(other);
        if (result != null) {
            return result;
        }
    }
    return when(this, other);
}
Code example source: reactor/reactor-core
@Test
public void whenIterablePublishersVoidDoesntCombineErrors() {
    Exception boom1 = new NullPointerException("boom1");
    Exception boom2 = new IllegalArgumentException("boom2");
    Iterable<Publisher<Void>> voidPublishers = Arrays.asList(
            Mono.<Void>empty(),
            Mono.<Void>error(boom1),
            Mono.<Void>error(boom2));
    StepVerifier.create(Mono.when(voidPublishers))
                .verifyErrorMatches(e -> e == boom1);
}
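The test above shows that Mono.when() surfaces only the first error. When every source should be allowed to terminate before the failure is reported, reactor-core also provides Mono.whenDelayError(); according to its Javadoc, several errors are then combined into one exception. The following is a minimal sketch of the single-error case, assuming reactor-core is on the classpath; the class WhenDelayErrorSketch and the "cleanup" step are hypothetical.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

import reactor.core.publisher.Mono;

// Hypothetical demo class; not part of reactor-core.
class WhenDelayErrorSketch {

    static String run() {
        List<String> log = new CopyOnWriteArrayList<>();
        Mono<Void> failing = Mono.error(new IllegalStateException("boom"));
        // Stand-in for work that should still run although another source failed.
        Mono<Void> cleanup = Mono.fromRunnable(() -> log.add("cleanup"));

        // whenDelayError lets every source terminate before the error is emitted.
        String outcome = Mono.whenDelayError(failing, cleanup)
                .thenReturn("completed")
                .onErrorResume(e -> Mono.just("failed: " + e.getMessage()))
                .block();
        return log + " " + outcome;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

This variant may be useful for shutdown or cleanup sequences in which one failing step should not prevent the remaining steps from running.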
Code example source: reactor/reactor-core
@Test
public void allEmptyPublisherIterable() {
    Mono<Void> test = Mono.when(Arrays.asList(Mono.empty(), Flux.empty()));
    StepVerifier.create(test)
                .verifyComplete();
}
Code example source: reactor/reactor-core
@Test
public void allEmptyPublisher() {
    Mono<Void> test = Mono.when(Mono.empty(), Flux.empty());
    StepVerifier.create(test)
                .verifyComplete();
}
Code example source: reactor/reactor-core
@Test
public void noEmptyPublisher() {
    Mono<Void> test = Mono.when(Mono.just(1), Flux.just(3));
    StepVerifier.create(test)
                .verifyComplete();
}
Code example source: reactor/reactor-core
@Test
public void noSourcePublisher() {
    Mono<Void> test = Mono.when();
    StepVerifier.create(test)
                .verifyComplete();
}
Code example source: reactor/reactor-core
@Test
public void someEmptyPublisher() {
    Mono<Void> test = Mono.when(Mono.just(1), Flux.empty());
    StepVerifier.create(test)
                .verifyComplete();
}
Code example source: reactor/reactor-core
@Test
public void oneSourcePublisher() {
    Mono<Void> test = Mono.when(Flux.empty());
    StepVerifier.create(test)
                .verifyComplete();
}
Code example source: reactor/reactor-core
@Test
public void splitMetricsOnNameFuseable() {
    final Mono<Integer> unnamedSource = Mono.just(0).map(v -> 100 / v);
    final Mono<Integer> namedSource = Mono.just(0).map(v -> 100 / v)
                                          .name("foo");
    final Mono<Integer> unnamed = new MonoMetricsFuseable<>(unnamedSource, registry)
            .onErrorResume(e -> Mono.empty());
    final Mono<Integer> named = new MonoMetricsFuseable<>(namedSource, registry)
            .onErrorResume(e -> Mono.empty());
    Mono.when(unnamed, named).block();
    Timer unnamedMeter = registry
            .find(METER_FLOW_DURATION)
            .tag(TAG_STATUS, TAGVALUE_ON_ERROR)
            .tag(TAG_EXCEPTION, ArithmeticException.class.getName())
            .tag(TAG_SEQUENCE_NAME, REACTOR_DEFAULT_NAME)
            .timer();
    Timer namedMeter = registry
            .find(METER_FLOW_DURATION)
            .tag(TAG_STATUS, TAGVALUE_ON_ERROR)
            .tag(TAG_EXCEPTION, ArithmeticException.class.getName())
            .tag(TAG_SEQUENCE_NAME, "foo")
            .timer();
    assertThat(unnamedMeter).isNotNull();
    assertThat(unnamedMeter.count()).isOne();
    assertThat(namedMeter).isNotNull();
    assertThat(namedMeter.count()).isOne();
}
Code example source: reactor/reactor-core
@Test
public void splitMetricsOnName() {
    final Mono<Integer> unnamedSource = Mono.<Integer>error(new ArithmeticException("boom"))
                                            .hide();
    final Mono<Integer> unnamed = new MonoMetrics<>(unnamedSource, registry)
            .onErrorResume(e -> Mono.empty());
    final Mono<Integer> namedSource = Mono.just(40)
                                          .name("foo")
                                          .map(i -> 100 / (40 - i))
                                          .hide();
    final Mono<Integer> named = new MonoMetrics<>(namedSource, registry)
            .onErrorResume(e -> Mono.empty());
    Mono.when(unnamed, named).block();
    Timer unnamedMeter = registry
            .find(METER_FLOW_DURATION)
            .tag(TAG_STATUS, TAGVALUE_ON_ERROR)
            .tag(TAG_EXCEPTION, ArithmeticException.class.getName())
            .tag(TAG_SEQUENCE_NAME, REACTOR_DEFAULT_NAME)
            .timer();
    Timer namedMeter = registry
            .find(METER_FLOW_DURATION)
            .tag(TAG_STATUS, TAGVALUE_ON_ERROR)
            .tag(TAG_EXCEPTION, ArithmeticException.class.getName())
            .tag(TAG_SEQUENCE_NAME, "foo")
            .timer();
    assertThat(unnamedMeter).isNotNull();
    assertThat(namedMeter).isNotNull();
    assertThat(unnamedMeter.count()).isOne();
    assertThat(namedMeter.count()).isOne();
}
Code example source: reactor/reactor-core
@Test
public void splitMetricsOnNameFuseable() {
    final Flux<Integer> unnamedSource = Flux.just(0).map(v -> 100 / v);
    final Flux<Integer> namedSource = Flux.range(1, 40)
                                          .map(i -> 100 / (40 - i))
                                          .name("foo");
    final Flux<Integer> unnamed = new FluxMetricsFuseable<>(unnamedSource, registry)
            .onErrorResume(e -> Mono.empty());
    final Flux<Integer> named = new FluxMetricsFuseable<>(namedSource, registry)
            .onErrorResume(e -> Mono.empty());
    Mono.when(unnamed, named).block();
    Timer unnamedMeter = registry
            .find(METER_FLOW_DURATION)
            .tag(TAG_STATUS, TAGVALUE_ON_ERROR)
            .tag(TAG_SEQUENCE_NAME, REACTOR_DEFAULT_NAME)
            .timer();
    Timer namedMeter = registry
            .find(METER_FLOW_DURATION)
            .tag(TAG_STATUS, TAGVALUE_ON_ERROR)
            .tag(TAG_SEQUENCE_NAME, "foo")
            .timer();
    assertThat(unnamedMeter).isNotNull();
    assertThat(unnamedMeter.count()).isOne();
    assertThat(namedMeter).isNotNull();
    assertThat(namedMeter.count()).isOne();
}
Code example source: reactor/reactor-core
@Test
public void splitMetricsOnName() {
    final Flux<Integer> unnamedSource = Flux.<Integer>error(new ArithmeticException("boom"))
                                            .hide();
    final Flux<Integer> unnamed = new FluxMetrics<>(unnamedSource, registry)
            .onErrorResume(e -> Mono.empty());
    final Flux<Integer> namedSource = Flux.range(1, 40)
                                          .name("foo")
                                          .map(i -> 100 / (40 - i))
                                          .hide();
    final Flux<Integer> named = new FluxMetrics<>(namedSource, registry)
            .onErrorResume(e -> Mono.empty());
    Mono.when(unnamed, named).block();
    Timer unnamedMeter = registry
            .find(METER_FLOW_DURATION)
            .tag(TAG_STATUS, TAGVALUE_ON_ERROR)
            .tag(TAG_EXCEPTION, ArithmeticException.class.getName())
            .tag(TAG_SEQUENCE_NAME, REACTOR_DEFAULT_NAME)
            .timer();
    Timer namedMeter = registry
            .find(METER_FLOW_DURATION)
            .tag(TAG_STATUS, TAGVALUE_ON_ERROR)
            .tag(TAG_EXCEPTION, ArithmeticException.class.getName())
            .tag(TAG_SEQUENCE_NAME, "foo")
            .timer();
    assertThat(unnamedMeter).isNotNull();
    assertThat(namedMeter).isNotNull();
    assertThat(unnamedMeter.count()).isOne();
    assertThat(namedMeter.count()).isOne();
}
Code example source: scalecube/scalecube-services
private Mono<Void> shutdown() {
    return Mono.defer(
        () ->
            gatewayInstances != null && !gatewayInstances.isEmpty()
                ? Mono.when(
                    gatewayInstances.values().stream().map(Gateway::stop).toArray(Mono[]::new))
                : Mono.empty());
}
Code example source: org.springframework.security/spring-security-webflux
@Override
public Mono<Void> writeHttpHeaders(ServerWebExchange exchange) {
    Stream<Mono<Void>> results = writers.stream().map(writer -> writer.writeHttpHeaders(exchange));
    return Mono.when(results.collect(Collectors.toList()));
}
Code example source: scalecube/scalecube-services
private Mono<Void> shutdown() {
    return Mono.defer(
        () ->
            Mono.when(
                Optional.ofNullable(serverTransport)
                    .map(ServerTransport::stop)
                    .orElse(Mono.empty()),
                transportResources.shutdown()));
}
Code example source: io.projectreactor.netty/reactor-netty
/**
 * Dispose underlying resources in a listenable fashion.
 * @return the Mono that represents the end of disposal
 */
protected Mono<Void> _disposeLater() {
    return Mono.when(
            defaultLoops.disposeLater(), defaultProvider.disposeLater());
}
Code example source: io.projectreactor.ipc/reactor-netty
/**
 * Dispose underlying resources in a listenable fashion.
 * @return the Mono that represents the end of disposal
 */
protected Mono<Void> _disposeLater() {
    return Mono.when(
            defaultLoops.disposeLater(),
            defaultPools.disposeLater());
}
Code example source: org.mule.runtime/mule-core
private Mono<Void> policySuccessError(SourceErrorException see, SourcePolicySuccessResult successResult, PhaseContext ctx,
                                      FlowConstruct flowConstruct, MessageSource messageSource) {
    MessagingException messagingException =
        see.toMessagingException(flowConstruct.getMuleContext().getExceptionContextProviders(), messageSource);
    return when(just(messagingException).flatMapMany(flowConstruct.getExceptionListener()).last()
                    .onErrorResume(e -> empty()),
                sendErrorResponse(messagingException, successResult.createErrorResponseParameters(), ctx, flowConstruct)
                    .doOnSuccess(v -> onTerminate(flowConstruct, messageSource, ctx.terminateConsumer, left(messagingException))))
        .then();
}