reactor.core.publisher.Mono.thenReturn()方法的使用及代码示例

x33g5p2x  于2022-01-24 转载在 其他  
字(10.8k)|赞(0)|评价(0)|浏览(823)

本文整理了Java中reactor.core.publisher.Mono.thenReturn()方法的一些代码示例,展示了Mono.thenReturn()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Mono.thenReturn()方法的具体详情如下:
包路径:reactor.core.publisher.Mono
类名称:Mono
方法名:thenReturn

Mono.thenReturn介绍

[英]Let this Mono complete then emit the provided value.
[中]让这个单声道完成,然后发出提供的值。

代码示例

代码示例来源:origin: spring-projects/spring-framework

@SuppressWarnings("unchecked")
private <T> Mono<T> drainBody(ClientResponse response, Throwable ex) {
  // Ensure the body is drained, even if the StatusHandler didn't consume it,
  // but ignore exception, in case the handler did consume.
  return (Mono<T>) response.bodyToMono(Void.class)
      .onErrorResume(ex2 -> Mono.empty()).thenReturn(ex);
}

代码示例来源:origin: spring-projects/spring-security

private Mono<OAuth2AuthorizedClient> clientCredentialsResponse(ClientRegistration clientRegistration, Authentication authentication, ServerWebExchange exchange, OAuth2AccessTokenResponse tokenResponse) {
  OAuth2AuthorizedClient authorizedClient = new OAuth2AuthorizedClient(
      clientRegistration, authentication.getName(), tokenResponse.getAccessToken());
  return this.authorizedClientRepository.saveAuthorizedClient(authorizedClient, authentication, exchange)
      .thenReturn(authorizedClient);
}

代码示例来源:origin: spring-projects/spring-security

private Mono<OAuth2AuthorizedClient> clientCredentialsResponse(ClientRegistration clientRegistration, Authentication authentication, ServerWebExchange exchange, OAuth2AccessTokenResponse tokenResponse) {
  OAuth2AuthorizedClient authorizedClient = new OAuth2AuthorizedClient(
      clientRegistration, authentication.getName(), tokenResponse.getAccessToken());
  return this.authorizedClientRepository.saveAuthorizedClient(authorizedClient, authentication, exchange)
      .thenReturn(authorizedClient);
}

代码示例来源:origin: spring-projects/spring-security

private Mono<OAuth2AuthorizedClient> authorizeWithClientCredentials(ClientRegistration clientRegistration, OAuth2AuthorizedClientResolver.Request request) {
  Authentication authentication = request.getAuthentication();
  ServerWebExchange exchange = request.getExchange();
  return this.authorizedClientResolver.clientCredentials(clientRegistration, authentication, exchange).
      flatMap(result -> this.authorizedClientRepository.saveAuthorizedClient(result, authentication, exchange)
          .thenReturn(result));
}

代码示例来源:origin: spring-projects/spring-framework

@Test
public void onStatusWithBodyConsumed() {
  RuntimeException ex = new RuntimeException("response error");
  testOnStatus(ex, response -> response.bodyToMono(Void.class).thenReturn(ex));
}

代码示例来源:origin: reactor/reactor-core

@Test
public void thenReturn() {
  StepVerifier.create(Mono.just(0).thenReturn(2))
        .expectNext(2)
        .verifyComplete();
}

代码示例来源:origin: spring-projects/spring-security

private Mono<OAuth2AuthorizedClient> authorizeWithRefreshToken(ExchangeFunction next,
                                OAuth2AuthorizedClient authorizedClient,
                                OAuth2AuthorizedClientResolver.Request r) {
  ServerWebExchange exchange = r.getExchange();
  Authentication authentication = r.getAuthentication();
  ClientRegistration clientRegistration = authorizedClient
      .getClientRegistration();
  String tokenUri = clientRegistration
      .getProviderDetails().getTokenUri();
  ClientRequest refreshRequest = ClientRequest.create(HttpMethod.POST, URI.create(tokenUri))
      .header(HttpHeaders.ACCEPT, MediaType.APPLICATION_JSON_VALUE)
      .headers(headers -> headers.setBasicAuth(clientRegistration.getClientId(), clientRegistration.getClientSecret()))
      .body(refreshTokenBody(authorizedClient.getRefreshToken().getTokenValue()))
      .build();
  return next.exchange(refreshRequest)
      .flatMap(refreshResponse -> refreshResponse.body(oauth2AccessTokenResponse()))
      .map(accessTokenResponse -> new OAuth2AuthorizedClient(authorizedClient.getClientRegistration(), authorizedClient.getPrincipalName(), accessTokenResponse.getAccessToken(), accessTokenResponse.getRefreshToken()))
      .flatMap(result -> this.authorizedClientRepository.saveAuthorizedClient(result, authentication, exchange)
          .thenReturn(result));
}

