reactor.core.publisher.Mono.onErrorResume()方法的使用及代码示例

x33g5p2x  于2022-01-24 转载在 其他  
字(10.7k)|赞(0)|评价(0)|浏览(760)

本文整理了Java中reactor.core.publisher.Mono.onErrorResume()方法的一些代码示例,展示了Mono.onErrorResume()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Mono.onErrorResume()方法的具体详情如下:
包路径:reactor.core.publisher.Mono
类名称:Mono
方法名:onErrorResume

Mono.onErrorResume介绍

[英]Subscribe to a fallback publisher when an error matching the given type occurs, using a function to choose the fallback depending on the error.
[中]当出现与给定类型匹配的错误时,订阅回退发布服务器,使用函数根据错误选择回退。

代码示例

代码示例来源:origin: spring-projects/spring-framework

@Override
public RouterFunctions.Builder onError(Predicate<? super Throwable> predicate,
    BiFunction<? super Throwable, ServerRequest, Mono<ServerResponse>> responseProvider) {
  Assert.notNull(predicate, "Predicate must not be null");
  Assert.notNull(responseProvider, "ResponseProvider must not be null");
  return filter((request, next) -> next.handle(request)
      .onErrorResume(predicate, t -> responseProvider.apply(t, request)));
}

代码示例来源:origin: spring-projects/spring-framework

@Override
public <T extends Throwable> RouterFunctions.Builder onError(Class<T> exceptionType,
    BiFunction<? super T, ServerRequest, Mono<ServerResponse>> responseProvider) {
  Assert.notNull(exceptionType, "ExceptionType must not be null");
  Assert.notNull(responseProvider, "ResponseProvider must not be null");
  return filter((request, next) -> next.handle(request)
      .onErrorResume(exceptionType, t -> responseProvider.apply(t, request)));
}

代码示例来源:origin: spring-projects/spring-framework

@SuppressWarnings("unchecked")
private <T> Mono<T> drainBody(ClientResponse response, Throwable ex) {
  // Ensure the body is drained, even if the StatusHandler didn't consume it,
  // but ignore exception, in case the handler did consume.
  return (Mono<T>) response.bodyToMono(Void.class)
      .onErrorResume(ex2 -> Mono.empty()).thenReturn(ex);
}

代码示例来源:origin: spring-projects/spring-framework

@Nullable
private String formatBody(@Nullable MediaType contentType, Mono<byte[]> body) {
  return body
      .map(bytes -> {
        if (contentType == null) {
          return bytes.length + " bytes of content (unknown content-type).";
        }
        Charset charset = contentType.getCharset();
        if (charset != null) {
          return new String(bytes, charset);
        }
        if (PRINTABLE_MEDIA_TYPES.stream().anyMatch(contentType::isCompatibleWith)) {
          return new String(bytes, StandardCharsets.UTF_8);
        }
        return bytes.length + " bytes of content.";
      })
      .defaultIfEmpty("No content")
      .onErrorResume(ex -> Mono.just("Failed to obtain content: " + ex.getMessage()))
      .block(this.timeout);
}

代码示例来源:origin: spring-projects/spring-security

@Override
public Mono<MatchResult> matches(ServerWebExchange exchange) {
  return this.bearerTokenConverter.convert(exchange)
      .flatMap(this::nullAuthentication)
      .onErrorResume(e -> notMatch());
}

代码示例来源:origin: spring-projects/spring-framework

@Override
public Mono<Void> handle(ServerWebExchange exchange) {
  Mono<Void> completion;
  try {
    completion = super.handle(exchange);
  }
  catch (Throwable ex) {
    completion = Mono.error(ex);
  }
  for (WebExceptionHandler handler : this.exceptionHandlers) {
    completion = completion.onErrorResume(ex -> handler.handle(exchange, ex));
  }
  return completion;
}

代码示例来源:origin: spring-projects/spring-framework

private Mono<Void> handleResult(ServerWebExchange exchange, HandlerResult result) {
  return getResultHandler(result).handleResult(exchange, result)
      .onErrorResume(ex -> result.applyExceptionHandler(ex).flatMap(exceptionResult ->
          getResultHandler(exceptionResult).handleResult(exchange, exceptionResult)));
}

代码示例来源:origin: codecentric/spring-boot-admin

