reactor.core.publisher.Mono.doOnError()方法的使用及代码示例

x33g5p2x  于2022-01-24 转载在 其他  
字(10.4k)|赞(0)|评价(0)|浏览(815)

本文整理了Java中reactor.core.publisher.Mono.doOnError()方法的一些代码示例,展示了Mono.doOnError()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Mono.doOnError()方法的具体详情如下:
包路径:reactor.core.publisher.Mono
类名称:Mono
方法名:doOnError

Mono.doOnError介绍

[英]Add behavior triggered when the Mono completes with an error matching the given exception type.
[中]当Mono完成并出现与给定异常类型匹配的错误时,会触发添加行为。

代码示例

代码示例来源:origin: spring-projects/spring-framework

public MonoToListenableFutureAdapter(Mono<T> mono) {
  Assert.notNull(mono, "Mono must not be null");
  this.processor = mono
      .doOnSuccess(this.registry::success)
      .doOnError(this.registry::failure)
      .toProcessor();
}

代码示例来源:origin: org.springframework/spring-core

public MonoToListenableFutureAdapter(Mono<T> mono) {
  Assert.notNull(mono, "Mono must not be null");
  this.processor = mono
      .doOnSuccess(this.registry::success)
      .doOnError(this.registry::failure)
      .toProcessor();
}

代码示例来源:origin: spring-projects/spring-framework

@Override
public Mono<Void> apply(HttpServerRequest reactorRequest, HttpServerResponse reactorResponse) {
  NettyDataBufferFactory bufferFactory = new NettyDataBufferFactory(reactorResponse.alloc());
  try {
    ReactorServerHttpRequest request = new ReactorServerHttpRequest(reactorRequest, bufferFactory);
    ServerHttpResponse response = new ReactorServerHttpResponse(reactorResponse, bufferFactory);
    if (request.getMethod() == HttpMethod.HEAD) {
      response = new HttpHeadResponseDecorator(response);
    }
    return this.httpHandler.handle(request, response)
        .doOnError(ex -> logger.trace(request.getLogPrefix() + "Failed to complete: " + ex.getMessage()))
        .doOnSuccess(aVoid -> logger.trace(request.getLogPrefix() + "Handling completed"));
  }
  catch (URISyntaxException ex) {
    if (logger.isDebugEnabled()) {
      logger.debug("Failed to get request URI: " + ex.getMessage());
    }
    reactorResponse.status(HttpResponseStatus.BAD_REQUEST);
    return Mono.empty();
  }
}

代码示例来源:origin: spring-projects/spring-framework

@Override
public ListenableFuture<Void> connect(final TcpConnectionHandler<P> handler) {
  Assert.notNull(handler, "TcpConnectionHandler is required");
  if (this.stopping) {
    return handleShuttingDownConnectFailure(handler);
  }
  Mono<Void> connectMono = this.tcpClient
      .handle(new ReactorNettyHandler(handler))
      .connect()
      .doOnError(handler::afterConnectFailure)
      .then();
  return new MonoToListenableFutureAdapter<>(connectMono);
}

代码示例来源:origin: spring-projects/spring-framework

argMonos.add(this.resolvers.resolveArgument(parameter, bindingContext, exchange)
    .defaultIfEmpty(NO_ARG_VALUE)
    .doOnError(cause -> logArgumentErrorIfNecessary(exchange, parameter, cause)));

代码示例来源:origin: codecentric/spring-boot-admin

@Override
public Mono<Void> notify(InstanceEvent event) {
  if (!enabled) {
    return Mono.empty();
  }
  return repository.find(event.getInstance())
           .filter(instance -> shouldNotify(event, instance))
           .flatMap(instance -> doNotify(event, instance))
           .doOnError(ex -> getLogger().error("Couldn't notify for event {} ", event, ex))
           .then();
}

代码示例来源:origin: codecentric/spring-boot-admin

protected Mono<Instance> doUpdateStatus(Instance instance) {
  if (!instance.isRegistered()) {
    return Mono.empty();
  }
  log.debug("Update status for {}", instance);
  return instanceWebClient.instance(instance)
              .get()
              .uri(Endpoint.HEALTH)
              .exchange()
              .log(log.getName(), Level.FINEST)
              .flatMap(this::convertStatusInfo)
              .doOnError(ex -> logError(instance, ex))
              .onErrorResume(this::handleError)
              .map(instance::withStatusInfo);
}