代码示例来源:origin: reactor/reactor-core

@Test
@Ignore
//FIXME this case of doubly-nested schedules is still not fully fixed
public void gh783_withInnerFlatmap() {
  int size = 61;
  Scheduler parallel = Schedulers.newParallel("gh-783");
  StepVerifier.withVirtualTime(() -> Flux.range(1, 10)
                      .take(size)
                      .subscribeOn(parallel)
                      .flatMap(message -> {
                        Flux<Long> interval = Flux.interval(Duration.ofSeconds(1));
                        return interval.flatMap( tick -> Mono.delay(Duration.ofMillis(500))
                                          .thenReturn(message)
                                          .subscribeOn(parallel))
                               .subscribeOn(parallel);
                      }, 1,30)
                      .take(size)
                      .collectList()
  )
        .thenAwait(Duration.ofMillis(1500 * (size + 10)))
        .consumeNextWith(list -> assertThat(list).hasSize(size))
        .expectComplete()
        .verify(Duration.ofSeconds(5));
}

代码示例来源:origin: rsocket/rsocket-java

errorConsumer);
 return connection.sendOne(setupFrame).thenReturn(wrappedRSocketClient);
});

代码示例来源:origin: org.springframework.data/spring-data-cassandra

@Override
public <S extends T> Mono<S> save(S entity) {
  Assert.notNull(entity, "Entity must not be null");
  return operations.insert(entity, INSERT_NULLS).thenReturn(entity);
}

代码示例来源:origin: org.springframework.data/spring-data-cassandra

@Override
public <S extends T> Flux<S> saveAll(Publisher<S> entityStream) {
  Assert.notNull(entityStream, "The given Publisher of entities must not be null");
  return Flux.from(entityStream)
      .flatMap(entity -> operations.insert(entity, INSERT_NULLS).thenReturn(entity));
}

代码示例来源:origin: apache/servicemix-bundles

private Mono<OAuth2AuthorizedClient> clientCredentialsResponse(ClientRegistration clientRegistration, Authentication authentication, ServerWebExchange exchange, OAuth2AccessTokenResponse tokenResponse) {
  OAuth2AuthorizedClient authorizedClient = new OAuth2AuthorizedClient(
      clientRegistration, authentication.getName(), tokenResponse.getAccessToken());
  return this.authorizedClientRepository.saveAuthorizedClient(authorizedClient, authentication, exchange)
      .thenReturn(authorizedClient);
}

代码示例来源:origin: apache/servicemix-bundles

private Mono<OAuth2AuthorizedClient> clientCredentialsResponse(ClientRegistration clientRegistration, Authentication authentication, ServerWebExchange exchange, OAuth2AccessTokenResponse tokenResponse) {
  OAuth2AuthorizedClient authorizedClient = new OAuth2AuthorizedClient(
      clientRegistration, authentication.getName(), tokenResponse.getAccessToken());
  return this.authorizedClientRepository.saveAuthorizedClient(authorizedClient, authentication, exchange)
      .thenReturn(authorizedClient);
}

代码示例来源:origin: org.springframework.data/spring-data-r2dbc

@Override
public <T> Flux<T> inTransaction(Function<DatabaseClient, ? extends Publisher<? extends T>> callback) {
  return Flux.usingWhen(beginTransaction().thenReturn(this), callback, //
      DefaultTransactionalDatabaseClient::commitTransaction, //
      DefaultTransactionalDatabaseClient::rollbackTransaction, //
      DefaultTransactionalDatabaseClient::rollbackTransaction) //
      .subscriberContext(DefaultTransactionalDatabaseClient::withTransactionSynchronization);
}

代码示例来源:origin: spring-projects/spring-data-r2dbc

@Override
public <T> Flux<T> inTransaction(Function<DatabaseClient, ? extends Publisher<? extends T>> callback) {
  return Flux.usingWhen(beginTransaction().thenReturn(this), callback, //
      DefaultTransactionalDatabaseClient::commitTransaction, //
      DefaultTransactionalDatabaseClient::rollbackTransaction, //
      DefaultTransactionalDatabaseClient::rollbackTransaction) //
      .subscriberContext(DefaultTransactionalDatabaseClient::withTransactionSynchronization);
}

