本文整理了Java中reactor.core.publisher.Mono.onErrorMap()
方法的一些代码示例,展示了Mono.onErrorMap()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Mono.onErrorMap()
方法的具体详情如下:
包路径:reactor.core.publisher.Mono
类名称:Mono
方法名:onErrorMap
[英]Transform an error emitted by this Mono by synchronously applying a function to it if the error matches the given type. Otherwise let the error pass through.
[中]如果错误与给定类型匹配,则通过同步应用函数来转换此Mono发出的错误。否则,让错误通过。
代码示例来源:origin: reactor/reactor-core
/**
* Transform an error emitted by this {@link Mono} by synchronously applying a function
* to it if the error matches the given type. Otherwise let the error pass through.
* <p>
* <img class="marble" src="doc-files/marbles/onErrorMapWithClassPredicateForMono.svg" alt="">
*
* @param type the class of the exception type to react to
* @param mapper the error transforming {@link Function}
* @param <E> the error type
*
* @return a {@link Mono} that transforms some source errors to other errors
*/
public final <E extends Throwable> Mono<T> onErrorMap(Class<E> type,
Function<? super E, ? extends Throwable> mapper) {
@SuppressWarnings("unchecked")
Function<Throwable, Throwable> handler = (Function<Throwable, Throwable>)mapper;
return onErrorMap(type::isInstance, handler);
}
代码示例来源:origin: spring-projects/spring-framework
@Override
public <T> Mono<T> bodyToMono(ParameterizedTypeReference<T> typeReference) {
Mono<T> mono = body(BodyExtractors.toMono(typeReference));
return mono.onErrorMap(UnsupportedMediaTypeException.class, ERROR_MAPPER)
.onErrorMap(DecodingException.class, DECODING_MAPPER);
}
代码示例来源:origin: spring-projects/spring-framework
@Override
public <T> Mono<T> bodyToMono(Class<? extends T> elementClass) {
Mono<T> mono = body(BodyExtractors.toMono(elementClass));
return mono.onErrorMap(UnsupportedMediaTypeException.class, ERROR_MAPPER)
.onErrorMap(DecodingException.class, DECODING_MAPPER);
}
代码示例来源:origin: spring-projects/spring-data-redis
/**
* Subscribe to {@code targets} using subscribe {@link Function} and register {@code targets} after subscription.
*
* @param targets
* @param subscribeFunction
* @return
*/
Mono<Void> subscribe(ByteBuffer[] targets, Function<ByteBuffer[], Mono<Void>> subscribeFunction) {
return subscribeFunction.apply(targets).doOnSuccess((discard) -> this.targets.addAll(Arrays.asList(targets)))
.onErrorMap(exceptionTranslator);
}
代码示例来源:origin: spring-projects/spring-security
@Override
public Mono<Authentication> authenticate(Authentication authentication) {
return Mono.justOrEmpty(authentication)
.filter(a -> a instanceof BearerTokenAuthenticationToken)
.cast(BearerTokenAuthenticationToken.class)
.map(BearerTokenAuthenticationToken::getToken)
.flatMap(this.jwtDecoder::decode)
.flatMap(this.jwtAuthenticationConverter::convert)
.cast(Authentication.class)
.onErrorMap(JwtException.class, this::onError);
}
代码示例来源:origin: spring-projects/spring-data-redis
/**
* Unsubscribe from to {@code targets} using unsubscribe {@link Function} and register {@code targets} after
* subscription.
*
* @param targets
* @param unsubscribeFunction
* @return
*/
Mono<Void> unsubscribe(ByteBuffer[] targets, Function<ByteBuffer[], Mono<Void>> unsubscribeFunction) {
return Mono.defer(() -> {
List<ByteBuffer> targetCollection = Arrays.asList(targets);
return unsubscribeFunction.apply(targets).doOnSuccess((discard) -> {
this.targets.removeAll(targetCollection);
}).onErrorMap(exceptionTranslator);
});
}
代码示例来源:origin: spring-cloud/spring-cloud-gateway
@Override
public <T> Mono<T> bodyToMono(ParameterizedTypeReference<T> typeReference) {
Mono<T> mono = body(BodyExtractors.toMono(typeReference));
return mono.onErrorMap(UnsupportedMediaTypeException.class, ERROR_MAPPER);
}
代码示例来源:origin: spring-projects/spring-security
private Mono<Jwt> decode(SignedJWT parsedToken) {
try {
JWKSelector selector = this.jwkSelectorFactory
.createSelector(parsedToken.getHeader());
return this.reactiveJwkSource.get(selector)
.onErrorMap(e -> new IllegalStateException("Could not obtain the keys", e))
.map(jwkList -> createClaimsSet(parsedToken, jwkList))
.map(set -> createJwt(parsedToken, set))
.map(this::validateJwt)
.onErrorMap(e -> !(e instanceof IllegalStateException) && !(e instanceof JwtException), e -> new JwtException("An error occurred while attempting to decode the Jwt: ", e));
} catch (RuntimeException ex) {
throw new JwtException("An error occurred while attempting to decode the Jwt: " + ex.getMessage(), ex);
}
}
代码示例来源:origin: spring-cloud/spring-cloud-gateway
@Override
public <T> Mono<T> bodyToMono(Class<? extends T> elementClass) {
Mono<T> mono = body(BodyExtractors.toMono(elementClass));
return mono.onErrorMap(UnsupportedMediaTypeException.class, ERROR_MAPPER);
}
代码示例来源:origin: spring-projects/spring-data-redis
private <T> Mono<T> doWithPubSub(Function<RedisPubSubReactiveCommands<ByteBuffer, ByteBuffer>, Mono<T>> function) {
return connection.getPubSubConnection().flatMap(pubSubConnection -> function.apply(pubSubConnection.reactive()))
.onErrorMap(connection.translateException());
}
}
代码示例来源:origin: spring-projects/spring-data-redis
protected Mono<StatefulRedisPubSubConnection<ByteBuffer, ByteBuffer>> getPubSubConnection() {
return pubSubConnection.getConnection().onErrorMap(translateException());
}
代码示例来源:origin: spring-projects/spring-data-redis
protected Mono<StatefulConnection<ByteBuffer, ByteBuffer>> getDedicatedConnection() {
return dedicatedConnection.getConnection().onErrorMap(translateException());
}
代码示例来源:origin: lettuce-io/lettuce-core
@Override
public CompletableFuture<StatefulRedisMasterSlaveConnection<K, V>> connectAsync() {
Map<RedisURI, StatefulRedisConnection<K, V>> initialConnections = new HashMap<>();
TopologyProvider topologyProvider = new StaticMasterSlaveTopologyProvider(redisClient, redisURIs);
RedisURI seedNode = redisURIs.iterator().next();
MasterSlaveTopologyRefresh refresh = new MasterSlaveTopologyRefresh(redisClient, topologyProvider);
MasterSlaveConnectionProvider<K, V> connectionProvider = new MasterSlaveConnectionProvider<>(redisClient, codec,
seedNode, initialConnections);
return refresh.getNodes(seedNode).flatMap(nodes -> {
if (nodes.isEmpty()) {
return Mono.error(new RedisException(String.format("Cannot determine topology from %s", redisURIs)));
}
return initializeConnection(codec, seedNode, connectionProvider, nodes);
}).onErrorMap(ExecutionException.class, Throwable::getCause).toFuture();
}
代码示例来源:origin: spring-projects/spring-data-mongodb
/**
* Create a reusable {@link Mono} for the {@code collectionName} and {@link ReactiveCollectionCallback}.
*
* @param collectionName must not be empty or {@literal null}.
* @param callback must not be {@literal null}.
* @param <T>
* @return a reusable {@link Mono} wrapping the {@link ReactiveCollectionCallback}.
*/
public <T> Mono<T> createMono(String collectionName, ReactiveCollectionCallback<T> callback) {
Assert.hasText(collectionName, "Collection name must not be null or empty!");
Assert.notNull(callback, "ReactiveCollectionCallback must not be null!");
Mono<MongoCollection<Document>> collectionPublisher = Mono
.fromCallable(() -> getAndPrepareCollection(doGetDatabase(), collectionName));
return collectionPublisher.flatMap(collection -> Mono.from(callback.doInCollection(collection)))
.onErrorMap(translateException());
}
代码示例来源:origin: lettuce-io/lettuce-core
private <K, V> Mono<StatefulRedisSentinelConnection<K, V>> connectSentinel(ConnectionBuilder connectionBuilder, RedisURI uri) {
connectionBuilder.socketAddressSupplier(getSocketAddressSupplier(uri));
SocketAddress socketAddress = clientResources.socketAddressResolver().resolve(uri);
logger.debug("Connecting to Redis Sentinel, address: " + socketAddress);
Mono<StatefulRedisSentinelConnection<K, V>> connectionMono = Mono
.fromCompletionStage(initializeChannelAsync(connectionBuilder));
return connectionMono.onErrorMap(CompletionException.class, Throwable::getCause) //
.doOnError(t -> logger.warn("Cannot connect Redis Sentinel at " + uri + ": " + t.toString())) //
.onErrorMap(e -> new RedisConnectionException("Cannot connect Redis Sentinel at " + uri, e));
}
代码示例来源:origin: spring-projects/spring-data-mongodb
/**
* Create a reusable Mono for a {@link ReactiveDatabaseCallback}. It's up to the developer to choose to obtain a new
* {@link Flux} or to reuse the {@link Flux}.
*
* @param callback must not be {@literal null}
* @return a {@link Mono} wrapping the {@link ReactiveDatabaseCallback}.
*/
public <T> Mono<T> createMono(ReactiveDatabaseCallback<T> callback) {
Assert.notNull(callback, "ReactiveDatabaseCallback must not be null!");
return Mono.defer(() -> Mono.from(callback.doInDB(prepareDatabase(doGetDatabase()))))
.onErrorMap(translateException());
}
代码示例来源:origin: lettuce-io/lettuce-core
@Override
public CompletableFuture<StatefulRedisMasterSlaveConnection<K, V>> connectAsync() {
TopologyProvider topologyProvider = new SentinelTopologyProvider(redisURI.getSentinelMasterId(), redisClient, redisURI);
SentinelTopologyRefresh sentinelTopologyRefresh = new SentinelTopologyRefresh(redisClient,
redisURI.getSentinelMasterId(), redisURI.getSentinels());
MasterSlaveTopologyRefresh refresh = new MasterSlaveTopologyRefresh(redisClient, topologyProvider);
MasterSlaveConnectionProvider<K, V> connectionProvider = new MasterSlaveConnectionProvider<>(redisClient, codec,
redisURI, Collections.emptyMap());
Runnable runnable = getTopologyRefreshRunnable(refresh, connectionProvider);
return refresh.getNodes(redisURI).flatMap(nodes -> {
if (nodes.isEmpty()) {
return Mono.error(new RedisException(String.format("Cannot determine topology from %s", redisURI)));
}
return initializeConnection(codec, sentinelTopologyRefresh, connectionProvider, runnable, nodes);
}).onErrorMap(ExecutionException.class, Throwable::getCause).toFuture();
}
代码示例来源:origin: spring-projects/spring-security
@Override
public Mono<Authentication> authenticate(Authentication authentication) {
return Mono.defer(() -> {
OAuth2AuthorizationCodeAuthenticationToken token = (OAuth2AuthorizationCodeAuthenticationToken) authentication;
// Section 3.1.2.1 Authentication Request - http://openid.net/specs/openid-connect-core-1_0.html#AuthRequest
// scope REQUIRED. OpenID Connect requests MUST contain the "openid" scope value.
if (token.getAuthorizationExchange()
.getAuthorizationRequest().getScopes().contains("openid")) {
// This is an OpenID Connect Authentication Request so return null
// and let OidcAuthorizationCodeReactiveAuthenticationManager handle it instead once one is created
// FIXME: Once we create OidcAuthorizationCodeReactiveAuthenticationManager uncomment below
// return Mono.empty();
}
return this.authorizationCodeManager.authenticate(token)
.onErrorMap(OAuth2AuthorizationException.class, e -> new OAuth2AuthenticationException(e.getError(), e.getError().toString()))
.cast(OAuth2AuthorizationCodeAuthenticationToken.class)
.flatMap(this::onSuccess);
});
}
代码示例来源:origin: reactor/reactor-core
@Test
public void mapError() {
MonoProcessor<Integer> mp = MonoProcessor.create();
StepVerifier.create(Mono.<Integer>error(new TestException())
.onErrorMap(TestException.class, e -> new Exception("test"))
.subscribeWith(mp))
.then(() -> assertThat(mp.isError()).isTrue())
.then(() -> assertThat(mp.isSuccess()).isFalse())
.then(() -> assertThat(mp.isTerminated()).isTrue())
.verifyErrorMessage("test");
}
代码示例来源:origin: reactor/reactor-core
@Test
public void errorMap() {
AssertSubscriber<Integer> ts = AssertSubscriber.create();
Mono.<Integer>error(new Exception()).onErrorMap(d -> new RuntimeException("forced" +
" " +
"failure"))
.subscribe(ts);
ts.assertNoValues()
.assertError()
.assertErrorMessage("forced failure")
.assertNotComplete();
}
内容来源于网络,如有侵权,请联系作者删除!