@Override
  public Mono<Void> notify(InstanceEvent event) {
    return Flux.fromIterable(delegates).flatMap(d -> d.notify(event).onErrorResume(error -> {
      log.warn("Unexpected exception while triggering notifications. Notification might not be sent.", error);
      return Mono.empty();
    })).then();
  }
}

代码示例来源:origin: spring-projects/spring-framework

private <T extends Publisher<?>> T handleBody(ClientResponse response,
    T bodyPublisher, Function<Mono<? extends Throwable>, T> errorFunction) {
  if (HttpStatus.resolve(response.rawStatusCode()) != null) {
    for (StatusHandler handler : this.statusHandlers) {
      if (handler.test(response.statusCode())) {
        HttpRequest request = this.requestSupplier.get();
        Mono<? extends Throwable> exMono = handler.apply(response, request);
        exMono = exMono.flatMap(ex -> drainBody(response, ex));
        exMono = exMono.onErrorResume(ex -> drainBody(response, ex));
        return errorFunction.apply(exMono);
      }
    }
    return bodyPublisher;
  }
  else {
    return errorFunction.apply(createResponseException(response, this.requestSupplier.get()));
  }
}

代码示例来源:origin: spring-projects/spring-framework

@Override
public ListenableFuture<Void> shutdown() {
  if (this.stopping) {
    SettableListenableFuture<Void> future = new SettableListenableFuture<>();
    future.set(null);
    return future;
  }
  this.stopping = true;
  Mono<Void> result;
  if (this.channelGroup != null) {
    result = FutureMono.from(this.channelGroup.close());
    if (this.loopResources != null) {
      result = result.onErrorResume(ex -> Mono.empty()).then(this.loopResources.disposeLater());
    }
    if (this.poolResources != null) {
      result = result.onErrorResume(ex -> Mono.empty()).then(this.poolResources.disposeLater());
    }
    result = result.onErrorResume(ex -> Mono.empty()).then(stopScheduler());
  }
  else {
    result = stopScheduler();
  }
  return new MonoToListenableFutureAdapter<>(result);
}

代码示例来源:origin: codecentric/spring-boot-admin

protected Mono<Instance> doUpdateStatus(Instance instance) {
  if (!instance.isRegistered()) {
    return Mono.empty();
  }
  log.debug("Update status for {}", instance);
  return instanceWebClient.instance(instance)
              .get()
              .uri(Endpoint.HEALTH)
              .exchange()
              .log(log.getName(), Level.FINEST)
              .flatMap(this::convertStatusInfo)
              .doOnError(ex -> logError(instance, ex))
              .onErrorResume(this::handleError)
              .map(instance::withStatusInfo);
}

代码示例来源:origin: org.springframework/spring-web

@Override
public Mono<Void> handle(ServerWebExchange exchange) {
  Mono<Void> completion;
  try {
    completion = super.handle(exchange);
  }
  catch (Throwable ex) {
    completion = Mono.error(ex);
  }
  for (WebExceptionHandler handler : this.exceptionHandlers) {
    completion = completion.onErrorResume(ex -> handler.handle(exchange, ex));
  }
  return completion;
}

代码示例来源:origin: codecentric/spring-boot-admin

@Override
public Mono<Void> doNotify(InstanceEvent event, Instance instance) {
  return delegate.notify(event).onErrorResume(error -> Mono.empty()).then(Mono.fromRunnable(() -> {
    if (shouldEndReminder(event)) {
      reminders.remove(event.getInstance());
    } else if (shouldStartReminder(event)) {
      reminders.putIfAbsent(event.getInstance(), new Reminder(event));
    }
  }));
}

代码示例来源:origin: codecentric/spring-boot-admin

protected Mono<Void> recomputeSnapshot(InstanceId instanceId) {
    return this.getEventStore()
          .find(instanceId)
          .collectList()
          .map(events -> Instance.create(instanceId).apply(events))
          .doOnNext(instance -> snapshots.put(instance.getId(), instance))
          .then()
          .onErrorResume(ex2 -> {
            log.error(
              "Error while recomputing snapshot. Event history for instance {} may be wrong,",
              instanceId,
              ex2
            );
            return Mono.empty();
          });
  }
}

代码示例来源:origin: spring-projects/spring-security