代码示例来源:origin: org.cloudfoundry/cloudfoundry-operations

private static Mono<Optional<String>> checkVisibility(CloudFoundryClient cloudFoundryClient, String organizationId, ServicePlanResource resource) {
  String servicePlanId = ResourceUtils.getId(resource);
  if (resource.getEntity().getPubliclyVisible()) {
    return Mono.just(Optional.of(servicePlanId));
  }
  return requestListServicePlanVisibilities(cloudFoundryClient, organizationId, servicePlanId)
    .next()
    .switchIfEmpty(ExceptionUtils.illegalArgument("Service Plan %s is not visible to your organization", resource.getEntity().getName()))
    .thenReturn(Optional.of(servicePlanId));
}

代码示例来源:origin: org.cloudfoundry/cloudfoundry-operations

@Override
public Mono<Void> delete(DeleteApplicationRequest request) {
  return Mono
    .zip(this.cloudFoundryClient, this.spaceId)
    .flatMap(function((cloudFoundryClient, spaceId) -> getRoutesAndApplicationId(cloudFoundryClient, request, spaceId, Optional.ofNullable(request.getDeleteRoutes()).orElse(false))
      .map(function((routes, applicationId) -> Tuples.of(cloudFoundryClient, routes, applicationId)))))
    .flatMap(function((cloudFoundryClient, routes, applicationId) -> deleteRoutes(cloudFoundryClient, request.getCompletionTimeout(), routes)
      .thenReturn(Tuples.of(cloudFoundryClient, applicationId))))
    .delayUntil(function(DefaultApplications::removeServiceBindings))
    .flatMap(function(DefaultApplications::requestDeleteApplication))
    .transform(OperationsLogging.log("Delete Application"))
    .checkpoint();
}

代码示例来源:origin: scalecube/scalecube-services

@Override
public Mono<ServiceDiscovery> start(ServiceDiscoveryConfig config) {
 return Mono.defer(
   () -> {
    this.serviceRegistry = config.serviceRegistry();
    this.endpoint = config.endpoint();
    ClusterConfig clusterConfig =
      clusterConfigBuilder(config).addMetadata(getMetadata()).build();
    LOGGER.info("Start scalecube service discovery with config: {}", clusterConfig);
    return Cluster.join(clusterConfig)
      .doOnSuccess(cluster -> this.cluster = cluster)
      .doOnSuccess(this::listen)
      .thenReturn(this);
   });
}

代码示例来源:origin: io.scalecube/scalecube-services-discovery

@Override
public Mono<ServiceDiscovery> start(ServiceDiscoveryConfig config) {
 return Mono.defer(
   () -> {
    this.serviceRegistry = config.serviceRegistry();
    this.endpoint = config.endpoint();
    ClusterConfig clusterConfig =
      clusterConfigBuilder(config).addMetadata(getMetadata()).build();
    LOGGER.info("Start scalecube service discovery with config: {}", clusterConfig);
    return Cluster.join(clusterConfig)
      .doOnSuccess(cluster -> this.cluster = cluster)
      .doOnSuccess(this::listen)
      .thenReturn(this);
   });
}

代码示例来源:origin: org.cloudfoundry/cloudfoundry-operations

private static Flux<Void> pushDocker(CloudFoundryClient cloudFoundryClient, List<DomainSummary> availableDomains, ApplicationManifest manifest, RandomWords randomWords,
                   PushApplicationManifestRequest request, String spaceId) {
  return getOptionalStackId(cloudFoundryClient, manifest.getStack())
    .flatMapMany(stackId -> getApplicationId(cloudFoundryClient, manifest, spaceId, stackId.orElse(null)))
    .flatMap(applicationId -> Mono.zip(
      Mono.just(applicationId),
      getApplicationRoutes(cloudFoundryClient, applicationId)
    ))
    .flatMap(function((applicationId, existingRoutes) -> prepareDomainsAndRoutes(cloudFoundryClient, applicationId, availableDomains, manifest, existingRoutes, randomWords, spaceId)
      .thenReturn(applicationId)))
    .delayUntil(applicationId -> bindServices(cloudFoundryClient, applicationId, manifest, spaceId))
    .flatMap(applicationId -> stopAndStartApplication(cloudFoundryClient, applicationId, manifest.getName(), request));
}

相关文章

Mono类方法