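This article collects code examples for the Java method reactor.core.publisher.Mono.doOnSuccess() and shows how Mono.doOnSuccess() is used in practice. The examples are taken from selected projects found on GitHub, Stack Overflow, and Maven, so they serve as useful reference material. The details of Mono.doOnSuccess() are as follows: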
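Package path: reactor.core.publisher.Mono
Class name: Mono
Method name: doOnSuccess
Description: Adds behavior triggered when the Mono completes successfully. The callback receives the emitted value, or null when the Mono completes without a value; it is not invoked when the Mono terminates with an error.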
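The contract described above can be modeled without Reactor. The following is only a simplified sketch, not Reactor's implementation: the class name DoOnSuccessSketch is hypothetical, and a plain java.util.function.Supplier stands in for a Mono (it returns a value, returns null for an empty completion, or throws for an error). It mirrors the behaviors that the reactor-core tests in this article exercise: the hook sees the value or null, is skipped on error, and a null hook is rejected.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.function.Consumer;
import java.util.function.Supplier;

public class DoOnSuccessSketch {

    // Hypothetical stand-in for Mono<T>: get() returns the value,
    // returns null for an empty completion, or throws for an error.
    public static <T> Supplier<T> doOnSuccess(Supplier<T> source, Consumer<? super T> onSuccess) {
        Objects.requireNonNull(onSuccess, "onSuccess"); // a null callback is rejected up front
        return () -> {
            T value = source.get();   // if the source throws, the hook below is skipped
            onSuccess.accept(value);  // null here means "completed without a value"
            return value;             // an exception thrown by the hook propagates as the error
        };
    }

    public static void main(String[] args) {
        List<String> seen = new ArrayList<>();
        Consumer<String> hook = seen::add;

        Supplier<String> withValue = doOnSuccess(() -> "test", hook);
        Supplier<String> empty = doOnSuccess(() -> null, hook);
        Supplier<String> failing = doOnSuccess(() -> { throw new IllegalStateException("boom"); }, hook);

        System.out.println(seen.size()); // prints 0: nothing runs until get() is called
        withValue.get();
        empty.get();
        try {
            failing.get();
        } catch (IllegalStateException ex) {
            System.out.println("error: " + ex.getMessage()); // prints "error: boom"
        }
        System.out.println(seen); // prints [test, null]: the hook ran for value and empty only
    }
}
```

In real Reactor code the same roles are played by Mono.just, Mono.empty, and Mono.error, and nothing happens until the Mono is subscribed; the model ignores threading, cancellation, and backpressure.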
Code example source: origin: spring-projects/spring-framework
public MonoToListenableFutureAdapter(Mono<T> mono) {
    Assert.notNull(mono, "Mono must not be null");
    this.processor = mono
            .doOnSuccess(this.registry::success)
            .doOnError(this.registry::failure)
            .toProcessor();
}
Code example source: origin: spring-projects/spring-data-redis
/**
 * Subscribe to {@code targets} using subscribe {@link Function} and register {@code targets} after subscription.
 *
 * @param targets
 * @param subscribeFunction
 * @return
 */
Mono<Void> subscribe(ByteBuffer[] targets, Function<ByteBuffer[], Mono<Void>> subscribeFunction) {
    return subscribeFunction.apply(targets)
            .doOnSuccess((discard) -> this.targets.addAll(Arrays.asList(targets)))
            .onErrorMap(exceptionTranslator);
}
Code example source: origin: spring-projects/spring-data-mongodb
public Mono<Void> dropCollection(String collectionName) {
    return createMono(collectionName, MongoCollection::drop).doOnSuccess(success -> {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Dropped collection [" + collectionName + "]");
        }
    }).then();
}
Code example source: origin: spring-projects/spring-security
@Override
public Mono<Void> removeAuthorizedClient(String clientRegistrationId, Authentication principal,
        ServerWebExchange exchange) {
    Assert.hasText(clientRegistrationId, "clientRegistrationId cannot be empty");
    Assert.notNull(exchange, "exchange cannot be null");
    return exchange.getSession()
            .doOnSuccess(session -> {
                Map<String, OAuth2AuthorizedClient> authorizedClients = getAuthorizedClients(session);
                authorizedClients.remove(clientRegistrationId);
                if (authorizedClients.isEmpty()) {
                    session.getAttributes().remove(this.sessionAttributeName);
                } else {
                    session.getAttributes().put(this.sessionAttributeName, authorizedClients);
                }
            })
            .then(Mono.empty());
}
Code example source: origin: spring-projects/spring-framework
@Override
public Mono<Void> apply(HttpServerRequest reactorRequest, HttpServerResponse reactorResponse) {
    NettyDataBufferFactory bufferFactory = new NettyDataBufferFactory(reactorResponse.alloc());
    try {
        ReactorServerHttpRequest request = new ReactorServerHttpRequest(reactorRequest, bufferFactory);
        ServerHttpResponse response = new ReactorServerHttpResponse(reactorResponse, bufferFactory);
        if (request.getMethod() == HttpMethod.HEAD) {
            response = new HttpHeadResponseDecorator(response);
        }
        return this.httpHandler.handle(request, response)
                .doOnError(ex -> logger.trace(request.getLogPrefix() + "Failed to complete: " + ex.getMessage()))
                .doOnSuccess(aVoid -> logger.trace(request.getLogPrefix() + "Handling completed"));
    }
    catch (URISyntaxException ex) {
        if (logger.isDebugEnabled()) {
            logger.debug("Failed to get request URI: " + ex.getMessage());
        }
        reactorResponse.status(HttpResponseStatus.BAD_REQUEST);
        return Mono.empty();
    }
}
Code example source: origin: spring-projects/spring-security
@Override
public Mono<Void> saveAuthorizedClient(OAuth2AuthorizedClient authorizedClient, Authentication principal,
        ServerWebExchange exchange) {
    Assert.notNull(authorizedClient, "authorizedClient cannot be null");
    Assert.notNull(exchange, "exchange cannot be null");
    return exchange.getSession()
            .doOnSuccess(session -> {
                Map<String, OAuth2AuthorizedClient> authorizedClients = getAuthorizedClients(session);
                authorizedClients.put(authorizedClient.getClientRegistration().getRegistrationId(), authorizedClient);
                session.getAttributes().put(this.sessionAttributeName, authorizedClients);
            })
            .then(Mono.empty());
}
Code example source: origin: codecentric/spring-boot-admin
protected Mono<Void> sendReminders() {
    Instant now = Instant.now();
    return Flux.fromIterable(this.reminders.values())
            .filter(reminder -> reminder.getLastNotification().plus(reminderPeriod).isBefore(now))
            .flatMap(reminder -> delegate.notify(reminder.getEvent())
                    .doOnSuccess(signal -> reminder.setLastNotification(now)))
            .then();
}
Code example source: origin: reactor/reactor-core
@Test
public void onMonoSuccessDoOnSuccess() {
    Mono<String> mp = Mono.just("test");
    AtomicReference<String> ref = new AtomicReference<>();
    mp.doOnSuccess(ref::set)
      .subscribe();
    assertThat(ref.get()).isEqualToIgnoringCase("test");
}
Code example source: origin: reactor/reactor-core
@Test(expected = NullPointerException.class)
public void onMonoSuccessNullDoOnSuccess() {
    Mono<String> mp = Mono.just("test");
    mp.doOnSuccess(null)
      .subscribe();
}
Code example source: origin: spring-projects/spring-framework
return binder.bind(exchange)
        .doOnError(bindingResultMono::onError)
        .doOnSuccess(aVoid -> {
            validateIfApplicable(binder, parameter);
            BindingResult errors = binder.getBindingResult();
            // ... (excerpt truncated in the original)
Code example source: origin: spring-projects/spring-framework
bindingContext.setSessionContext(sessionAttributesHandler, session);
return invokeModelAttributeMethods(bindingContext, modelMethods, exchange)
        .doOnSuccess(aVoid ->
            findModelAttributes(handlerMethod, sessionAttributesHandler).forEach(name -> {
                if (!bindingContext.getModel().containsAttribute(name)) {
                    // ... (excerpt truncated in the original)
Code example source: origin: reactor/reactor-core
@Test
public void onSuccessCallbackFailureInterruptsOnNext() {
    LongAdder invoked = new LongAdder();
    StepVerifier.create(Mono.just("foo")
                            .doOnSuccess(s -> {
                                invoked.increment();
                                throw new IllegalArgumentException(s);
                            }))
                .expectErrorMessage("foo")
                .verify();
    assertEquals(1, invoked.intValue());
}
Code example source: origin: spring-projects/spring-security
@Override
public Mono<ClientResponse> filter(ClientRequest request, ExchangeFunction next) {
    return next.exchange(withClientCookies(request))
            .doOnSuccess(response -> {
                response.cookies().values().forEach(cookies -> {
                    cookies.forEach(cookie -> {
                        if (cookie.getMaxAge().isZero()) {
                            this.cookies.remove(cookie.getName());
                        } else {
                            this.cookies.put(cookie.getName(), cookie);
                        }
                    });
                });
            });
}
Code example source: origin: reactor/reactor-core
@Test
public void onSuccessForEmpty() {
    LongAdder invoked = new LongAdder();
    AtomicReference<String> value = new AtomicReference<>();
    StepVerifier.create(Mono.<String>empty()
                            .doOnSuccess(v -> {
                                invoked.increment();
                                value.set(v);
                            }))
                .expectComplete()
                .verify();
    assertEquals(1, invoked.intValue());
    assertEquals(null, value.get());
}
Code example source: origin: reactor/reactor-core
@Test
public void onSuccessNotCalledOnError() {
    LongAdder invoked = new LongAdder();
    IllegalArgumentException err = new IllegalArgumentException("boom");
    StepVerifier.create(Mono.error(err)
                            .doOnSuccess(v -> invoked.increment()))
                .expectErrorMessage("boom")
                .verify();
    assertEquals(0, invoked.intValue());
}
Code example source: origin: spring-projects/spring-framework
@Override
public Mono<Void> handle(ServerHttpRequest request, ServerHttpResponse response) {
    if (this.forwardedHeaderTransformer != null) {
        request = this.forwardedHeaderTransformer.apply(request);
    }
    ServerWebExchange exchange = createExchange(request, response);
    LogFormatUtils.traceDebug(logger, traceOn ->
            exchange.getLogPrefix() + formatRequest(exchange.getRequest()) +
                    (traceOn ? ", headers=" + formatHeaders(exchange.getRequest().getHeaders()) : ""));
    return getDelegate().handle(exchange)
            .doOnSuccess(aVoid -> logResponse(exchange))
            .onErrorResume(ex -> handleUnresolvedError(exchange, ex))
            .then(Mono.defer(response::setComplete));
}
Code example source: origin: reactor/reactor-core
@Test
public void testErrorWithDoOnSuccess() {
    assertThatExceptionOfType(RuntimeException.class)
            .isThrownBy(() ->
                    Mono.error(new NullPointerException("boom"))
                        .doOnSuccess(aValue -> {})
                        .subscribe())
            .withCauseInstanceOf(NullPointerException.class)
            .matches(Exceptions::isErrorCallbackNotImplemented, "ErrorCallbackNotImplemented");
}
Code example source: origin: spring-projects/spring-framework
@Override
public Mono<Void> handle(ServerWebExchange exchange) {
    if (exchange.getRequest().getQueryParams().containsKey("expire")) {
        return exchange.getSession().doOnNext(session -> {
            // Don't do anything, leave it expired...
        }).then();
    }
    else if (exchange.getRequest().getQueryParams().containsKey("changeId")) {
        return exchange.getSession().flatMap(session ->
                session.changeSessionId().doOnSuccess(aVoid -> updateSessionAttribute(session)));
    }
    else if (exchange.getRequest().getQueryParams().containsKey("invalidate")) {
        return exchange.getSession().doOnNext(WebSession::invalidate).then();
    }
    else {
        return exchange.getSession().doOnSuccess(this::updateSessionAttribute).then();
    }
}