代码示例来源:origin: lettuce-io/lettuce-core

private <T, K, V> Mono<T> connect(Mono<SocketAddress> socketAddressSupplier, RedisCodec<K, V> codec,
    DefaultEndpoint endpoint,
    RedisChannelHandler<K, V> connection, Supplier<CommandHandler> commandHandlerSupplier) {
  ConnectionFuture<T> future = connectStatefulAsync(connection, codec, endpoint, getFirstUri(), socketAddressSupplier,
      commandHandlerSupplier);
  return Mono.fromCompletionStage(future).doOnError(t -> logger.warn(t.getMessage()));
}

代码示例来源:origin: spring-projects/spring-framework

@Override
public ListenableFuture<Void> connect(TcpConnectionHandler<P> handler, ReconnectStrategy strategy) {
  Assert.notNull(handler, "TcpConnectionHandler is required");
  Assert.notNull(strategy, "ReconnectStrategy is required");
  if (this.stopping) {
    return handleShuttingDownConnectFailure(handler);
  }
  // Report first connect to the ListenableFuture
  MonoProcessor<Void> connectMono = MonoProcessor.create();
  this.tcpClient
      .handle(new ReactorNettyHandler(handler))
      .connect()
      .doOnNext(updateConnectMono(connectMono))
      .doOnError(updateConnectMono(connectMono))
      .doOnError(handler::afterConnectFailure)    // report all connect failures to the handler
      .flatMap(Connection::onDispose)             // post-connect issues
      .retryWhen(reconnectFunction(strategy))
      .repeatWhen(reconnectFunction(strategy))
      .subscribe();
  return new MonoToListenableFutureAdapter<>(connectMono);
}

代码示例来源:origin: lettuce-io/lettuce-core

protected Mono<List<RedisNodeDescription>> getNodes(StatefulRedisSentinelConnection<String, String> connection) {
  RedisSentinelReactiveCommands<String, String> reactive = connection.reactive();
  Mono<Tuple2<Map<String, String>, List<Map<String, String>>>> masterAndSlaves = reactive.master(masterId)
      .zipWith(reactive.slaves(masterId).collectList()).timeout(this.timeout).flatMap(tuple -> {
        return ResumeAfter.close(connection).thenEmit(tuple);
      }).doOnError(e -> connection.closeAsync());
  return masterAndSlaves.map(tuple -> {
    List<RedisNodeDescription> result = new ArrayList<>();
    result.add(toNode(tuple.getT1(), RedisInstance.Role.MASTER));
    result.addAll(tuple.getT2().stream().filter(SentinelTopologyProvider::isAvailable)
        .map(map -> toNode(map, RedisInstance.Role.SLAVE)).collect(Collectors.toList()));
    return result;
  });
}

代码示例来源:origin: reactor/reactor-core

@Test
public void onMonoRejectedDoOnError() {
  Mono<String> mp = Mono.error(new Exception("test"));
  AtomicReference<Throwable> ref = new AtomicReference<>();
  mp.doOnError(ref::set)
   .subscribe();
  assertThat(ref.get()).hasMessage("test");
}

代码示例来源:origin: spring-projects/spring-data-elasticsearch

protected WebClient createWebClientForSocketAddress(InetSocketAddress socketAddress) {

    Builder builder = WebClient.builder().defaultHeaders(it -> it.addAll(getDefaultHeaders()));

    if (connector != null) {
      builder = builder.clientConnector(connector);
    }

    String baseUrl = String.format("%s://%s:%d", this.scheme, socketAddress.getHostString(), socketAddress.getPort());
    return builder.baseUrl(baseUrl).filter((request, next) -> next.exchange(request).doOnError(errorListener)).build();
  }
}

代码示例来源:origin: spring-projects/spring-data-elasticsearch

private Flux<Tuple2<InetSocketAddress, ClientResponse>> nodes(@Nullable State state) {
  return Flux.fromIterable(hosts()) //
      .filter(entry -> state == null || entry.getState().equals(state)) //
      .map(ElasticsearchHost::getEndpoint) //
      .flatMap(host -> {
        Mono<ClientResponse> exchange = createWebClient(host) //
            .head().uri("/").exchange().doOnError(throwable -> {
              hosts.put(host, new ElasticsearchHost(host, State.OFFLINE));
              clientProvider.getErrorListener().accept(throwable);
            });
        return Mono.just(host).zipWith(exchange);
      }) //
      .onErrorContinue((throwable, o) -> clientProvider.getErrorListener().accept(throwable));
}

