本文整理了Java中reactor.core.publisher.Mono.onErrorResume()
方法的一些代码示例,展示了Mono.onErrorResume()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Mono.onErrorResume()
方法的具体详情如下:
包路径:reactor.core.publisher.Mono
类名称:Mono
方法名:onErrorResume
[英]Subscribe to a fallback publisher when an error matching the given type occurs, using a function to choose the fallback depending on the error.
[中]当出现与给定类型匹配的错误时,订阅回退发布服务器,使用函数根据错误选择回退。
代码示例来源:origin: spring-projects/spring-framework
@Override
public RouterFunctions.Builder onError(Predicate<? super Throwable> predicate,
BiFunction<? super Throwable, ServerRequest, Mono<ServerResponse>> responseProvider) {
Assert.notNull(predicate, "Predicate must not be null");
Assert.notNull(responseProvider, "ResponseProvider must not be null");
return filter((request, next) -> next.handle(request)
.onErrorResume(predicate, t -> responseProvider.apply(t, request)));
}
代码示例来源:origin: spring-projects/spring-framework
@Override
public <T extends Throwable> RouterFunctions.Builder onError(Class<T> exceptionType,
BiFunction<? super T, ServerRequest, Mono<ServerResponse>> responseProvider) {
Assert.notNull(exceptionType, "ExceptionType must not be null");
Assert.notNull(responseProvider, "ResponseProvider must not be null");
return filter((request, next) -> next.handle(request)
.onErrorResume(exceptionType, t -> responseProvider.apply(t, request)));
}
代码示例来源:origin: spring-projects/spring-framework
@SuppressWarnings("unchecked")
private <T> Mono<T> drainBody(ClientResponse response, Throwable ex) {
// Ensure the body is drained, even if the StatusHandler didn't consume it,
// but ignore exception, in case the handler did consume.
return (Mono<T>) response.bodyToMono(Void.class)
.onErrorResume(ex2 -> Mono.empty()).thenReturn(ex);
}
代码示例来源:origin: spring-projects/spring-framework
@Nullable
private String formatBody(@Nullable MediaType contentType, Mono<byte[]> body) {
return body
.map(bytes -> {
if (contentType == null) {
return bytes.length + " bytes of content (unknown content-type).";
}
Charset charset = contentType.getCharset();
if (charset != null) {
return new String(bytes, charset);
}
if (PRINTABLE_MEDIA_TYPES.stream().anyMatch(contentType::isCompatibleWith)) {
return new String(bytes, StandardCharsets.UTF_8);
}
return bytes.length + " bytes of content.";
})
.defaultIfEmpty("No content")
.onErrorResume(ex -> Mono.just("Failed to obtain content: " + ex.getMessage()))
.block(this.timeout);
}
代码示例来源:origin: spring-projects/spring-security
@Override
public Mono<MatchResult> matches(ServerWebExchange exchange) {
return this.bearerTokenConverter.convert(exchange)
.flatMap(this::nullAuthentication)
.onErrorResume(e -> notMatch());
}
代码示例来源:origin: spring-projects/spring-framework
@Override
public Mono<Void> handle(ServerWebExchange exchange) {
Mono<Void> completion;
try {
completion = super.handle(exchange);
}
catch (Throwable ex) {
completion = Mono.error(ex);
}
for (WebExceptionHandler handler : this.exceptionHandlers) {
completion = completion.onErrorResume(ex -> handler.handle(exchange, ex));
}
return completion;
}
代码示例来源:origin: spring-projects/spring-framework
private Mono<Void> handleResult(ServerWebExchange exchange, HandlerResult result) {
return getResultHandler(result).handleResult(exchange, result)
.onErrorResume(ex -> result.applyExceptionHandler(ex).flatMap(exceptionResult ->
getResultHandler(exceptionResult).handleResult(exchange, exceptionResult)));
}
代码示例来源:origin: codecentric/spring-boot-admin
@Override
public Mono<Void> notify(InstanceEvent event) {
return Flux.fromIterable(delegates).flatMap(d -> d.notify(event).onErrorResume(error -> {
log.warn("Unexpected exception while triggering notifications. Notification might not be sent.", error);
return Mono.empty();
})).then();
}
}
代码示例来源:origin: spring-projects/spring-framework
private <T extends Publisher<?>> T handleBody(ClientResponse response,
T bodyPublisher, Function<Mono<? extends Throwable>, T> errorFunction) {
if (HttpStatus.resolve(response.rawStatusCode()) != null) {
for (StatusHandler handler : this.statusHandlers) {
if (handler.test(response.statusCode())) {
HttpRequest request = this.requestSupplier.get();
Mono<? extends Throwable> exMono = handler.apply(response, request);
exMono = exMono.flatMap(ex -> drainBody(response, ex));
exMono = exMono.onErrorResume(ex -> drainBody(response, ex));
return errorFunction.apply(exMono);
}
}
return bodyPublisher;
}
else {
return errorFunction.apply(createResponseException(response, this.requestSupplier.get()));
}
}
代码示例来源:origin: spring-projects/spring-framework
@Override
public ListenableFuture<Void> shutdown() {
if (this.stopping) {
SettableListenableFuture<Void> future = new SettableListenableFuture<>();
future.set(null);
return future;
}
this.stopping = true;
Mono<Void> result;
if (this.channelGroup != null) {
result = FutureMono.from(this.channelGroup.close());
if (this.loopResources != null) {
result = result.onErrorResume(ex -> Mono.empty()).then(this.loopResources.disposeLater());
}
if (this.poolResources != null) {
result = result.onErrorResume(ex -> Mono.empty()).then(this.poolResources.disposeLater());
}
result = result.onErrorResume(ex -> Mono.empty()).then(stopScheduler());
}
else {
result = stopScheduler();
}
return new MonoToListenableFutureAdapter<>(result);
}
代码示例来源:origin: codecentric/spring-boot-admin
protected Mono<Instance> doUpdateStatus(Instance instance) {
if (!instance.isRegistered()) {
return Mono.empty();
}
log.debug("Update status for {}", instance);
return instanceWebClient.instance(instance)
.get()
.uri(Endpoint.HEALTH)
.exchange()
.log(log.getName(), Level.FINEST)
.flatMap(this::convertStatusInfo)
.doOnError(ex -> logError(instance, ex))
.onErrorResume(this::handleError)
.map(instance::withStatusInfo);
}
代码示例来源:origin: org.springframework/spring-web
@Override
public Mono<Void> handle(ServerWebExchange exchange) {
Mono<Void> completion;
try {
completion = super.handle(exchange);
}
catch (Throwable ex) {
completion = Mono.error(ex);
}
for (WebExceptionHandler handler : this.exceptionHandlers) {
completion = completion.onErrorResume(ex -> handler.handle(exchange, ex));
}
return completion;
}
代码示例来源:origin: codecentric/spring-boot-admin
@Override
public Mono<Void> doNotify(InstanceEvent event, Instance instance) {
return delegate.notify(event).onErrorResume(error -> Mono.empty()).then(Mono.fromRunnable(() -> {
if (shouldEndReminder(event)) {
reminders.remove(event.getInstance());
} else if (shouldStartReminder(event)) {
reminders.putIfAbsent(event.getInstance(), new Reminder(event));
}
}));
}
代码示例来源:origin: codecentric/spring-boot-admin
protected Mono<Void> recomputeSnapshot(InstanceId instanceId) {
return this.getEventStore()
.find(instanceId)
.collectList()
.map(events -> Instance.create(instanceId).apply(events))
.doOnNext(instance -> snapshots.put(instance.getId(), instance))
.then()
.onErrorResume(ex2 -> {
log.error(
"Error while recomputing snapshot. Event history for instance {} may be wrong,",
instanceId,
ex2
);
return Mono.empty();
});
}
}
代码示例来源:origin: spring-projects/spring-security
private Mono<Void> authenticate(ServerWebExchange exchange,
WebFilterChain chain, Authentication token) {
WebFilterExchange webFilterExchange = new WebFilterExchange(exchange, chain);
return this.authenticationManager.authenticate(token)
.switchIfEmpty(Mono.defer(() -> Mono.error(new IllegalStateException("No provider found for " + token.getClass()))))
.flatMap(authentication -> onAuthenticationSuccess(authentication, webFilterExchange))
.onErrorResume(AuthenticationException.class, e -> this.authenticationFailureHandler
.onAuthenticationFailure(webFilterExchange, e));
}
代码示例来源:origin: codecentric/spring-boot-admin
protected Mono<Void> updateSnapshot(InstanceEvent event) {
return Mono.<Void>fromRunnable(() -> snapshots.compute(event.getInstance(), (key, old) -> {
Instance instance = old != null ? old : Instance.create(key);
return instance.apply(event);
})).onErrorResume(ex -> {
log.warn(
"Error while updating the snapshot with event {}. Recomputing instance snapshot from event history.",
event,
ex
);
return recomputeSnapshot(event.getInstance());
});
}
代码示例来源:origin: spring-projects/spring-security
@Override
public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
return this.authorizationRequestResolver.resolve(exchange)
.switchIfEmpty(chain.filter(exchange).then(Mono.empty()))
.onErrorResume(ClientAuthorizationRequiredException.class, e -> {
return this.requestCache.saveRequest(exchange)
.then(this.authorizationRequestResolver.resolve(exchange, e.getClientRegistrationId()));
})
.flatMap(clientRegistration -> sendRedirectForAuthorization(exchange, clientRegistration));
}
代码示例来源:origin: spring-projects/spring-framework
@Test
public void writeWithError() throws Exception {
TestServerHttpResponse response = new TestServerHttpResponse();
response.getHeaders().setContentLength(12);
IllegalStateException error = new IllegalStateException("boo");
response.writeWith(Flux.error(error)).onErrorResume(ex -> Mono.empty()).block();
assertFalse(response.statusCodeWritten);
assertFalse(response.headersWritten);
assertFalse(response.cookiesWritten);
assertFalse(response.getHeaders().containsKey(HttpHeaders.CONTENT_LENGTH));
assertTrue(response.body.isEmpty());
}
代码示例来源:origin: spring-projects/spring-framework
@Override
public Mono<HandlerResult> handle(ServerWebExchange exchange, Object handler) {
HandlerMethod handlerMethod = (HandlerMethod) handler;
Assert.state(this.methodResolver != null && this.modelInitializer != null, "Not initialized");
InitBinderBindingContext bindingContext = new InitBinderBindingContext(
getWebBindingInitializer(), this.methodResolver.getInitBinderMethods(handlerMethod));
InvocableHandlerMethod invocableMethod = this.methodResolver.getRequestMappingMethod(handlerMethod);
Function<Throwable, Mono<HandlerResult>> exceptionHandler =
ex -> handleException(ex, handlerMethod, bindingContext, exchange);
return this.modelInitializer
.initModel(handlerMethod, bindingContext, exchange)
.then(Mono.defer(() -> invocableMethod.invoke(exchange, bindingContext)))
.doOnNext(result -> result.setExceptionHandler(exceptionHandler))
.doOnNext(result -> bindingContext.saveModel())
.onErrorResume(exceptionHandler);
}
代码示例来源:origin: spring-projects/spring-framework
@Override
public Mono<Void> handle(ServerHttpRequest request, ServerHttpResponse response) {
if (this.forwardedHeaderTransformer != null) {
request = this.forwardedHeaderTransformer.apply(request);
}
ServerWebExchange exchange = createExchange(request, response);
LogFormatUtils.traceDebug(logger, traceOn ->
exchange.getLogPrefix() + formatRequest(exchange.getRequest()) +
(traceOn ? ", headers=" + formatHeaders(exchange.getRequest().getHeaders()) : ""));
return getDelegate().handle(exchange)
.doOnSuccess(aVoid -> logResponse(exchange))
.onErrorResume(ex -> handleUnresolvedError(exchange, ex))
.then(Mono.defer(response::setComplete));
}
内容来源于网络,如有侵权,请联系作者删除!