Usage and code examples of the reactor.core.publisher.Mono.onErrorMap() method

x33g5p2x, reposted on 2022-01-24 under Other

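This article collects code examples of the Java method reactor.core.publisher.Mono.onErrorMap() and shows how Mono.onErrorMap() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should serve as useful references. The details of Mono.onErrorMap() are as follows:
Package: reactor.core.publisher
Class name: Mono
Method name: onErrorMap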

Introduction to Mono.onErrorMap

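Transform an error emitted by this Mono by synchronously applying a function to it if the error matches the given type. Otherwise let the error pass through.
This description belongs to the onErrorMap(Class, Function) overload. Mono also provides onErrorMap(Function), which maps every error, and onErrorMap(Predicate, Function), which maps only the errors accepted by the predicate.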

Code examples

Code example source: reactor/reactor-core

/**
 * Transform an error emitted by this {@link Mono} by synchronously applying a function
 * to it if the error matches the given type. Otherwise let the error pass through.
 * <p>
 * <img class="marble" src="doc-files/marbles/onErrorMapWithClassPredicateForMono.svg" alt="">
 *
 * @param type the class of the exception type to react to
 * @param mapper the error transforming {@link Function}
 * @param <E> the error type
 *
 * @return a {@link Mono} that transforms some source errors to other errors
 */
public final <E extends Throwable> Mono<T> onErrorMap(Class<E> type,
    Function<? super E, ? extends Throwable> mapper) {
  @SuppressWarnings("unchecked")
  Function<Throwable, Throwable> handler = (Function<Throwable, Throwable>)mapper;
  return onErrorMap(type::isInstance, handler);
}
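
The typed overload above delegates to the predicate-based overload, using `type::isInstance` as the predicate. The plain-Java sketch below models only that matching rule, without Reactor, so it runs on the JDK alone. `ErrorMapSketch`, `mapIf`, and `mapIfInstance` are hypothetical names chosen for illustration and are not part of reactor-core.

```java
import java.io.IOException;
import java.util.function.Function;
import java.util.function.Predicate;

public class ErrorMapSketch {

    // Predicate form: map the error only when the predicate accepts it,
    // otherwise return the same instance untouched (pass-through).
    public static Function<Throwable, Throwable> mapIf(
            Predicate<? super Throwable> predicate,
            Function<? super Throwable, ? extends Throwable> mapper) {
        return error -> predicate.test(error) ? mapper.apply(error) : error;
    }

    // Typed form: the same delegation idea as the reactor-core source above.
    // The unchecked cast is safe here because the mapper only runs after
    // type.isInstance(error) has returned true.
    public static <E extends Throwable> Function<Throwable, Throwable> mapIfInstance(
            Class<E> type,
            Function<? super E, ? extends Throwable> mapper) {
        @SuppressWarnings("unchecked")
        Function<Throwable, Throwable> handler = (Function<Throwable, Throwable>) mapper;
        return mapIf(type::isInstance, handler);
    }

    public static void main(String[] args) {
        Function<Throwable, Throwable> wrapIo = mapIfInstance(IOException.class,
                e -> new IllegalStateException("wrapped: " + e.getMessage(), e));

        Throwable mapped = wrapIo.apply(new IOException("disk full"));
        Throwable untouched = wrapIo.apply(new IllegalArgumentException("bad input"));

        System.out.println(mapped);    // java.lang.IllegalStateException: wrapped: disk full
        System.out.println(untouched); // java.lang.IllegalArgumentException: bad input
    }
}
```

In Reactor itself the mapping happens inside the operator when the upstream signals an error. The sketch only illustrates which errors are mapped and which pass through.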

Code example source: spring-projects/spring-framework

@Override
public <T> Mono<T> bodyToMono(ParameterizedTypeReference<T> typeReference) {
  Mono<T> mono = body(BodyExtractors.toMono(typeReference));
  return mono.onErrorMap(UnsupportedMediaTypeException.class, ERROR_MAPPER)
      .onErrorMap(DecodingException.class, DECODING_MAPPER);
}

Code example source: spring-projects/spring-framework

@Override
public <T> Mono<T> bodyToMono(Class<? extends T> elementClass) {
  Mono<T> mono = body(BodyExtractors.toMono(elementClass));
  return mono.onErrorMap(UnsupportedMediaTypeException.class, ERROR_MAPPER)
      .onErrorMap(DecodingException.class, DECODING_MAPPER);
}

Code example source: spring-projects/spring-data-redis

/**
 * Subscribe to {@code targets} using subscribe {@link Function} and register {@code targets} after subscription.
 * 
 * @param targets
 * @param subscribeFunction
 * @return
 */
Mono<Void> subscribe(ByteBuffer[] targets, Function<ByteBuffer[], Mono<Void>> subscribeFunction) {
  return subscribeFunction.apply(targets).doOnSuccess((discard) -> this.targets.addAll(Arrays.asList(targets)))
      .onErrorMap(exceptionTranslator);
}

Code example source: spring-projects/spring-security

@Override
public Mono<Authentication> authenticate(Authentication authentication) {
  return Mono.justOrEmpty(authentication)
      .filter(a -> a instanceof BearerTokenAuthenticationToken)
      .cast(BearerTokenAuthenticationToken.class)
      .map(BearerTokenAuthenticationToken::getToken)
      .flatMap(this.jwtDecoder::decode)
      .flatMap(this.jwtAuthenticationConverter::convert)
      .cast(Authentication.class)
      .onErrorMap(JwtException.class, this::onError);
}

Code example source: spring-projects/spring-data-redis

/**
 * Unsubscribe from {@code targets} using unsubscribe {@link Function} and unregister {@code targets} after
 * unsubscribing.
 * 
 * @param targets
 * @param unsubscribeFunction
 * @return
 */
Mono<Void> unsubscribe(ByteBuffer[] targets, Function<ByteBuffer[], Mono<Void>> unsubscribeFunction) {
  return Mono.defer(() -> {
    List<ByteBuffer> targetCollection = Arrays.asList(targets);
    return unsubscribeFunction.apply(targets).doOnSuccess((discard) -> {
      this.targets.removeAll(targetCollection);
    }).onErrorMap(exceptionTranslator);
  });
}

Code example source: spring-cloud/spring-cloud-gateway

@Override
public <T> Mono<T> bodyToMono(ParameterizedTypeReference<T> typeReference) {
  Mono<T> mono = body(BodyExtractors.toMono(typeReference));
  return mono.onErrorMap(UnsupportedMediaTypeException.class, ERROR_MAPPER);
}

Code example source: spring-projects/spring-security

private Mono<Jwt> decode(SignedJWT parsedToken) {
  try {
    JWKSelector selector = this.jwkSelectorFactory
        .createSelector(parsedToken.getHeader());
    return this.reactiveJwkSource.get(selector)
      .onErrorMap(e -> new IllegalStateException("Could not obtain the keys", e))
      .map(jwkList -> createClaimsSet(parsedToken, jwkList))
      .map(set -> createJwt(parsedToken, set))
      .map(this::validateJwt)
      .onErrorMap(e -> !(e instanceof IllegalStateException) && !(e instanceof JwtException),
          e -> new JwtException("An error occurred while attempting to decode the Jwt: ", e));
  } catch (RuntimeException ex) {
    throw new JwtException("An error occurred while attempting to decode the Jwt: " + ex.getMessage(), ex);
  }
}

Code example source: spring-cloud/spring-cloud-gateway

@Override
public <T> Mono<T> bodyToMono(Class<? extends T> elementClass) {
  Mono<T> mono = body(BodyExtractors.toMono(elementClass));
  return mono.onErrorMap(UnsupportedMediaTypeException.class, ERROR_MAPPER);
}

Code example source: spring-projects/spring-data-redis

private <T> Mono<T> doWithPubSub(Function<RedisPubSubReactiveCommands<ByteBuffer, ByteBuffer>, Mono<T>> function) {
  return connection.getPubSubConnection().flatMap(pubSubConnection -> function.apply(pubSubConnection.reactive()))
      .onErrorMap(connection.translateException());
}

Code example source: spring-projects/spring-data-redis

protected Mono<StatefulRedisPubSubConnection<ByteBuffer, ByteBuffer>> getPubSubConnection() {
  return pubSubConnection.getConnection().onErrorMap(translateException());
}

Code example source: spring-projects/spring-data-redis

protected Mono<StatefulConnection<ByteBuffer, ByteBuffer>> getDedicatedConnection() {
  return dedicatedConnection.getConnection().onErrorMap(translateException());
}

Code example source: lettuce-io/lettuce-core

@Override
public CompletableFuture<StatefulRedisMasterSlaveConnection<K, V>> connectAsync() {
  Map<RedisURI, StatefulRedisConnection<K, V>> initialConnections = new HashMap<>();
  TopologyProvider topologyProvider = new StaticMasterSlaveTopologyProvider(redisClient, redisURIs);
  RedisURI seedNode = redisURIs.iterator().next();
  MasterSlaveTopologyRefresh refresh = new MasterSlaveTopologyRefresh(redisClient, topologyProvider);
  MasterSlaveConnectionProvider<K, V> connectionProvider = new MasterSlaveConnectionProvider<>(redisClient, codec,
      seedNode, initialConnections);
  return refresh.getNodes(seedNode).flatMap(nodes -> {
    if (nodes.isEmpty()) {
      return Mono.error(new RedisException(String.format("Cannot determine topology from %s", redisURIs)));
    }
    return initializeConnection(codec, seedNode, connectionProvider, nodes);
  }).onErrorMap(ExecutionException.class, Throwable::getCause).toFuture();
}

Code example source: spring-projects/spring-data-mongodb

/**
 * Create a reusable {@link Mono} for the {@code collectionName} and {@link ReactiveCollectionCallback}.
 *
 * @param collectionName must not be empty or {@literal null}.
 * @param callback must not be {@literal null}.
 * @param <T>
 * @return a reusable {@link Mono} wrapping the {@link ReactiveCollectionCallback}.
 */
public <T> Mono<T> createMono(String collectionName, ReactiveCollectionCallback<T> callback) {
  Assert.hasText(collectionName, "Collection name must not be null or empty!");
  Assert.notNull(callback, "ReactiveCollectionCallback must not be null!");
  Mono<MongoCollection<Document>> collectionPublisher = Mono
      .fromCallable(() -> getAndPrepareCollection(doGetDatabase(), collectionName));
  return collectionPublisher.flatMap(collection -> Mono.from(callback.doInCollection(collection)))
      .onErrorMap(translateException());
}

Code example source: lettuce-io/lettuce-core

private <K, V> Mono<StatefulRedisSentinelConnection<K, V>> connectSentinel(ConnectionBuilder connectionBuilder, RedisURI uri) {
  connectionBuilder.socketAddressSupplier(getSocketAddressSupplier(uri));
  SocketAddress socketAddress = clientResources.socketAddressResolver().resolve(uri);
  logger.debug("Connecting to Redis Sentinel, address: " + socketAddress);
  Mono<StatefulRedisSentinelConnection<K, V>> connectionMono = Mono
      .fromCompletionStage(initializeChannelAsync(connectionBuilder));
  return connectionMono.onErrorMap(CompletionException.class, Throwable::getCause) //
      .doOnError(t -> logger.warn("Cannot connect Redis Sentinel at " + uri + ": " + t.toString())) //
      .onErrorMap(e -> new RedisConnectionException("Cannot connect Redis Sentinel at " + uri, e));
}
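
The `onErrorMap(CompletionException.class, Throwable::getCause)` step in the example above replaces a `CompletionException` wrapper with its cause. `CompletableFuture.join()` reports failures in that wrapped form, which the JDK-only sketch below demonstrates together with the unwrap rule. `UnwrapSketch`, `unwrap`, and `failureOf` are hypothetical names for illustration. The sketch does not reproduce lettuce's connection logic, and unlike `Throwable::getCause` it adds a null guard.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

public class UnwrapSketch {

    // Same idea as onErrorMap(CompletionException.class, Throwable::getCause):
    // replace the wrapper with its cause and leave every other error as it is.
    // The null check is an addition of this sketch.
    public static Throwable unwrap(Throwable error) {
        if (error instanceof CompletionException && error.getCause() != null) {
            return error.getCause();
        }
        return error;
    }

    // Returns the exception that join() throws, or null if the future succeeded.
    public static Throwable failureOf(CompletableFuture<?> future) {
        try {
            future.join();
            return null;
        } catch (RuntimeException e) {
            return e;
        }
    }

    public static void main(String[] args) {
        CompletableFuture<String> failing = CompletableFuture.supplyAsync(() -> {
            throw new IllegalStateException("connection refused");
        });

        Throwable reported = failureOf(failing);
        System.out.println(reported.getClass().getSimpleName()); // CompletionException
        System.out.println(unwrap(reported).getMessage());       // connection refused
    }
}
```

Unwrapping first means that later steps, such as the `doOnError` logging and the final `onErrorMap` in the example above, work with the underlying failure rather than the wrapper.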

Code example source: spring-projects/spring-data-mongodb

/**
 * Create a reusable Mono for a {@link ReactiveDatabaseCallback}. It's up to the developer to choose to obtain a new
 * {@link Mono} or to reuse the {@link Mono}.
 *
 * @param callback must not be {@literal null}
 * @return a {@link Mono} wrapping the {@link ReactiveDatabaseCallback}.
 */
public <T> Mono<T> createMono(ReactiveDatabaseCallback<T> callback) {
  Assert.notNull(callback, "ReactiveDatabaseCallback must not be null!");
  return Mono.defer(() -> Mono.from(callback.doInDB(prepareDatabase(doGetDatabase()))))
      .onErrorMap(translateException());
}

Code example source: lettuce-io/lettuce-core

@Override
public CompletableFuture<StatefulRedisMasterSlaveConnection<K, V>> connectAsync() {
  TopologyProvider topologyProvider = new SentinelTopologyProvider(redisURI.getSentinelMasterId(), redisClient, redisURI);
  SentinelTopologyRefresh sentinelTopologyRefresh = new SentinelTopologyRefresh(redisClient,
      redisURI.getSentinelMasterId(), redisURI.getSentinels());
  MasterSlaveTopologyRefresh refresh = new MasterSlaveTopologyRefresh(redisClient, topologyProvider);
  MasterSlaveConnectionProvider<K, V> connectionProvider = new MasterSlaveConnectionProvider<>(redisClient, codec,
      redisURI, Collections.emptyMap());
  Runnable runnable = getTopologyRefreshRunnable(refresh, connectionProvider);
  return refresh.getNodes(redisURI).flatMap(nodes -> {
    if (nodes.isEmpty()) {
      return Mono.error(new RedisException(String.format("Cannot determine topology from %s", redisURI)));
    }
    return initializeConnection(codec, sentinelTopologyRefresh, connectionProvider, runnable, nodes);
  }).onErrorMap(ExecutionException.class, Throwable::getCause).toFuture();
}

Code example source: spring-projects/spring-security

@Override
public Mono<Authentication> authenticate(Authentication authentication) {
  return Mono.defer(() -> {
    OAuth2AuthorizationCodeAuthenticationToken token = (OAuth2AuthorizationCodeAuthenticationToken) authentication;

    // Section 3.1.2.1 Authentication Request - http://openid.net/specs/openid-connect-core-1_0.html#AuthRequest
    // scope REQUIRED. OpenID Connect requests MUST contain the "openid" scope value.
    if (token.getAuthorizationExchange()
        .getAuthorizationRequest().getScopes().contains("openid")) {
      // This is an OpenID Connect Authentication Request so return null
      // and let OidcAuthorizationCodeReactiveAuthenticationManager handle it instead once one is created
      // FIXME: Once we create OidcAuthorizationCodeReactiveAuthenticationManager uncomment below
      // return Mono.empty();
    }

    return this.authorizationCodeManager.authenticate(token)
        .onErrorMap(OAuth2AuthorizationException.class, e -> new OAuth2AuthenticationException(e.getError(), e.getError().toString()))
        .cast(OAuth2AuthorizationCodeAuthenticationToken.class)
        .flatMap(this::onSuccess);
  });
}

Code example source: reactor/reactor-core

@Test
public void mapError() {
  MonoProcessor<Integer> mp = MonoProcessor.create();
  StepVerifier.create(Mono.<Integer>error(new TestException())
      .onErrorMap(TestException.class, e -> new Exception("test"))
      .subscribeWith(mp))
        .then(() -> assertThat(mp.isError()).isTrue())
        .then(() -> assertThat(mp.isSuccess()).isFalse())
        .then(() -> assertThat(mp.isTerminated()).isTrue())
        .verifyErrorMessage("test");
}

Code example source: reactor/reactor-core

@Test
public void errorMap() {
  AssertSubscriber<Integer> ts = AssertSubscriber.create();
  Mono.<Integer>error(new Exception())
      .onErrorMap(d -> new RuntimeException("forced failure"))
      .subscribe(ts);
  ts.assertNoValues()
   .assertError()
   .assertErrorMessage("forced failure")
   .assertNotComplete();
}