代码示例来源:origin: reactor/reactor-core

@Test
public void onMonoRejectedDoOnErrorClazz() {
  Mono<String> mp = Mono.error(new TestException());
  AtomicReference<Throwable> ref = new AtomicReference<>();
  mp.doOnError(TestException.class, ref::set)
   .subscribe();
  assertThat(ref.get()).isInstanceOf(TestException.class);
}

代码示例来源:origin: spring-projects/spring-framework

WebExchangeDataBinder binder = context.createDataBinder(exchange, value, name);
return binder.bind(exchange)
    .doOnError(bindingResultMono::onError)
    .doOnSuccess(aVoid -> {
      validateIfApplicable(binder, parameter);

代码示例来源:origin: reactor/reactor-core

@Test
public void monoCheckpointEmpty() {
  StringWriter sw = new StringWriter();
  Mono<Object> tested = Mono.just(1)
               .map(i -> null)
               .filter(Objects::nonNull)
               .checkpoint()
               .doOnError(t -> t.printStackTrace(new PrintWriter(sw)));
  StepVerifier.create(tested)
        .verifyError();
  String debugStack = sw.toString();
  assertThat(debugStack).contains("Assembly trace from producer [reactor.core.publisher.MonoFilterFuseable] :");
}

代码示例来源:origin: reactor/reactor-core

@Test
public void monoCheckpointWithDescriptionIsLight() {
  StringWriter sw = new StringWriter();
  Mono<Object> tested = Mono.just(1)
               .map(i -> null)
               .filter(Objects::nonNull)
               .checkpoint("foo")
               .doOnError(t -> t.printStackTrace(new PrintWriter(sw)));
  StepVerifier.create(tested)
        .verifyError();
  String debugStack = sw.toString();
  assertThat(debugStack).contains("Assembly site of producer [reactor.core.publisher.MonoFilterFuseable] is identified by light checkpoint [foo].");
}

代码示例来源:origin: reactor/reactor-core

@Test
public void monoCheckpointDescriptionAndForceStack() {
  StringWriter sw = new StringWriter();
  Mono<Object> tested = Mono.just(1)
               .map(i -> null)
               .filter(Objects::nonNull)
               .checkpoint("foo", true)
               .doOnError(t -> t.printStackTrace(new PrintWriter(sw)));
  StepVerifier.create(tested)
        .verifyError();
  String debugStack = sw.toString();
  assertThat(debugStack).contains("Assembly trace from producer [reactor.core.publisher.MonoFilterFuseable], described as [foo] :");
}

代码示例来源:origin: reactor/reactor-core

@Test
public void onMonoRejectedDoOnErrorClazzNot() {
  Mono<String> mp = Mono.error(new TestException());
  AtomicReference<Throwable> ref = new AtomicReference<>();
  MonoProcessor<String> processor = mp.doOnError(RuntimeException.class, ref::set)
                    .toProcessor();
  processor.subscribe();
  assertThat(processor.getError()).isInstanceOf(TestException.class);
  assertThat(ref.get()).isNull();
}

代码示例来源:origin: lettuce-io/lettuce-core

private <K, V> Mono<StatefulRedisSentinelConnection<K, V>> connectSentinel(ConnectionBuilder connectionBuilder, RedisURI uri) {
  connectionBuilder.socketAddressSupplier(getSocketAddressSupplier(uri));
  SocketAddress socketAddress = clientResources.socketAddressResolver().resolve(uri);
  logger.debug("Connecting to Redis Sentinel, address: " + socketAddress);
  Mono<StatefulRedisSentinelConnection<K, V>> connectionMono = Mono
      .fromCompletionStage(initializeChannelAsync(connectionBuilder));
  return connectionMono.onErrorMap(CompletionException.class, Throwable::getCause) //
      .doOnError(t -> logger.warn("Cannot connect Redis Sentinel at " + uri + ": " + t.toString())) //
      .onErrorMap(e -> new RedisConnectionException("Cannot connect Redis Sentinel at " + uri, e));
}

相关文章

Mono类方法