Usage and code examples of the reactor.core.publisher.Mono.doOnSuccess() method

x33g5p2x, reposted on 2022-01-24 under Other

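This article collects code examples of the Java method reactor.core.publisher.Mono.doOnSuccess() and shows how Mono.doOnSuccess() is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, and were extracted from selected open-source projects, so they are useful as reference material. Details of Mono.doOnSuccess():
Fully qualified name: reactor.core.publisher.Mono
Class name: Mono
Method name: doOnSuccess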

Introduction to Mono.doOnSuccess

Add behavior triggered when the Mono completes successfully. The value passed to the callback is:

  • null: completed without data
  • T: completed with data

The callback is not invoked when the Mono terminates with an error.
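To make the contract above concrete, here is a minimal JDK-only sketch of the possible outcomes. `MiniMono` and `DoOnSuccessSketch` are hypothetical names invented for this illustration. This is a toy model, not Reactor's implementation: it has no subscribers, schedulers, or backpressure. It only mirrors the behavior described above: the hook sees the value, sees null for an empty completion, is skipped on error, and a hook that throws turns the success into an error.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.function.Consumer;

// Toy, JDK-only model of the doOnSuccess contract. NOT Reactor code:
// MiniMono is a hypothetical stand-in invented for this sketch.
class DoOnSuccessSketch {

    static final class MiniMono<T> {
        private final T value;                  // null = completed without data
        private final Throwable error;          // non-null = terminated with an error
        private final Consumer<? super T> hook; // success hook, may be null

        private MiniMono(T value, Throwable error, Consumer<? super T> hook) {
            this.value = value;
            this.error = error;
            this.hook = hook;
        }

        static <T> MiniMono<T> just(T value) {
            return new MiniMono<>(Objects.requireNonNull(value, "value"), null, null);
        }

        static <T> MiniMono<T> empty() {
            return new MiniMono<>(null, null, null);
        }

        static <T> MiniMono<T> error(Throwable error) {
            return new MiniMono<>(null, Objects.requireNonNull(error, "error"), null);
        }

        // Returns a new MiniMono that also runs onSuccess on successful completion.
        MiniMono<T> doOnSuccess(Consumer<? super T> onSuccess) {
            Objects.requireNonNull(onSuccess, "onSuccess");
            Consumer<? super T> previous = this.hook;
            Consumer<T> chained = v -> {
                if (previous != null) {
                    previous.accept(v);
                }
                onSuccess.accept(v);
            };
            return new MiniMono<>(value, error, chained);
        }

        // Runs the chain and describes the terminal outcome as a string.
        String subscribe() {
            if (error != null) {
                return "error: " + error.getMessage(); // hook is skipped on error
            }
            try {
                if (hook != null) {
                    hook.accept(value); // value is null for an empty completion
                }
            } catch (RuntimeException e) {
                return "error: " + e.getMessage(); // a throwing hook becomes an error
            }
            return value != null ? "value: " + value : "empty";
        }
    }

    // Exercises each case and records hook calls and outcomes in order.
    static List<String> run() {
        List<String> log = new ArrayList<>();
        Consumer<String> hook = v -> log.add("hook saw " + v);

        log.add(MiniMono.just("test").doOnSuccess(hook).subscribe());
        log.add(MiniMono.<String>empty().doOnSuccess(hook).subscribe());
        log.add(MiniMono.<String>error(new IllegalStateException("boom"))
                .doOnSuccess(hook)
                .subscribe());
        log.add(MiniMono.<String>just("foo")
                .doOnSuccess(v -> { throw new IllegalArgumentException(v); })
                .subscribe());
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Running `main` prints each hook call and outcome in order. With real Reactor, the same cases are exercised by the reactor-core tests quoted later in this article (`onMonoSuccessDoOnSuccess`, `onSuccessForEmpty`, `onSuccessNotCalledOnError`, `onSuccessCallbackFailureInterruptsOnNext`).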

Code examples

Code example source: spring-projects/spring-framework

public MonoToListenableFutureAdapter(Mono<T> mono) {
  Assert.notNull(mono, "Mono must not be null");
  this.processor = mono
      .doOnSuccess(this.registry::success)
      .doOnError(this.registry::failure)
      .toProcessor();
}

Code example source: spring-projects/spring-data-redis

/**
 * Subscribe to {@code targets} using subscribe {@link Function} and register {@code targets} after subscription.
 * 
 * @param targets
 * @param subscribeFunction
 * @return
 */
Mono<Void> subscribe(ByteBuffer[] targets, Function<ByteBuffer[], Mono<Void>> subscribeFunction) {
  return subscribeFunction.apply(targets).doOnSuccess((discard) -> this.targets.addAll(Arrays.asList(targets)))
      .onErrorMap(exceptionTranslator);
}

Code example source: spring-projects/spring-data-mongodb

public Mono<Void> dropCollection(String collectionName) {
  return createMono(collectionName, MongoCollection::drop).doOnSuccess(success -> {
    if (LOGGER.isDebugEnabled()) {
      LOGGER.debug("Dropped collection [" + collectionName + "]");
    }
  }).then();
}

Code example source: spring-projects/spring-security

@Override
public Mono<Void> removeAuthorizedClient(String clientRegistrationId, Authentication principal,
    ServerWebExchange exchange) {
  Assert.hasText(clientRegistrationId, "clientRegistrationId cannot be empty");
  Assert.notNull(exchange, "exchange cannot be null");
  return exchange.getSession()
    .doOnSuccess(session -> {
      Map<String, OAuth2AuthorizedClient> authorizedClients = getAuthorizedClients(session);
      authorizedClients.remove(clientRegistrationId);
      if (authorizedClients.isEmpty()) {
        session.getAttributes().remove(this.sessionAttributeName);
      } else {
        session.getAttributes().put(this.sessionAttributeName, authorizedClients);
      }
    })
    .then(Mono.empty());
}

Code example source: spring-projects/spring-framework

@Override
public Mono<Void> apply(HttpServerRequest reactorRequest, HttpServerResponse reactorResponse) {
  NettyDataBufferFactory bufferFactory = new NettyDataBufferFactory(reactorResponse.alloc());
  try {
    ReactorServerHttpRequest request = new ReactorServerHttpRequest(reactorRequest, bufferFactory);
    ServerHttpResponse response = new ReactorServerHttpResponse(reactorResponse, bufferFactory);
    if (request.getMethod() == HttpMethod.HEAD) {
      response = new HttpHeadResponseDecorator(response);
    }
    return this.httpHandler.handle(request, response)
        .doOnError(ex -> logger.trace(request.getLogPrefix() + "Failed to complete: " + ex.getMessage()))
        .doOnSuccess(aVoid -> logger.trace(request.getLogPrefix() + "Handling completed"));
  }
  catch (URISyntaxException ex) {
    if (logger.isDebugEnabled()) {
      logger.debug("Failed to get request URI: " + ex.getMessage());
    }
    reactorResponse.status(HttpResponseStatus.BAD_REQUEST);
    return Mono.empty();
  }
}

Code example source: spring-projects/spring-security

@Override
public Mono<Void> saveAuthorizedClient(OAuth2AuthorizedClient authorizedClient, Authentication principal,
    ServerWebExchange exchange) {
  Assert.notNull(authorizedClient, "authorizedClient cannot be null");
  Assert.notNull(exchange, "exchange cannot be null");
  return exchange.getSession()
    .doOnSuccess(session -> {
      Map<String, OAuth2AuthorizedClient> authorizedClients = getAuthorizedClients(session);
      authorizedClients.put(authorizedClient.getClientRegistration().getRegistrationId(), authorizedClient);
      session.getAttributes().put(this.sessionAttributeName, authorizedClients);
    })
    .then(Mono.empty());
}

Code example source: codecentric/spring-boot-admin

protected Mono<Void> sendReminders() {
  Instant now = Instant.now();
  return Flux.fromIterable(this.reminders.values())
        .filter(reminder -> reminder.getLastNotification().plus(reminderPeriod).isBefore(now))
        .flatMap(reminder -> delegate.notify(reminder.getEvent())
                      .doOnSuccess(signal -> reminder.setLastNotification(now)))
        .then();
}

Code example source: reactor/reactor-core

@Test
public void onMonoSuccessDoOnSuccess() {
  Mono<String> mp = Mono.just("test");
  AtomicReference<String> ref = new AtomicReference<>();
  mp.doOnSuccess(ref::set)
   .subscribe();
  assertThat(ref.get()).isEqualToIgnoringCase("test");
}

Code example source: reactor/reactor-core

@Test(expected = NullPointerException.class)
public void onMonoSuccessNullDoOnSuccess() {
  Mono<String> mp = Mono.just("test");
  mp.doOnSuccess(null)
   .subscribe();
}

Code example source: spring-projects/spring-framework

return binder.bind(exchange)
    .doOnError(bindingResultMono::onError)
    .doOnSuccess(aVoid -> {
      validateIfApplicable(binder, parameter);
      BindingResult errors = binder.getBindingResult();

Code example source: spring-projects/spring-framework

bindingContext.setSessionContext(sessionAttributesHandler, session);
return invokeModelAttributeMethods(bindingContext, modelMethods, exchange)
    .doOnSuccess(aVoid ->
      findModelAttributes(handlerMethod, sessionAttributesHandler).forEach(name -> {
        if (!bindingContext.getModel().containsAttribute(name)) {

Code example source: reactor/reactor-core

@Test
public void onSuccessCallbackFailureInterruptsOnNext() {
  LongAdder invoked = new LongAdder();
  StepVerifier.create(Mono.just("foo")
              .doOnSuccess(s -> {
                invoked.increment();
                throw new IllegalArgumentException(s);
              }))
        .expectErrorMessage("foo")
        .verify();
  assertEquals(1, invoked.intValue());
}

Code example source: spring-projects/spring-security

@Override
public Mono<ClientResponse> filter(ClientRequest request, ExchangeFunction next) {
  return next.exchange(withClientCookies(request))
    .doOnSuccess( response -> {
      response.cookies().values().forEach( cookies -> {
        cookies.forEach( cookie -> {
          if (cookie.getMaxAge().isZero()) {
            this.cookies.remove(cookie.getName());
          } else {
            this.cookies.put(cookie.getName(), cookie);
          }
        });
      });
    });
}

Code example source: reactor/reactor-core

@Test
public void onSuccessForEmpty() {
  LongAdder invoked = new LongAdder();
  AtomicReference<String> value = new AtomicReference<>();
  StepVerifier.create(Mono.<String>empty()
      .doOnSuccess(v -> {
        invoked.increment();
        value.set(v);
      }))
        .expectComplete()
        .verify();
  assertEquals(1, invoked.intValue());
  assertEquals(null, value.get());
}

Code example source: reactor/reactor-core

@Test
public void onSuccessNotCalledOnError() {
  LongAdder invoked = new LongAdder();
  IllegalArgumentException err = new IllegalArgumentException("boom");
  StepVerifier.create(Mono.error(err)
      .doOnSuccess(v -> invoked.increment()))
        .expectErrorMessage("boom")
        .verify();
  assertEquals(0, invoked.intValue());
}

Code example source: spring-projects/spring-framework

@Override
public Mono<Void> handle(ServerHttpRequest request, ServerHttpResponse response) {
  if (this.forwardedHeaderTransformer != null) {
    request = this.forwardedHeaderTransformer.apply(request);
  }
  ServerWebExchange exchange = createExchange(request, response);
  LogFormatUtils.traceDebug(logger, traceOn ->
      exchange.getLogPrefix() + formatRequest(exchange.getRequest()) +
          (traceOn ? ", headers=" + formatHeaders(exchange.getRequest().getHeaders()) : ""));
  return getDelegate().handle(exchange)
      .doOnSuccess(aVoid -> logResponse(exchange))
      .onErrorResume(ex -> handleUnresolvedError(exchange, ex))
      .then(Mono.defer(response::setComplete));
}

Code example source: reactor/reactor-core

@Test
public void testErrorWithDoOnSuccess() {
  assertThatExceptionOfType(RuntimeException.class)
      .isThrownBy(() ->
          Mono.error(new NullPointerException("boom"))
            .doOnSuccess(aValue -> {})
            .subscribe())
      .withCauseInstanceOf(NullPointerException.class)
      .matches(Exceptions::isErrorCallbackNotImplemented, "ErrorCallbackNotImplemented");
}

Code example source: spring-projects/spring-framework

@Override
public Mono<Void> handle(ServerWebExchange exchange) {
  if (exchange.getRequest().getQueryParams().containsKey("expire")) {
    return exchange.getSession().doOnNext(session -> {
      // Don't do anything, leave it expired...
    }).then();
  }
  else if (exchange.getRequest().getQueryParams().containsKey("changeId")) {
    return exchange.getSession().flatMap(session ->
        session.changeSessionId().doOnSuccess(aVoid -> updateSessionAttribute(session)));
  }
  else if (exchange.getRequest().getQueryParams().containsKey("invalidate")) {
    return exchange.getSession().doOnNext(WebSession::invalidate).then();
  }
  else {
    return exchange.getSession().doOnSuccess(this::updateSessionAttribute).then();
  }
}
