本文整理了Java中reactor.core.publisher.Mono.thenReturn()
方法的一些代码示例,展示了Mono.thenReturn()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Mono.thenReturn()
方法的具体详情如下:
包路径:reactor.core.publisher.Mono
类名称:Mono
方法名:thenReturn
[英]Let this Mono complete then emit the provided value.
[中]让这个单声道完成,然后发出提供的值。
代码示例来源:origin: spring-projects/spring-framework
@SuppressWarnings("unchecked")
private <T> Mono<T> drainBody(ClientResponse response, Throwable ex) {
// Ensure the body is drained, even if the StatusHandler didn't consume it,
// but ignore exception, in case the handler did consume.
return (Mono<T>) response.bodyToMono(Void.class)
.onErrorResume(ex2 -> Mono.empty()).thenReturn(ex);
}
代码示例来源:origin: spring-projects/spring-security
private Mono<OAuth2AuthorizedClient> clientCredentialsResponse(ClientRegistration clientRegistration, Authentication authentication, ServerWebExchange exchange, OAuth2AccessTokenResponse tokenResponse) {
OAuth2AuthorizedClient authorizedClient = new OAuth2AuthorizedClient(
clientRegistration, authentication.getName(), tokenResponse.getAccessToken());
return this.authorizedClientRepository.saveAuthorizedClient(authorizedClient, authentication, exchange)
.thenReturn(authorizedClient);
}
代码示例来源:origin: spring-projects/spring-security
private Mono<OAuth2AuthorizedClient> clientCredentialsResponse(ClientRegistration clientRegistration, Authentication authentication, ServerWebExchange exchange, OAuth2AccessTokenResponse tokenResponse) {
OAuth2AuthorizedClient authorizedClient = new OAuth2AuthorizedClient(
clientRegistration, authentication.getName(), tokenResponse.getAccessToken());
return this.authorizedClientRepository.saveAuthorizedClient(authorizedClient, authentication, exchange)
.thenReturn(authorizedClient);
}
代码示例来源:origin: spring-projects/spring-security
private Mono<OAuth2AuthorizedClient> authorizeWithClientCredentials(ClientRegistration clientRegistration, OAuth2AuthorizedClientResolver.Request request) {
Authentication authentication = request.getAuthentication();
ServerWebExchange exchange = request.getExchange();
return this.authorizedClientResolver.clientCredentials(clientRegistration, authentication, exchange).
flatMap(result -> this.authorizedClientRepository.saveAuthorizedClient(result, authentication, exchange)
.thenReturn(result));
}
代码示例来源:origin: spring-projects/spring-framework
@Test
public void onStatusWithBodyConsumed() {
RuntimeException ex = new RuntimeException("response error");
testOnStatus(ex, response -> response.bodyToMono(Void.class).thenReturn(ex));
}
代码示例来源:origin: reactor/reactor-core
@Test
public void thenReturn() {
StepVerifier.create(Mono.just(0).thenReturn(2))
.expectNext(2)
.verifyComplete();
}
代码示例来源:origin: spring-projects/spring-security
private Mono<OAuth2AuthorizedClient> authorizeWithRefreshToken(ExchangeFunction next,
OAuth2AuthorizedClient authorizedClient,
OAuth2AuthorizedClientResolver.Request r) {
ServerWebExchange exchange = r.getExchange();
Authentication authentication = r.getAuthentication();
ClientRegistration clientRegistration = authorizedClient
.getClientRegistration();
String tokenUri = clientRegistration
.getProviderDetails().getTokenUri();
ClientRequest refreshRequest = ClientRequest.create(HttpMethod.POST, URI.create(tokenUri))
.header(HttpHeaders.ACCEPT, MediaType.APPLICATION_JSON_VALUE)
.headers(headers -> headers.setBasicAuth(clientRegistration.getClientId(), clientRegistration.getClientSecret()))
.body(refreshTokenBody(authorizedClient.getRefreshToken().getTokenValue()))
.build();
return next.exchange(refreshRequest)
.flatMap(refreshResponse -> refreshResponse.body(oauth2AccessTokenResponse()))
.map(accessTokenResponse -> new OAuth2AuthorizedClient(authorizedClient.getClientRegistration(), authorizedClient.getPrincipalName(), accessTokenResponse.getAccessToken(), accessTokenResponse.getRefreshToken()))
.flatMap(result -> this.authorizedClientRepository.saveAuthorizedClient(result, authentication, exchange)
.thenReturn(result));
}
代码示例来源:origin: reactor/reactor-core
@Test
@Ignore
//FIXME this case of doubly-nested schedules is still not fully fixed
public void gh783_withInnerFlatmap() {
int size = 61;
Scheduler parallel = Schedulers.newParallel("gh-783");
StepVerifier.withVirtualTime(() -> Flux.range(1, 10)
.take(size)
.subscribeOn(parallel)
.flatMap(message -> {
Flux<Long> interval = Flux.interval(Duration.ofSeconds(1));
return interval.flatMap( tick -> Mono.delay(Duration.ofMillis(500))
.thenReturn(message)
.subscribeOn(parallel))
.subscribeOn(parallel);
}, 1,30)
.take(size)
.collectList()
)
.thenAwait(Duration.ofMillis(1500 * (size + 10)))
.consumeNextWith(list -> assertThat(list).hasSize(size))
.expectComplete()
.verify(Duration.ofSeconds(5));
}
代码示例来源:origin: rsocket/rsocket-java
errorConsumer);
return connection.sendOne(setupFrame).thenReturn(wrappedRSocketClient);
});
代码示例来源:origin: org.springframework.data/spring-data-cassandra
@Override
public <S extends T> Mono<S> save(S entity) {
Assert.notNull(entity, "Entity must not be null");
return operations.insert(entity, INSERT_NULLS).thenReturn(entity);
}
代码示例来源:origin: org.springframework.data/spring-data-cassandra
@Override
public <S extends T> Flux<S> saveAll(Publisher<S> entityStream) {
Assert.notNull(entityStream, "The given Publisher of entities must not be null");
return Flux.from(entityStream)
.flatMap(entity -> operations.insert(entity, INSERT_NULLS).thenReturn(entity));
}
代码示例来源:origin: apache/servicemix-bundles
private Mono<OAuth2AuthorizedClient> clientCredentialsResponse(ClientRegistration clientRegistration, Authentication authentication, ServerWebExchange exchange, OAuth2AccessTokenResponse tokenResponse) {
OAuth2AuthorizedClient authorizedClient = new OAuth2AuthorizedClient(
clientRegistration, authentication.getName(), tokenResponse.getAccessToken());
return this.authorizedClientRepository.saveAuthorizedClient(authorizedClient, authentication, exchange)
.thenReturn(authorizedClient);
}
代码示例来源:origin: apache/servicemix-bundles
private Mono<OAuth2AuthorizedClient> clientCredentialsResponse(ClientRegistration clientRegistration, Authentication authentication, ServerWebExchange exchange, OAuth2AccessTokenResponse tokenResponse) {
OAuth2AuthorizedClient authorizedClient = new OAuth2AuthorizedClient(
clientRegistration, authentication.getName(), tokenResponse.getAccessToken());
return this.authorizedClientRepository.saveAuthorizedClient(authorizedClient, authentication, exchange)
.thenReturn(authorizedClient);
}
代码示例来源:origin: org.springframework.data/spring-data-r2dbc
@Override
public <T> Flux<T> inTransaction(Function<DatabaseClient, ? extends Publisher<? extends T>> callback) {
return Flux.usingWhen(beginTransaction().thenReturn(this), callback, //
DefaultTransactionalDatabaseClient::commitTransaction, //
DefaultTransactionalDatabaseClient::rollbackTransaction, //
DefaultTransactionalDatabaseClient::rollbackTransaction) //
.subscriberContext(DefaultTransactionalDatabaseClient::withTransactionSynchronization);
}
代码示例来源:origin: spring-projects/spring-data-r2dbc
@Override
public <T> Flux<T> inTransaction(Function<DatabaseClient, ? extends Publisher<? extends T>> callback) {
return Flux.usingWhen(beginTransaction().thenReturn(this), callback, //
DefaultTransactionalDatabaseClient::commitTransaction, //
DefaultTransactionalDatabaseClient::rollbackTransaction, //
DefaultTransactionalDatabaseClient::rollbackTransaction) //
.subscriberContext(DefaultTransactionalDatabaseClient::withTransactionSynchronization);
}
代码示例来源:origin: org.cloudfoundry/cloudfoundry-operations
private static Mono<Optional<String>> checkVisibility(CloudFoundryClient cloudFoundryClient, String organizationId, ServicePlanResource resource) {
String servicePlanId = ResourceUtils.getId(resource);
if (resource.getEntity().getPubliclyVisible()) {
return Mono.just(Optional.of(servicePlanId));
}
return requestListServicePlanVisibilities(cloudFoundryClient, organizationId, servicePlanId)
.next()
.switchIfEmpty(ExceptionUtils.illegalArgument("Service Plan %s is not visible to your organization", resource.getEntity().getName()))
.thenReturn(Optional.of(servicePlanId));
}
代码示例来源:origin: org.cloudfoundry/cloudfoundry-operations
@Override
public Mono<Void> delete(DeleteApplicationRequest request) {
return Mono
.zip(this.cloudFoundryClient, this.spaceId)
.flatMap(function((cloudFoundryClient, spaceId) -> getRoutesAndApplicationId(cloudFoundryClient, request, spaceId, Optional.ofNullable(request.getDeleteRoutes()).orElse(false))
.map(function((routes, applicationId) -> Tuples.of(cloudFoundryClient, routes, applicationId)))))
.flatMap(function((cloudFoundryClient, routes, applicationId) -> deleteRoutes(cloudFoundryClient, request.getCompletionTimeout(), routes)
.thenReturn(Tuples.of(cloudFoundryClient, applicationId))))
.delayUntil(function(DefaultApplications::removeServiceBindings))
.flatMap(function(DefaultApplications::requestDeleteApplication))
.transform(OperationsLogging.log("Delete Application"))
.checkpoint();
}
代码示例来源:origin: scalecube/scalecube-services
@Override
public Mono<ServiceDiscovery> start(ServiceDiscoveryConfig config) {
return Mono.defer(
() -> {
this.serviceRegistry = config.serviceRegistry();
this.endpoint = config.endpoint();
ClusterConfig clusterConfig =
clusterConfigBuilder(config).addMetadata(getMetadata()).build();
LOGGER.info("Start scalecube service discovery with config: {}", clusterConfig);
return Cluster.join(clusterConfig)
.doOnSuccess(cluster -> this.cluster = cluster)
.doOnSuccess(this::listen)
.thenReturn(this);
});
}
代码示例来源:origin: io.scalecube/scalecube-services-discovery
@Override
public Mono<ServiceDiscovery> start(ServiceDiscoveryConfig config) {
return Mono.defer(
() -> {
this.serviceRegistry = config.serviceRegistry();
this.endpoint = config.endpoint();
ClusterConfig clusterConfig =
clusterConfigBuilder(config).addMetadata(getMetadata()).build();
LOGGER.info("Start scalecube service discovery with config: {}", clusterConfig);
return Cluster.join(clusterConfig)
.doOnSuccess(cluster -> this.cluster = cluster)
.doOnSuccess(this::listen)
.thenReturn(this);
});
}
代码示例来源:origin: org.cloudfoundry/cloudfoundry-operations
private static Flux<Void> pushDocker(CloudFoundryClient cloudFoundryClient, List<DomainSummary> availableDomains, ApplicationManifest manifest, RandomWords randomWords,
PushApplicationManifestRequest request, String spaceId) {
return getOptionalStackId(cloudFoundryClient, manifest.getStack())
.flatMapMany(stackId -> getApplicationId(cloudFoundryClient, manifest, spaceId, stackId.orElse(null)))
.flatMap(applicationId -> Mono.zip(
Mono.just(applicationId),
getApplicationRoutes(cloudFoundryClient, applicationId)
))
.flatMap(function((applicationId, existingRoutes) -> prepareDomainsAndRoutes(cloudFoundryClient, applicationId, availableDomains, manifest, existingRoutes, randomWords, spaceId)
.thenReturn(applicationId)))
.delayUntil(applicationId -> bindServices(cloudFoundryClient, applicationId, manifest, spaceId))
.flatMap(applicationId -> stopAndStartApplication(cloudFoundryClient, applicationId, manifest.getName(), request));
}
内容来源于网络,如有侵权,请联系作者删除!