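This article collects code examples for the Java method reactor.core.publisher.Mono.doOnSuccessOrError() and shows how Mono.doOnSuccessOrError() is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven. They were extracted from selected open-source projects and should serve as a useful reference. The details of Mono.doOnSuccessOrError() are as follows: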
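Package: reactor.core.publisher
Class name: Mono
Method name: doOnSuccessOrError
Description: Add behavior triggered when the Mono terminates, either by completing successfully or with an error. The callback receives the emitted value and the error: the value is null when the Mono completes empty or fails, and the error is null on successful completion.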
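The callback passed to doOnSuccessOrError is a BiConsumer over a value and a Throwable, so its body usually has to tell three outcomes apart: a value, an empty completion, or an error. The following is a minimal sketch of that decision using only the JDK. The class OutcomeDemo and its classify helper are hypothetical names introduced for illustration and are not part of Reactor; the callback is invoked by hand here rather than by a Mono.

```java
import java.util.function.BiConsumer;

class OutcomeDemo {

    // Hypothetical helper (not a Reactor API): names the outcome encoded
    // by the two nullable arguments the callback receives.
    static String classify(Object value, Throwable error) {
        if (error != null) {
            return "error: " + error.getMessage();
        }
        if (value == null) {
            return "empty";
        }
        return "value: " + value;
    }

    public static void main(String[] args) {
        // Same shape as the callback passed to doOnSuccessOrError.
        BiConsumer<String, Throwable> callback =
                (value, error) -> System.out.println(classify(value, error));

        callback.accept("foo", null);                                 // completed with a value
        callback.accept(null, null);                                  // completed without a value
        callback.accept(null, new IllegalArgumentException("boom"));  // terminated with an error
    }
}
```

With Reactor on the classpath, the same lambda could be passed directly, for example mono.doOnSuccessOrError(callback). Note that Reactor deprecated this operator in 3.3 and removed it in 3.5 in favor of doOnSuccess, doOnError, and doOnTerminate, so on newer versions the branches above would be split across those operators.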
Code example source: spring-cloud/spring-cloud-gateway
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
    Sample sample = Timer.start(meterRegistry);
    // Stop the timer whether the filter chain succeeds or fails.
    return chain.filter(exchange).doOnSuccessOrError((aVoid, ex) -> {
        endTimerRespectingCommit(exchange, sample);
    });
}
Code example source: reactor/reactor-core
@Test
public void onMonoRejectedDoOnSuccessOrError() {
    Mono<String> mp = Mono.error(new Exception("test"));
    AtomicReference<Throwable> ref = new AtomicReference<>();
    mp.doOnSuccessOrError((s, f) -> ref.set(f))
      .subscribe();
    assertThat(ref.get()).hasMessage("test");
}
Code example source: reactor/reactor-core
@Test
public void onMonoSuccessDoOnSuccessOrError() {
    Mono<String> mp = Mono.just("test");
    AtomicReference<String> ref = new AtomicReference<>();
    mp.doOnSuccessOrError((s, f) -> ref.set(s))
      .subscribe();
    assertThat(ref.get()).isEqualToIgnoringCase("test");
}
Code example source: line/armeria
Mono<Void> handle(ServiceRequestContext ctx, HttpRequest req, CompletableFuture<HttpResponse> future,
                  @Nullable String serverHeader) {
    final ArmeriaServerHttpRequest convertedRequest;
    try {
        convertedRequest = new ArmeriaServerHttpRequest(ctx, req, factoryWrapper);
    } catch (Exception e) {
        logger.warn("{} Invalid request path: {}", ctx, req.path(), e);
        future.complete(HttpResponse.of(HttpStatus.BAD_REQUEST));
        return Mono.empty();
    }
    final ArmeriaServerHttpResponse convertedResponse =
            new ArmeriaServerHttpResponse(ctx, future, factoryWrapper, serverHeader);
    return httpHandler.handle(convertedRequest, convertedResponse)
                      .doOnSuccessOrError((unused, cause) -> {
                          // Complete the response with or without the failure cause.
                          if (cause != null) {
                              logger.debug("{} Failed to handle a request", ctx, cause);
                              convertedResponse.setComplete(cause).subscribe();
                          } else {
                              convertedResponse.setComplete().subscribe();
                          }
                      });
}
Code example source: reactor/reactor-core
@Test
public void completeOnNextWithoutCancel() {
    AtomicInteger onCancel = new AtomicInteger();
    AtomicInteger sourceOnCancel = new AtomicInteger();
    AtomicInteger onTerminate = new AtomicInteger();
    AtomicInteger sourceOnTerminate = new AtomicInteger();
    Mono<String> source = Mono.<String>fromDirect(s -> {
        s.onSubscribe(Operators.emptySubscription());
        s.onNext("foo");
    })
            .doOnCancel(sourceOnCancel::incrementAndGet)
            .doOnSuccessOrError((v, e) -> sourceOnTerminate.incrementAndGet());
    StepVerifier.withVirtualTime(() -> new MonoDelayElement<>(source,
            2,
            TimeUnit.SECONDS,
            defaultSchedulerForDelay())
            .doOnCancel(onCancel::incrementAndGet)
            .doOnSuccessOrError((v, e) -> onTerminate.incrementAndGet()))
                .expectSubscription()
                .expectNoEvent(Duration.ofSeconds(2))
                .expectNext("foo")
                .verifyComplete();
    assertThat(onTerminate.get()).isEqualTo(1);
    assertThat(sourceOnTerminate.get()).isEqualTo(1);
    assertThat(onCancel.get()).isEqualTo(0);
    assertThat(sourceOnCancel.get()).isEqualTo(0);
}
Code example source: reactor/reactor-core
@Test
public void onSuccessOrErrorForEmpty() {
    LongAdder invoked = new LongAdder();
    AtomicReference<String> value = new AtomicReference<>();
    AtomicReference<Throwable> error = new AtomicReference<>();
    StepVerifier.create(Mono.<String>empty()
                            .doOnSuccessOrError((v, t) -> {
                                invoked.increment();
                                value.set(v);
                                error.set(t);
                            }))
                .expectComplete()
                .verify();
    // An empty completion invokes the callback once with both arguments null.
    assertEquals(1, invoked.intValue());
    assertEquals(null, value.get());
    assertEquals(null, error.get());
}
Code example source: reactor/reactor-core
@Test
public void onSuccessOrErrorCallbackFailureInterruptsOnNext() {
    LongAdder invoked = new LongAdder();
    StepVerifier.create(Mono.just("foo")
                            .doOnSuccessOrError((v, t) -> {
                                invoked.increment();
                                throw new IllegalArgumentException(v);
                            }))
                .expectErrorMessage("foo")
                .verify();
    assertEquals(1, invoked.intValue());
}
Code example source: reactor/reactor-core
@Test
public void onSuccessOrErrorForOnError() {
    LongAdder invoked = new LongAdder();
    AtomicReference<String> value = new AtomicReference<>();
    AtomicReference<Throwable> error = new AtomicReference<>();
    IllegalArgumentException err = new IllegalArgumentException("boom");
    StepVerifier.create(Mono.<String>error(err)
                            .doOnSuccessOrError((v, t) -> {
                                invoked.increment();
                                value.set(v);
                                error.set(t);
                            }))
                .expectErrorMessage("boom")
                .verify();
    assertEquals(1, invoked.intValue());
    assertEquals(null, value.get());
    assertEquals(err, error.get());
}
Code example source: spring-cloud/spring-cloud-gateway
@Test
public void echo() throws Exception {
    int count = 100;
    Flux<String> input = Flux.range(1, count).map(index -> "msg-" + index);
    ReplayProcessor<Object> output = ReplayProcessor.create(count);
    client.execute(getUrl("/echo"),
            session -> {
                logger.debug("Starting to send messages");
                return session
                        .send(input.doOnNext(s -> logger.debug("outbound " + s)).map(s -> session.textMessage(s)))
                        .thenMany(session.receive().take(count).map(WebSocketMessage::getPayloadAsText))
                        .subscribeWith(output)
                        .doOnNext(s -> logger.debug("inbound " + s))
                        .then()
                        .doOnSuccessOrError((aVoid, ex) ->
                                logger.debug("Done with " + (ex != null ? ex.getMessage() : "success")));
            })
            .block(Duration.ofMillis(5000));
    assertEquals(input.collectList().block(Duration.ofMillis(5000)),
            output.collectList().block(Duration.ofMillis(5000)));
}
Code example source: spring-cloud/spring-cloud-gateway
@Test
public void echoForHttp() throws Exception {
    int count = 100;
    Flux<String> input = Flux.range(1, count).map(index -> "msg-" + index);
    ReplayProcessor<Object> output = ReplayProcessor.create(count);
    client.execute(getHttpUrl("/echoForHttp"),
            session -> {
                logger.debug("Starting to send messages");
                return session
                        .send(input.doOnNext(s -> logger.debug("outbound " + s)).map(s -> session.textMessage(s)))
                        .thenMany(session.receive().take(count).map(WebSocketMessage::getPayloadAsText))
                        .subscribeWith(output)
                        .doOnNext(s -> logger.debug("inbound " + s))
                        .then()
                        .doOnSuccessOrError((aVoid, ex) ->
                                logger.debug("Done with " + (ex != null ? ex.getMessage() : "success")));
            })
            .block(Duration.ofMillis(5000));
    assertEquals(input.collectList().block(Duration.ofMillis(5000)),
            output.collectList().block(Duration.ofMillis(5000)));
}
Code example source: spring-cloud/spring-cloud-gateway
public GatewayFilter apply(Repeat<ServerWebExchange> repeat, Retry<ServerWebExchange> retry) {
    return (exchange, chain) -> {
        trace("Entering retry-filter");
        // chain.filter returns a Mono<Void>
        Publisher<Void> publisher = chain.filter(exchange)
                //.log("retry-filter", Level.INFO)
                .doOnSuccessOrError((aVoid, throwable) -> {
                    int iteration = exchange.getAttributeOrDefault(RETRY_ITERATION_KEY, -1);
                    int newIteration = iteration + 1;
                    trace("setting new iteration in attr %d", newIteration);
                    exchange.getAttributes().put(RETRY_ITERATION_KEY, newIteration);
                });
        if (retry != null) {
            // retryWhen returns a Mono<Void>
            // retry needs to go before repeat
            publisher = ((Mono<Void>) publisher).retryWhen(retry.withApplicationContext(exchange));
        }
        if (repeat != null) {
            // repeatWhen returns a Flux<Void>
            // so this needs to be last and the variable a Publisher<Void>
            publisher = ((Mono<Void>) publisher).repeatWhen(repeat.withApplicationContext(exchange));
        }
        return Mono.fromDirect(publisher);
    };
}
Code example source: reactor/reactor-core
@Test
public void onSuccessOrErrorFusion() {
    LongAdder invoked = new LongAdder();
    AtomicBoolean completedEmpty = new AtomicBoolean();
    AtomicReference<Throwable> error = new AtomicReference<>();
    Mono<Integer> mono = Flux
            .range(1, 10)
            .reduce((a, b) -> a + b)
            .doOnSuccessOrError((v, t) -> {
                if (v == null && t == null) completedEmpty.set(true);
                if (t != null) error.set(t);
                invoked.increment();
            });
    StepVerifier.create(mono)
                .expectFusion()
                .expectNext(55)
                .expectComplete()
                .verify();
    assertFalse("unexpected empty completion", completedEmpty.get());
    assertEquals(1, invoked.intValue());
    assertEquals("unexpected error", null, error.get());
}
Code example source: reactor/reactor-core
@Test
public void onSuccessOrErrorNormal() {
    LongAdder invoked = new LongAdder();
    AtomicBoolean completedEmpty = new AtomicBoolean();
    AtomicReference<Throwable> error = new AtomicReference<>();
    Mono<Integer> mono = Flux
            .range(1, 10)
            .reduce((a, b) -> a + b)
            .hide()
            .doOnSuccessOrError((v, t) -> {
                if (v == null && t == null) completedEmpty.set(true);
                if (t != null) error.set(t);
                invoked.increment();
            });
    StepVerifier.create(mono)
                .expectFusion(Fuseable.ANY, Fuseable.NONE)
                .expectNext(55)
                .expectComplete()
                .verify();
    assertFalse("unexpected empty completion", completedEmpty.get());
    assertEquals(1, invoked.intValue());
    assertEquals("unexpected error", null, error.get());
}
Code example source: reactor/reactor-core
@Test
public void onSuccessOrErrorFusionConditional() {
    LongAdder invoked = new LongAdder();
    AtomicBoolean completedEmpty = new AtomicBoolean();
    AtomicReference<Throwable> error = new AtomicReference<>();
    Mono<Integer> mono = Flux
            .range(1, 10)
            .reduce((a, b) -> a + b)
            .filter(v -> true)
            .doOnSuccessOrError((v, t) -> {
                if (v == null && t == null) completedEmpty.set(true);
                if (t != null) error.set(t);
                invoked.increment();
            });
    StepVerifier.create(mono)
                .expectFusion()
                .expectNext(55)
                .expectComplete()
                .verify();
    assertFalse("unexpected empty completion", completedEmpty.get());
    assertEquals(1, invoked.intValue());
    assertEquals("unexpected error", null, error.get());
}
Code example source: spring-projects/spring-framework
@Override
public Mono<Void> handle(ServerHttpRequest request, ServerHttpResponse response) {
    Mono<Integer> requestSizeMono = request.getBody().
            reduce(0, (integer, dataBuffer) -> integer +
                    dataBuffer.readableByteCount()).
            doOnSuccessOrError((size, throwable) -> {
                assertNull(throwable);
                assertEquals(REQUEST_SIZE, (long) size);
            });
    response.getHeaders().setContentLength(RESPONSE_SIZE);
    return requestSizeMono.then(response.writeWith(multipleChunks()));
}
Code example source: reactor/reactor-core
@Test
public void onSuccessOrErrorNormalConditional() {
    LongAdder invoked = new LongAdder();
    AtomicBoolean completedEmpty = new AtomicBoolean();
    AtomicReference<Throwable> error = new AtomicReference<>();
    Mono<Integer> mono = Flux
            .range(1, 10)
            .reduce((a, b) -> a + b)
            .hide()
            .filter(v -> true)
            .doOnSuccessOrError((v, t) -> {
                if (v == null && t == null) completedEmpty.set(true);
                if (t != null) error.set(t);
                invoked.increment();
            });
    StepVerifier.create(mono)
                .expectFusion(Fuseable.ANY, Fuseable.NONE)
                .expectNext(55)
                .expectComplete()
                .verify();
    assertFalse("unexpected empty completion", completedEmpty.get());
    assertEquals(1, invoked.intValue());
    assertEquals("unexpected error", null, error.get());
}
Code example source: mulesoft/mule (excerpt; the surrounding code is truncated in the source)
        }, sink::error);
    })
    .doOnSuccessOrError((value, e) -> {
        try {
            after(context, value, executedInterceptors);
Code example source: scalecube/scalecube-services
/**
 * Method to send normal response.
 *
 * @param response response
 * @return mono void
 */
public Mono<Void> send(GatewayMessage response) {
    return Mono.defer(
        () ->
            outbound
                .sendObject(Mono.just(response).map(codec::encode).map(TextWebSocketFrame::new))
                .then()
                .doOnSuccessOrError((avoid, th) -> logSend(response, th)));
}
Code example source: org.mule.runtime/mule-core
@Override
public Publisher<CoreEvent> apply(Publisher<CoreEvent> publisher) {
    return from(publisher).concatMap(event -> Mono.just(event).doOnNext(pushSubFlowFlowStackElement())
        .transform(s -> super.apply(s))
        // Pop the flow stack element on success and on error alike.
        .doOnSuccessOrError((result, throwable) -> popSubFlowFlowStackElement().accept(event)));
}
Code example source: com.vmware.card-connectors/core-test
@Override
public Mono<ClientHttpResponse> connect(HttpMethod method, URI uri, Function<? super ClientHttpRequest, Mono<Void>> requestCallback) {
    int phase = phaser.getPhase();
    phaser.register();
    return reactorClientHttpConnector.connect(method, uri, requestCallback)
            .doOnSuccessOrError((response, throwable) -> phaser.arriveAndDeregister())
            .delayUntil(response -> awaitAdvance(phase));
}