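This article collects code examples for the Java method reactor.core.publisher.Mono.doOnError() and shows how Mono.doOnError() is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, and were extracted from selected open-source projects. Details of Mono.doOnError() are as follows: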
Package: reactor.core.publisher
Class: Mono
Method: doOnError
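Description: Add behavior triggered when the Mono completes with an error matching the given exception type. (This wording describes the doOnError(Class, Consumer) overload; most examples below use doOnError(Consumer), which is triggered by any error.)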
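To make the contract concrete before the real-world examples, here is a minimal plain-Java sketch of the behavior described above. It is a model only, not Reactor code: the class DoOnErrorModel and the helper peekError are hypothetical names invented for illustration, and a thrown exception stands in for the Mono's error signal. It assumes what the Reactor tests further down also show: the callback runs only for a matching exception type, and the error still reaches the subscriber afterwards.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;

public class DoOnErrorModel {

    // Hypothetical helper (NOT a Reactor API): runs the callback for failures
    // of the given type, then rethrows the same exception unchanged.
    static <T, E extends RuntimeException> Supplier<T> peekError(
            Supplier<T> source, Class<E> type, Consumer<? super E> callback) {
        return () -> {
            try {
                return source.get();
            } catch (RuntimeException ex) {
                if (type.isInstance(ex)) {
                    callback.accept(type.cast(ex)); // side effect only
                }
                throw ex; // the failure is never swallowed
            }
        };
    }

    // Runs a failing source through the helper and records what happened.
    static <E extends RuntimeException> List<String> demo(Class<E> type) {
        List<String> log = new ArrayList<>();
        Supplier<String> source = () -> {
            throw new IllegalStateException("test");
        };
        Supplier<String> observed =
                peekError(source, type, ex -> log.add("callback: " + ex.getMessage()));
        try {
            observed.get();
        } catch (RuntimeException ex) {
            log.add("propagated: " + ex.getMessage());
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo(IllegalStateException.class));    // matching type
        System.out.println(demo(IllegalArgumentException.class)); // non-matching type
    }
}
```

With a matching type the log holds both the callback entry and the propagated entry; with a non-matching type only the propagated entry appears. In Reactor itself, recovering from the error is a separate step, as in the example further down that chains onErrorResume after doOnError.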
Code example source: spring-projects/spring-framework
public MonoToListenableFutureAdapter(Mono<T> mono) {
    Assert.notNull(mono, "Mono must not be null");
    // Forward the outcome (value or error) to the callback registry
    this.processor = mono
            .doOnSuccess(this.registry::success)
            .doOnError(this.registry::failure)
            .toProcessor();
}
Code example source: spring-projects/spring-framework
@Override
public Mono<Void> apply(HttpServerRequest reactorRequest, HttpServerResponse reactorResponse) {
    NettyDataBufferFactory bufferFactory = new NettyDataBufferFactory(reactorResponse.alloc());
    try {
        ReactorServerHttpRequest request = new ReactorServerHttpRequest(reactorRequest, bufferFactory);
        ServerHttpResponse response = new ReactorServerHttpResponse(reactorResponse, bufferFactory);
        if (request.getMethod() == HttpMethod.HEAD) {
            response = new HttpHeadResponseDecorator(response);
        }
        // Trace-log the outcome of request handling without altering it
        return this.httpHandler.handle(request, response)
                .doOnError(ex -> logger.trace(request.getLogPrefix() + "Failed to complete: " + ex.getMessage()))
                .doOnSuccess(aVoid -> logger.trace(request.getLogPrefix() + "Handling completed"));
    }
    catch (URISyntaxException ex) {
        if (logger.isDebugEnabled()) {
            logger.debug("Failed to get request URI: " + ex.getMessage());
        }
        reactorResponse.status(HttpResponseStatus.BAD_REQUEST);
        return Mono.empty();
    }
}
Code example source: spring-projects/spring-framework
@Override
public ListenableFuture<Void> connect(final TcpConnectionHandler<P> handler) {
    Assert.notNull(handler, "TcpConnectionHandler is required");
    if (this.stopping) {
        return handleShuttingDownConnectFailure(handler);
    }
    // Notify the handler when the connection attempt fails
    Mono<Void> connectMono = this.tcpClient
            .handle(new ReactorNettyHandler(handler))
            .connect()
            .doOnError(handler::afterConnectFailure)
            .then();
    return new MonoToListenableFutureAdapter<>(connectMono);
}
Code example source: spring-projects/spring-framework
argMonos.add(this.resolvers.resolveArgument(parameter, bindingContext, exchange)
        .defaultIfEmpty(NO_ARG_VALUE)
        .doOnError(cause -> logArgumentErrorIfNecessary(exchange, parameter, cause)));
Code example source: codecentric/spring-boot-admin
@Override
public Mono<Void> notify(InstanceEvent event) {
    if (!enabled) {
        return Mono.empty();
    }
    return repository.find(event.getInstance())
            .filter(instance -> shouldNotify(event, instance))
            .flatMap(instance -> doNotify(event, instance))
            // Log the failure; the error still propagates to the subscriber
            .doOnError(ex -> getLogger().error("Couldn't notify for event {} ", event, ex))
            .then();
}
Code example source: codecentric/spring-boot-admin
protected Mono<Instance> doUpdateStatus(Instance instance) {
    if (!instance.isRegistered()) {
        return Mono.empty();
    }
    log.debug("Update status for {}", instance);
    return instanceWebClient.instance(instance)
            .get()
            .uri(Endpoint.HEALTH)
            .exchange()
            .log(log.getName(), Level.FINEST)
            .flatMap(this::convertStatusInfo)
            // Log the error first, then recover from it with a fallback
            .doOnError(ex -> logError(instance, ex))
            .onErrorResume(this::handleError)
            .map(instance::withStatusInfo);
}
Code example source: lettuce-io/lettuce-core
private <T, K, V> Mono<T> connect(Mono<SocketAddress> socketAddressSupplier, RedisCodec<K, V> codec,
        DefaultEndpoint endpoint,
        RedisChannelHandler<K, V> connection, Supplier<CommandHandler> commandHandlerSupplier) {
    ConnectionFuture<T> future = connectStatefulAsync(connection, codec, endpoint, getFirstUri(), socketAddressSupplier,
            commandHandlerSupplier);
    return Mono.fromCompletionStage(future).doOnError(t -> logger.warn(t.getMessage()));
}
Code example source: spring-projects/spring-framework
@Override
public ListenableFuture<Void> connect(TcpConnectionHandler<P> handler, ReconnectStrategy strategy) {
    Assert.notNull(handler, "TcpConnectionHandler is required");
    Assert.notNull(strategy, "ReconnectStrategy is required");
    if (this.stopping) {
        return handleShuttingDownConnectFailure(handler);
    }
    // Report first connect to the ListenableFuture
    MonoProcessor<Void> connectMono = MonoProcessor.create();
    this.tcpClient
            .handle(new ReactorNettyHandler(handler))
            .connect()
            .doOnNext(updateConnectMono(connectMono))
            .doOnError(updateConnectMono(connectMono))
            .doOnError(handler::afterConnectFailure) // report all connect failures to the handler
            .flatMap(Connection::onDispose) // post-connect issues
            .retryWhen(reconnectFunction(strategy))
            .repeatWhen(reconnectFunction(strategy))
            .subscribe();
    return new MonoToListenableFutureAdapter<>(connectMono);
}
Code example source: lettuce-io/lettuce-core
protected Mono<List<RedisNodeDescription>> getNodes(StatefulRedisSentinelConnection<String, String> connection) {
    RedisSentinelReactiveCommands<String, String> reactive = connection.reactive();
    Mono<Tuple2<Map<String, String>, List<Map<String, String>>>> masterAndSlaves = reactive.master(masterId)
            .zipWith(reactive.slaves(masterId).collectList())
            .timeout(this.timeout)
            .flatMap(tuple -> ResumeAfter.close(connection).thenEmit(tuple))
            // Close the connection if any step above fails
            .doOnError(e -> connection.closeAsync());
    return masterAndSlaves.map(tuple -> {
        List<RedisNodeDescription> result = new ArrayList<>();
        result.add(toNode(tuple.getT1(), RedisInstance.Role.MASTER));
        result.addAll(tuple.getT2().stream().filter(SentinelTopologyProvider::isAvailable)
                .map(map -> toNode(map, RedisInstance.Role.SLAVE)).collect(Collectors.toList()));
        return result;
    });
}
Code example source: reactor/reactor-core
@Test
public void onMonoRejectedDoOnError() {
    Mono<String> mp = Mono.error(new Exception("test"));
    AtomicReference<Throwable> ref = new AtomicReference<>();
    // The callback captures the error emitted by the Mono
    mp.doOnError(ref::set)
            .subscribe();
    assertThat(ref.get()).hasMessage("test");
}
Code example source: spring-projects/spring-data-elasticsearch
protected WebClient createWebClientForSocketAddress(InetSocketAddress socketAddress) {
    Builder builder = WebClient.builder().defaultHeaders(it -> it.addAll(getDefaultHeaders()));
    if (connector != null) {
        builder = builder.clientConnector(connector);
    }
    String baseUrl = String.format("%s://%s:%d", this.scheme, socketAddress.getHostString(), socketAddress.getPort());
    // Pass every exchange failure to the error listener
    return builder.baseUrl(baseUrl)
            .filter((request, next) -> next.exchange(request).doOnError(errorListener))
            .build();
}
Code example source: spring-projects/spring-data-elasticsearch
private Flux<Tuple2<InetSocketAddress, ClientResponse>> nodes(@Nullable State state) {
    return Flux.fromIterable(hosts()) //
            .filter(entry -> state == null || entry.getState().equals(state)) //
            .map(ElasticsearchHost::getEndpoint) //
            .flatMap(host -> {
                Mono<ClientResponse> exchange = createWebClient(host) //
                        .head().uri("/").exchange().doOnError(throwable -> {
                            // Mark the host as offline and report the failure
                            hosts.put(host, new ElasticsearchHost(host, State.OFFLINE));
                            clientProvider.getErrorListener().accept(throwable);
                        });
                return Mono.just(host).zipWith(exchange);
            }) //
            .onErrorContinue((throwable, o) -> clientProvider.getErrorListener().accept(throwable));
}
Code example source: reactor/reactor-core
@Test
public void onMonoRejectedDoOnErrorClazz() {
    Mono<String> mp = Mono.error(new TestException());
    AtomicReference<Throwable> ref = new AtomicReference<>();
    // The callback runs because the error matches the given type
    mp.doOnError(TestException.class, ref::set)
            .subscribe();
    assertThat(ref.get()).isInstanceOf(TestException.class);
}
Code example source: spring-projects/spring-framework
WebExchangeDataBinder binder = context.createDataBinder(exchange, value, name);
return binder.bind(exchange)
        .doOnError(bindingResultMono::onError)
        .doOnSuccess(aVoid -> {
            validateIfApplicable(binder, parameter);
Code example source: reactor/reactor-core
@Test
public void monoCheckpointEmpty() {
    StringWriter sw = new StringWriter();
    Mono<Object> tested = Mono.just(1)
            .map(i -> null)
            .filter(Objects::nonNull)
            .checkpoint()
            // Capture the stack trace of the error for inspection
            .doOnError(t -> t.printStackTrace(new PrintWriter(sw)));
    StepVerifier.create(tested)
            .verifyError();
    String debugStack = sw.toString();
    assertThat(debugStack).contains("Assembly trace from producer [reactor.core.publisher.MonoFilterFuseable] :");
}
Code example source: reactor/reactor-core
@Test
public void monoCheckpointWithDescriptionIsLight() {
    StringWriter sw = new StringWriter();
    Mono<Object> tested = Mono.just(1)
            .map(i -> null)
            .filter(Objects::nonNull)
            .checkpoint("foo")
            .doOnError(t -> t.printStackTrace(new PrintWriter(sw)));
    StepVerifier.create(tested)
            .verifyError();
    String debugStack = sw.toString();
    assertThat(debugStack).contains("Assembly site of producer [reactor.core.publisher.MonoFilterFuseable] is identified by light checkpoint [foo].");
}
Code example source: reactor/reactor-core
@Test
public void monoCheckpointDescriptionAndForceStack() {
    StringWriter sw = new StringWriter();
    Mono<Object> tested = Mono.just(1)
            .map(i -> null)
            .filter(Objects::nonNull)
            .checkpoint("foo", true)
            .doOnError(t -> t.printStackTrace(new PrintWriter(sw)));
    StepVerifier.create(tested)
            .verifyError();
    String debugStack = sw.toString();
    assertThat(debugStack).contains("Assembly trace from producer [reactor.core.publisher.MonoFilterFuseable], described as [foo] :");
}
Code example source: reactor/reactor-core
@Test
public void onMonoRejectedDoOnErrorClazzNot() {
    Mono<String> mp = Mono.error(new TestException());
    AtomicReference<Throwable> ref = new AtomicReference<>();
    // Per the assertions below, the error does not match RuntimeException:
    // the callback is skipped, yet the error still reaches the processor
    MonoProcessor<String> processor = mp.doOnError(RuntimeException.class, ref::set)
            .toProcessor();
    processor.subscribe();
    assertThat(processor.getError()).isInstanceOf(TestException.class);
    assertThat(ref.get()).isNull();
}
Code example source: lettuce-io/lettuce-core
private <K, V> Mono<StatefulRedisSentinelConnection<K, V>> connectSentinel(ConnectionBuilder connectionBuilder, RedisURI uri) {
    connectionBuilder.socketAddressSupplier(getSocketAddressSupplier(uri));
    SocketAddress socketAddress = clientResources.socketAddressResolver().resolve(uri);
    logger.debug("Connecting to Redis Sentinel, address: " + socketAddress);
    Mono<StatefulRedisSentinelConnection<K, V>> connectionMono = Mono
            .fromCompletionStage(initializeChannelAsync(connectionBuilder));
    // Unwrap the CompletionException, log the cause, then wrap it in a RedisConnectionException
    return connectionMono.onErrorMap(CompletionException.class, Throwable::getCause) //
            .doOnError(t -> logger.warn("Cannot connect Redis Sentinel at " + uri + ": " + t.toString())) //
            .onErrorMap(e -> new RedisConnectionException("Cannot connect Redis Sentinel at " + uri, e));
}