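This article collects code examples for the Java method reactor.core.publisher.Mono.cache() and shows how Mono.cache() is used in practice. The examples are drawn mainly from GitHub, Stack Overflow, and Maven, and were extracted from selected open-source projects, so they should serve as practical references. Details of Mono.cache() are as follows: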
Package path: reactor.core.publisher.Mono
Class name: Mono
Method name: cache
Description: Turns this Mono into a hot source and caches the last emitted signals for later Subscribers. Completion and error signals are also replayed.
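The replay behaviour described above can be illustrated without Reactor. The following is a minimal plain-JDK sketch, not Reactor's implementation: ReplayOnce is a hypothetical class that runs a Supplier on first use and then hands the same value, or the same exception, to every later caller, much as a cached Mono replays its value or its error signal.

```java
import java.util.function.Supplier;

/** Hypothetical plain-JDK analogue of a cached source; not part of Reactor. */
final class ReplayOnce<T> implements Supplier<T> {
    private final Supplier<T> source;
    private boolean resolved;          // true once the source has been invoked
    private T value;                   // cached result, if the source succeeded
    private RuntimeException error;    // cached failure, if the source threw

    ReplayOnce(Supplier<T> source) {
        this.source = source;
    }

    @Override
    public synchronized T get() {
        if (!resolved) {
            try {
                value = source.get();
            } catch (RuntimeException e) {
                // Only RuntimeExceptions are cached; Errors propagate uncached.
                error = e;
            }
            resolved = true;
        }
        if (error != null) {
            throw error;               // replay the original failure
        }
        return value;                  // replay the original value
    }
}
```

Wrapping a counter's incrementAndGet in ReplayOnce and calling get() twice should invoke the counter only once, which parallels the subscription-count assertions in the reactor-core tests on this page.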
Code example source: reactor/reactor-core
/**
 * Turn this {@link Mono} into a hot source and cache last emitted signals for further
 * {@link Subscriber}, with an expiry timeout.
 * <p>
 * Completion and Error will also be replayed until {@code ttl} triggers in which case
 * the next {@link Subscriber} will start over a new subscription.
 * <p>
 * <img class="marble" src="doc-files/marbles/cacheWithTtlForMono.svg" alt="">
 *
 * @return a replaying {@link Mono}
 */
public final Mono<T> cache(Duration ttl) {
    return cache(ttl, Schedulers.parallel());
}
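The expiry rule in the Javadoc above (replay until the ttl elapses, then start over) can be sketched in plain JDK code. This is an illustrative analogue under stated assumptions, not Reactor's implementation: TtlMemo and its injected nanosecond clock are hypothetical names, and the clock plays a role loosely similar to the Scheduler argument of cache(Duration, Scheduler), so that time can be controlled in a test.

```java
import java.time.Duration;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

/** Hypothetical plain-JDK analogue of a TTL cache; not part of Reactor. */
final class TtlMemo<T> implements Supplier<T> {
    private final Supplier<T> source;
    private final long ttlNanos;
    private final LongSupplier nanoClock;  // injected clock, e.g. System::nanoTime
    private boolean cached;
    private T value;
    private long expiresAt;

    TtlMemo(Supplier<T> source, Duration ttl, LongSupplier nanoClock) {
        this.source = source;
        this.ttlNanos = ttl.toNanos();
        this.nanoClock = nanoClock;
    }

    @Override
    public synchronized T get() {
        // Subtraction keeps the comparison safe if the nano clock wraps around.
        if (!cached || nanoClock.getAsLong() - expiresAt >= 0) {
            value = source.get();                           // start over
            expiresAt = nanoClock.getAsLong() + ttlNanos;   // TTL counts from the new value
            cached = true;
        }
        return value;
    }
}
```

With a manually advanced clock, two calls inside the TTL should return the same value, and a call after the TTL should invoke the source again.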
Code example source: spring-projects/spring-framework
DefaultServerWebExchange(ServerHttpRequest request, ServerHttpResponse response,
        WebSessionManager sessionManager, ServerCodecConfigurer codecConfigurer,
        LocaleContextResolver localeContextResolver, @Nullable ApplicationContext applicationContext) {
    Assert.notNull(request, "'request' is required");
    Assert.notNull(response, "'response' is required");
    Assert.notNull(sessionManager, "'sessionManager' is required");
    Assert.notNull(codecConfigurer, "'codecConfigurer' is required");
    Assert.notNull(localeContextResolver, "'localeContextResolver' is required");
    // Initialize before first call to getLogPrefix()
    this.attributes.put(ServerWebExchange.LOG_ID_ATTRIBUTE, request.getId());
    this.request = request;
    this.response = response;
    this.sessionMono = sessionManager.getSession(this).cache();
    this.localeContextResolver = localeContextResolver;
    this.formDataMono = initFormData(request, codecConfigurer, getLogPrefix());
    this.multipartDataMono = initMultipartData(request, codecConfigurer, getLogPrefix());
    this.applicationContext = applicationContext;
}
Code example source: spring-projects/spring-data-mongodb
@Override
public ReactiveSessionScoped withSession(Publisher<ClientSession> sessionProvider) {
    Mono<ClientSession> cachedSession = Mono.from(sessionProvider).cache();
    return new ReactiveSessionScoped() {
        @Override
        public <T> Flux<T> execute(ReactiveSessionCallback<T> action, Consumer<ClientSession> doFinally) {
            return cachedSession.flatMapMany(session -> {
                return ReactiveMongoTemplate.this.withSession(action, session) //
                        .doFinally(signalType -> {
                            doFinally.accept(session);
                        });
            });
        }
    };
}
Code example source: spring-projects/spring-framework
@SuppressWarnings("unchecked")
private static Mono<MultiValueMap<String, Part>> initMultipartData(ServerHttpRequest request,
        List<HttpMessageReader<?>> readers) {
    try {
        MediaType contentType = request.getHeaders().getContentType();
        if (MediaType.MULTIPART_FORM_DATA.isCompatibleWith(contentType)) {
            return ((HttpMessageReader<MultiValueMap<String, Part>>) readers.stream()
                    .filter(reader -> reader.canRead(MULTIPART_DATA_TYPE, MediaType.MULTIPART_FORM_DATA))
                    .findFirst()
                    .orElseThrow(() -> new IllegalStateException("No multipart HttpMessageReader.")))
                    .readMono(MULTIPART_DATA_TYPE, request, Hints.none())
                    .switchIfEmpty(EMPTY_MULTIPART_DATA)
                    .cache();
        }
    }
    catch (InvalidMediaTypeException ex) {
        // Ignore
    }
    return EMPTY_MULTIPART_DATA;
}
Code example source: spring-projects/spring-framework
@SuppressWarnings("unchecked")
private static Mono<MultiValueMap<String, String>> initFormData(ServerHttpRequest request,
        List<HttpMessageReader<?>> readers) {
    try {
        MediaType contentType = request.getHeaders().getContentType();
        if (MediaType.APPLICATION_FORM_URLENCODED.isCompatibleWith(contentType)) {
            return ((HttpMessageReader<MultiValueMap<String, String>>) readers.stream()
                    .filter(reader -> reader.canRead(FORM_DATA_TYPE, MediaType.APPLICATION_FORM_URLENCODED))
                    .findFirst()
                    .orElseThrow(() -> new IllegalStateException("No form data HttpMessageReader.")))
                    .readMono(FORM_DATA_TYPE, request, Hints.none())
                    .switchIfEmpty(EMPTY_FORM_DATA)
                    .cache();
        }
    }
    catch (InvalidMediaTypeException ex) {
        // Ignore
    }
    return EMPTY_FORM_DATA;
}
Code example source: spring-projects/spring-framework
@SuppressWarnings("unchecked")
private static Mono<MultiValueMap<String, String>> initFormData(ServerHttpRequest request,
        ServerCodecConfigurer configurer, String logPrefix) {
    try {
        MediaType contentType = request.getHeaders().getContentType();
        if (MediaType.APPLICATION_FORM_URLENCODED.isCompatibleWith(contentType)) {
            return ((HttpMessageReader<MultiValueMap<String, String>>) configurer.getReaders().stream()
                    .filter(reader -> reader.canRead(FORM_DATA_TYPE, MediaType.APPLICATION_FORM_URLENCODED))
                    .findFirst()
                    .orElseThrow(() -> new IllegalStateException("No form data HttpMessageReader.")))
                    .readMono(FORM_DATA_TYPE, request, Hints.from(Hints.LOG_PREFIX_HINT, logPrefix))
                    .switchIfEmpty(EMPTY_FORM_DATA)
                    .cache();
        }
    }
    catch (InvalidMediaTypeException ex) {
        // Ignore
    }
    return EMPTY_FORM_DATA;
}
Code example source: spring-projects/spring-framework
@SuppressWarnings("unchecked")
private static Mono<MultiValueMap<String, Part>> initMultipartData(ServerHttpRequest request,
        ServerCodecConfigurer configurer, String logPrefix) {
    try {
        MediaType contentType = request.getHeaders().getContentType();
        if (MediaType.MULTIPART_FORM_DATA.isCompatibleWith(contentType)) {
            return ((HttpMessageReader<MultiValueMap<String, Part>>) configurer.getReaders().stream()
                    .filter(reader -> reader.canRead(MULTIPART_DATA_TYPE, MediaType.MULTIPART_FORM_DATA))
                    .findFirst()
                    .orElseThrow(() -> new IllegalStateException("No multipart HttpMessageReader.")))
                    .readMono(MULTIPART_DATA_TYPE, request, Hints.from(Hints.LOG_PREFIX_HINT, logPrefix))
                    .switchIfEmpty(EMPTY_MULTIPART_DATA)
                    .cache();
        }
    }
    catch (InvalidMediaTypeException ex) {
        // Ignore
    }
    return EMPTY_MULTIPART_DATA;
}
Code example source: spring-projects/spring-security
/**
 * Updates the cached JWK set from the configured URL.
 *
 * @return The updated JWK set.
 *
 * @throws RemoteKeySourceException If JWK retrieval failed.
 */
private Mono<JWKSet> getJWKSet() {
    return this.webClient.get()
            .uri(this.jwkSetURL)
            .retrieve()
            .bodyToMono(String.class)
            .map(this::parse)
            .doOnNext(jwkSet -> this.cachedJWKSet.set(Mono.just(jwkSet)))
            .cache();
}
Code example source: reactor/reactor-core
@Override
protected List<Scenario<String, String>> scenarios_operatorSuccess() {
    return Collections.singletonList(scenario(f -> f.cache(Duration.ofMillis(100))));
}
Code example source: reactor/reactor-core
@Test
public void monoToProcessorChainColdToHot() {
    AtomicInteger subscriptionCount = new AtomicInteger();
    Mono<String> coldToHot = Mono.just("foo")
            .doOnSubscribe(sub -> subscriptionCount.incrementAndGet())
            .cache()
            .toProcessor() //this actually subscribes
            .filter(s -> s.length() < 4);
    assertThat(subscriptionCount.get()).isEqualTo(1);
    coldToHot.block();
    coldToHot.block();
    coldToHot.block();
    assertThat(subscriptionCount.get()).isEqualTo(1);
}
Code example source: reactor/reactor-core
@Test
public void raceSubscribeAndCache() {
    AtomicInteger count = new AtomicInteger();
    Mono<Integer> source = Mono.fromCallable(count::getAndIncrement);
    for (int i = 0; i < 500; i++) {
        Mono<Integer> cached;
        if (i == 0) {
            cached = source.log().cache(Duration.ofSeconds(2));
        }
        else {
            cached = source.cache(Duration.ofSeconds(2));
        }
        RaceTestUtils.race(cached::subscribe, cached::subscribe);
    }
    assertThat(count.get()).isEqualTo(500);
}
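The race test above asserts that two concurrent subscribers to one cached Mono trigger the source only once per iteration. A comparable guarantee can be sketched in plain JDK code. This is a hedged analogue rather than Reactor's algorithm: SharedResult is a hypothetical class that uses a compare-and-set on an AtomicReference so that only the winning thread runs the supplier, while losing threads receive the same CompletableFuture.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

/** Hypothetical plain-JDK analogue of a race-safe cache; not part of Reactor. */
final class SharedResult<T> {
    private final Supplier<T> source;
    private final AtomicReference<CompletableFuture<T>> ref = new AtomicReference<>();

    SharedResult(Supplier<T> source) {
        this.source = source;
    }

    CompletableFuture<T> get() {
        CompletableFuture<T> existing = ref.get();
        if (existing != null) {
            return existing;                 // already started or finished
        }
        CompletableFuture<T> mine = new CompletableFuture<>();
        if (!ref.compareAndSet(null, mine)) {
            return ref.get();                // lost the race: share the winner's future
        }
        try {
            mine.complete(source.get());     // only the winner invokes the source
        } catch (Throwable t) {
            mine.completeExceptionally(t);   // failures are shared as well
        }
        return mine;
    }
}
```

Racing two threads on a fresh SharedResult in each of N iterations should leave the source's invocation counter at exactly N, mirroring the final assertion of raceSubscribeAndCache.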
Code example source: reactor/reactor-core
@Test
public void partialCancelDoesntCancelSource() {
    AtomicInteger cancelled = new AtomicInteger();
    Mono<Object> cached = Mono.never()
            .doOnCancel(cancelled::incrementAndGet)
            .cache(Duration.ofMillis(200));
    Disposable d1 = cached.subscribe();
    Disposable d2 = cached.subscribe();
    d1.dispose();
    assertThat(cancelled.get()).isEqualTo(0);
}
Code example source: reactor/reactor-core
@Test
public void totalCancelDoesntCancelSource() {
    AtomicInteger cancelled = new AtomicInteger();
    Mono<Object> cached = Mono.never()
            .doOnCancel(cancelled::incrementAndGet)
            .cache(Duration.ofMillis(200));
    Disposable d1 = cached.subscribe();
    Disposable d2 = cached.subscribe();
    d1.dispose();
    d2.dispose();
    assertThat(cancelled.get()).isEqualTo(0);
}
Code example source: reactor/reactor-core
@Test
public void doesntResubscribeNormal() {
    AtomicInteger subCount = new AtomicInteger();
    Mono<Integer> source = Mono.defer(() -> Mono.just(subCount.incrementAndGet()));
    Mono<Integer> cached = source.cache(Duration.ofMillis(100))
            .hide();
    StepVerifier.create(cached)
            .expectNoFusionSupport()
            .expectNext(1)
            .as("first subscription caches 1")
            .verifyComplete();
    StepVerifier.create(cached)
            .expectNext(1)
            .as("second subscription uses cache")
            .verifyComplete();
    assertThat(subCount.get()).isEqualTo(1);
}
Code example source: reactor/reactor-core
@Test
public void expireAfterTtlNormal() {
    VirtualTimeScheduler vts = VirtualTimeScheduler.create();
    AtomicInteger subCount = new AtomicInteger();
    Mono<Integer> source = Mono.defer(() -> Mono.just(subCount.incrementAndGet()));
    Mono<Integer> cached = source.cache(Duration.ofMillis(100), vts)
            .hide();
    StepVerifier.create(cached)
            .expectNoFusionSupport()
            .expectNext(1)
            .as("first subscription caches 1")
            .verifyComplete();
    vts.advanceTimeBy(Duration.ofMillis(110));
    StepVerifier.create(cached)
            .expectNext(2)
            .as("cached value should expire")
            .verifyComplete();
    assertThat(subCount.get()).isEqualTo(2);
}
Code example source: reactor/reactor-core
@Test
public void doesntResubscribeConditional() {
    AtomicInteger subCount = new AtomicInteger();
    Mono<Integer> source = Mono.defer(() -> Mono.just(subCount.incrementAndGet()));
    Mono<Integer> cached = source.cache(Duration.ofMillis(100))
            .hide()
            .filter(always -> true);
    StepVerifier.create(cached)
            .expectNoFusionSupport()
            .expectNext(1)
            .as("first subscription caches 1")
            .verifyComplete();
    StepVerifier.create(cached)
            .expectNext(1)
            .as("second subscription uses cache")
            .verifyComplete();
    assertThat(subCount.get()).isEqualTo(1);
}
Code example source: reactor/reactor-core
@Test
public void expireAfterTtlConditional() {
    VirtualTimeScheduler vts = VirtualTimeScheduler.create();
    AtomicInteger subCount = new AtomicInteger();
    Mono<Integer> source = Mono.defer(() -> Mono.just(subCount.incrementAndGet()));
    Mono<Integer> cached = source.cache(Duration.ofMillis(100), vts)
            .hide()
            .filter(always -> true);
    StepVerifier.create(cached)
            .expectNoFusionSupport()
            .expectNext(1)
            .as("first subscription caches 1")
            .verifyComplete();
    vts.advanceTimeBy(Duration.ofMillis(110));
    StepVerifier.create(cached)
            .expectNext(2)
            .as("cached value should expire")
            .verifyComplete();
    assertThat(subCount.get()).isEqualTo(2);
}