private Mono<Void> authenticate(ServerWebExchange exchange,
    WebFilterChain chain, Authentication token) {
  WebFilterExchange webFilterExchange = new WebFilterExchange(exchange, chain);
  return this.authenticationManager.authenticate(token)
      .switchIfEmpty(Mono.defer(() -> Mono.error(new IllegalStateException("No provider found for " + token.getClass()))))
      .flatMap(authentication -> onAuthenticationSuccess(authentication, webFilterExchange))
      .onErrorResume(AuthenticationException.class, e -> this.authenticationFailureHandler
          .onAuthenticationFailure(webFilterExchange, e));
}

代码示例来源:origin: codecentric/spring-boot-admin

protected Mono<Void> updateSnapshot(InstanceEvent event) {
  return Mono.<Void>fromRunnable(() -> snapshots.compute(event.getInstance(), (key, old) -> {
    Instance instance = old != null ? old : Instance.create(key);
    return instance.apply(event);
  })).onErrorResume(ex -> {
    log.warn(
      "Error while updating the snapshot with event {}. Recomputing instance snapshot from event history.",
      event,
      ex
    );
    return recomputeSnapshot(event.getInstance());
  });
}

代码示例来源:origin: spring-projects/spring-security

@Override
public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
  return this.authorizationRequestResolver.resolve(exchange)
    .switchIfEmpty(chain.filter(exchange).then(Mono.empty()))
    .onErrorResume(ClientAuthorizationRequiredException.class, e -> {
      return this.requestCache.saveRequest(exchange)
        .then(this.authorizationRequestResolver.resolve(exchange, e.getClientRegistrationId()));
    })
    .flatMap(clientRegistration -> sendRedirectForAuthorization(exchange, clientRegistration));
}

代码示例来源:origin: spring-projects/spring-framework

@Test
public void writeWithError() throws Exception {
  TestServerHttpResponse response = new TestServerHttpResponse();
  response.getHeaders().setContentLength(12);
  IllegalStateException error = new IllegalStateException("boo");
  response.writeWith(Flux.error(error)).onErrorResume(ex -> Mono.empty()).block();
  assertFalse(response.statusCodeWritten);
  assertFalse(response.headersWritten);
  assertFalse(response.cookiesWritten);
  assertFalse(response.getHeaders().containsKey(HttpHeaders.CONTENT_LENGTH));
  assertTrue(response.body.isEmpty());
}

代码示例来源:origin: spring-projects/spring-framework

@Override
public Mono<HandlerResult> handle(ServerWebExchange exchange, Object handler) {
  HandlerMethod handlerMethod = (HandlerMethod) handler;
  Assert.state(this.methodResolver != null && this.modelInitializer != null, "Not initialized");
  InitBinderBindingContext bindingContext = new InitBinderBindingContext(
      getWebBindingInitializer(), this.methodResolver.getInitBinderMethods(handlerMethod));
  InvocableHandlerMethod invocableMethod = this.methodResolver.getRequestMappingMethod(handlerMethod);
  Function<Throwable, Mono<HandlerResult>> exceptionHandler =
      ex -> handleException(ex, handlerMethod, bindingContext, exchange);
  return this.modelInitializer
      .initModel(handlerMethod, bindingContext, exchange)
      .then(Mono.defer(() -> invocableMethod.invoke(exchange, bindingContext)))
      .doOnNext(result -> result.setExceptionHandler(exceptionHandler))
      .doOnNext(result -> bindingContext.saveModel())
      .onErrorResume(exceptionHandler);
}

代码示例来源:origin: spring-projects/spring-framework

@Override
public Mono<Void> handle(ServerHttpRequest request, ServerHttpResponse response) {
  if (this.forwardedHeaderTransformer != null) {
    request = this.forwardedHeaderTransformer.apply(request);
  }
  ServerWebExchange exchange = createExchange(request, response);
  LogFormatUtils.traceDebug(logger, traceOn ->
      exchange.getLogPrefix() + formatRequest(exchange.getRequest()) +
          (traceOn ? ", headers=" + formatHeaders(exchange.getRequest().getHeaders()) : ""));
  return getDelegate().handle(exchange)
      .doOnSuccess(aVoid -> logResponse(exchange))
      .onErrorResume(ex -> handleUnresolvedError(exchange, ex))
      .then(Mono.defer(response::setComplete));
}

相关文章

Mono类方法