reactor.core.publisher.Mono.cache()方法的使用及代码示例

x33g5p2x  于2022-01-24 转载在 其他  
字(13.2k)|赞(0)|评价(0)|浏览(770)

本文整理了Java中reactor.core.publisher.Mono.cache()方法的一些代码示例,展示了Mono.cache()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Mono.cache()方法的具体详情如下:
包路径:reactor.core.publisher.Mono
类名称:Mono
方法名:cache

Mono.cache介绍

[英]Turn this Mono into a hot source and cache last emitted signals for further Subscriber. Completion and Error will also be replayed.
[中]将此单声道转换为热源,并缓存最后发出的信号,以供其他用户使用。完成和错误也将被重播。

代码示例

代码示例来源:origin: reactor/reactor-core

/**
 * Turn this {@link Mono} into a hot source and cache last emitted signals for further
 * {@link Subscriber}, with an expiry timeout.
 * <p>
 * Completion and Error will also be replayed until {@code ttl} triggers in which case
 * the next {@link Subscriber} will start over a new subscription.
 * <p>
 * <img class="marble" src="doc-files/marbles/cacheWithTtlForMono.svg" alt="">
 *
 * @return a replaying {@link Mono}
 */
public final Mono<T> cache(Duration ttl) {
  return cache(ttl, Schedulers.parallel());
}

代码示例来源:origin: spring-projects/spring-framework

DefaultServerWebExchange(ServerHttpRequest request, ServerHttpResponse response,
    WebSessionManager sessionManager, ServerCodecConfigurer codecConfigurer,
    LocaleContextResolver localeContextResolver, @Nullable ApplicationContext applicationContext) {
  Assert.notNull(request, "'request' is required");
  Assert.notNull(response, "'response' is required");
  Assert.notNull(sessionManager, "'sessionManager' is required");
  Assert.notNull(codecConfigurer, "'codecConfigurer' is required");
  Assert.notNull(localeContextResolver, "'localeContextResolver' is required");
  // Initialize before first call to getLogPrefix()
  this.attributes.put(ServerWebExchange.LOG_ID_ATTRIBUTE, request.getId());
  this.request = request;
  this.response = response;
  this.sessionMono = sessionManager.getSession(this).cache();
  this.localeContextResolver = localeContextResolver;
  this.formDataMono = initFormData(request, codecConfigurer, getLogPrefix());
  this.multipartDataMono = initMultipartData(request, codecConfigurer, getLogPrefix());
  this.applicationContext = applicationContext;
}

代码示例来源:origin: spring-projects/spring-data-mongodb

@Override
public ReactiveSessionScoped withSession(Publisher<ClientSession> sessionProvider) {
  Mono<ClientSession> cachedSession = Mono.from(sessionProvider).cache();
  return new ReactiveSessionScoped() {
    @Override
    public <T> Flux<T> execute(ReactiveSessionCallback<T> action, Consumer<ClientSession> doFinally) {
      return cachedSession.flatMapMany(session -> {
        return ReactiveMongoTemplate.this.withSession(action, session) //
            .doFinally(signalType -> {
              doFinally.accept(session);
            });
      });
    }
  };
}

代码示例来源:origin: spring-projects/spring-framework

@SuppressWarnings("unchecked")
private static Mono<MultiValueMap<String, Part>> initMultipartData(ServerHttpRequest request,
    List<HttpMessageReader<?>> readers) {
  try {
    MediaType contentType = request.getHeaders().getContentType();
    if (MediaType.MULTIPART_FORM_DATA.isCompatibleWith(contentType)) {
      return ((HttpMessageReader<MultiValueMap<String, Part>>) readers.stream()
          .filter(reader -> reader.canRead(MULTIPART_DATA_TYPE, MediaType.MULTIPART_FORM_DATA))
          .findFirst()
          .orElseThrow(() -> new IllegalStateException("No multipart HttpMessageReader.")))
          .readMono(MULTIPART_DATA_TYPE, request, Hints.none())
          .switchIfEmpty(EMPTY_MULTIPART_DATA)
          .cache();
    }
  }
  catch (InvalidMediaTypeException ex) {
    // Ignore
  }
  return EMPTY_MULTIPART_DATA;
}
@Override

代码示例来源:origin: spring-projects/spring-framework

@SuppressWarnings("unchecked")
private static Mono<MultiValueMap<String, String>> initFormData(ServerHttpRequest request,
    List<HttpMessageReader<?>> readers) {
  try {
    MediaType contentType = request.getHeaders().getContentType();
    if (MediaType.APPLICATION_FORM_URLENCODED.isCompatibleWith(contentType)) {
      return ((HttpMessageReader<MultiValueMap<String, String>>) readers.stream()
          .filter(reader -> reader.canRead(FORM_DATA_TYPE, MediaType.APPLICATION_FORM_URLENCODED))
          .findFirst()
          .orElseThrow(() -> new IllegalStateException("No form data HttpMessageReader.")))
          .readMono(FORM_DATA_TYPE, request, Hints.none())
          .switchIfEmpty(EMPTY_FORM_DATA)
          .cache();
    }
  }
  catch (InvalidMediaTypeException ex) {
    // Ignore
  }
  return EMPTY_FORM_DATA;
}

代码示例来源:origin: spring-projects/spring-framework

@SuppressWarnings("unchecked")
private static Mono<MultiValueMap<String, String>> initFormData(ServerHttpRequest request,
    ServerCodecConfigurer configurer, String logPrefix) {
  try {
    MediaType contentType = request.getHeaders().getContentType();
    if (MediaType.APPLICATION_FORM_URLENCODED.isCompatibleWith(contentType)) {
      return ((HttpMessageReader<MultiValueMap<String, String>>) configurer.getReaders().stream()
          .filter(reader -> reader.canRead(FORM_DATA_TYPE, MediaType.APPLICATION_FORM_URLENCODED))
          .findFirst()
          .orElseThrow(() -> new IllegalStateException("No form data HttpMessageReader.")))
          .readMono(FORM_DATA_TYPE, request, Hints.from(Hints.LOG_PREFIX_HINT, logPrefix))
          .switchIfEmpty(EMPTY_FORM_DATA)
          .cache();
    }
  }
  catch (InvalidMediaTypeException ex) {
    // Ignore
  }
  return EMPTY_FORM_DATA;
}

代码示例来源:origin: spring-projects/spring-framework

@SuppressWarnings("unchecked")
private static Mono<MultiValueMap<String, Part>> initMultipartData(ServerHttpRequest request,
    ServerCodecConfigurer configurer, String logPrefix) {
  try {
    MediaType contentType = request.getHeaders().getContentType();
    if (MediaType.MULTIPART_FORM_DATA.isCompatibleWith(contentType)) {
      return ((HttpMessageReader<MultiValueMap<String, Part>>) configurer.getReaders().stream()
          .filter(reader -> reader.canRead(MULTIPART_DATA_TYPE, MediaType.MULTIPART_FORM_DATA))
          .findFirst()
          .orElseThrow(() -> new IllegalStateException("No multipart HttpMessageReader.")))
          .readMono(MULTIPART_DATA_TYPE, request, Hints.from(Hints.LOG_PREFIX_HINT, logPrefix))
          .switchIfEmpty(EMPTY_MULTIPART_DATA)
          .cache();
    }
  }
  catch (InvalidMediaTypeException ex) {
    // Ignore
  }
  return EMPTY_MULTIPART_DATA;
}

代码示例来源:origin: org.springframework/spring-web

DefaultServerWebExchange(ServerHttpRequest request, ServerHttpResponse response,
    WebSessionManager sessionManager, ServerCodecConfigurer codecConfigurer,
    LocaleContextResolver localeContextResolver, @Nullable ApplicationContext applicationContext) {
  Assert.notNull(request, "'request' is required");
  Assert.notNull(response, "'response' is required");
  Assert.notNull(sessionManager, "'sessionManager' is required");
  Assert.notNull(codecConfigurer, "'codecConfigurer' is required");
  Assert.notNull(localeContextResolver, "'localeContextResolver' is required");
  // Initialize before first call to getLogPrefix()
  this.attributes.put(ServerWebExchange.LOG_ID_ATTRIBUTE, request.getId());
  this.request = request;
  this.response = response;
  this.sessionMono = sessionManager.getSession(this).cache();
  this.localeContextResolver = localeContextResolver;
  this.formDataMono = initFormData(request, codecConfigurer, getLogPrefix());
  this.multipartDataMono = initMultipartData(request, codecConfigurer, getLogPrefix());
  this.applicationContext = applicationContext;
}

代码示例来源:origin: spring-projects/spring-security

/**
 * Updates the cached JWK set from the configured URL.
 *
 * @return The updated JWK set.
 *
 * @throws RemoteKeySourceException If JWK retrieval failed.
 */
private Mono<JWKSet> getJWKSet() {
  return this.webClient.get()
      .uri(this.jwkSetURL)
      .retrieve()
      .bodyToMono(String.class)
      .map(this::parse)
      .doOnNext(jwkSet -> this.cachedJWKSet.set(Mono.just(jwkSet)))
      .cache();
}

代码示例来源:origin: reactor/reactor-core

@Override
protected List<Scenario<String, String>> scenarios_operatorSuccess() {
  return Collections.singletonList(scenario(f -> f.cache(Duration.ofMillis(100))));
}

代码示例来源:origin: org.springframework/spring-web

@SuppressWarnings("unchecked")
private static Mono<MultiValueMap<String, String>> initFormData(ServerHttpRequest request,
    ServerCodecConfigurer configurer, String logPrefix) {
  try {
    MediaType contentType = request.getHeaders().getContentType();
    if (MediaType.APPLICATION_FORM_URLENCODED.isCompatibleWith(contentType)) {
      return ((HttpMessageReader<MultiValueMap<String, String>>) configurer.getReaders().stream()
          .filter(reader -> reader.canRead(FORM_DATA_TYPE, MediaType.APPLICATION_FORM_URLENCODED))
          .findFirst()
          .orElseThrow(() -> new IllegalStateException("No form data HttpMessageReader.")))
          .readMono(FORM_DATA_TYPE, request, Hints.from(Hints.LOG_PREFIX_HINT, logPrefix))
          .switchIfEmpty(EMPTY_FORM_DATA)
          .cache();
    }
  }
  catch (InvalidMediaTypeException ex) {
    // Ignore
  }
  return EMPTY_FORM_DATA;
}

代码示例来源:origin: org.springframework/spring-web

@SuppressWarnings("unchecked")
private static Mono<MultiValueMap<String, Part>> initMultipartData(ServerHttpRequest request,
    ServerCodecConfigurer configurer, String logPrefix) {
  try {
    MediaType contentType = request.getHeaders().getContentType();
    if (MediaType.MULTIPART_FORM_DATA.isCompatibleWith(contentType)) {
      return ((HttpMessageReader<MultiValueMap<String, Part>>) configurer.getReaders().stream()
          .filter(reader -> reader.canRead(MULTIPART_DATA_TYPE, MediaType.MULTIPART_FORM_DATA))
          .findFirst()
          .orElseThrow(() -> new IllegalStateException("No multipart HttpMessageReader.")))
          .readMono(MULTIPART_DATA_TYPE, request, Hints.from(Hints.LOG_PREFIX_HINT, logPrefix))
          .switchIfEmpty(EMPTY_MULTIPART_DATA)
          .cache();
    }
  }
  catch (InvalidMediaTypeException ex) {
    // Ignore
  }
  return EMPTY_MULTIPART_DATA;
}

代码示例来源:origin: reactor/reactor-core

@Test
public void monoToProcessorChainColdToHot() {
  AtomicInteger subscriptionCount = new AtomicInteger();
  Mono<String> coldToHot = Mono.just("foo")
                 .doOnSubscribe(sub -> subscriptionCount.incrementAndGet())
                 .cache()
                 .toProcessor() //this actually subscribes
                 .filter(s -> s.length() < 4);
  assertThat(subscriptionCount.get()).isEqualTo(1);
  coldToHot.block();
  coldToHot.block();
  coldToHot.block();
  assertThat(subscriptionCount.get()).isEqualTo(1);
}

代码示例来源:origin: reactor/reactor-core

@Test
public void raceSubscribeAndCache() {
  AtomicInteger count = new AtomicInteger();
  Mono<Integer> source = Mono.fromCallable(count::getAndIncrement);
  for (int i = 0; i < 500; i++) {
    Mono<Integer> cached;
    if (i == 0) {
      cached = source.log().cache(Duration.ofSeconds(2));
    }
    else {
      cached = source.cache(Duration.ofSeconds(2));
    }
    RaceTestUtils.race(cached::subscribe, cached::subscribe);
  }
  assertThat(count.get()).isEqualTo(500);
}

代码示例来源:origin: reactor/reactor-core

@Test
public void partialCancelDoesntCancelSource() {
  AtomicInteger cancelled = new AtomicInteger();
  Mono<Object> cached = Mono.never()
               .doOnCancel(cancelled::incrementAndGet)
               .cache(Duration.ofMillis(200));
  Disposable d1 = cached.subscribe();
  Disposable d2 = cached.subscribe();
  d1.dispose();
  assertThat(cancelled.get()).isEqualTo(0);
}

代码示例来源:origin: reactor/reactor-core

@Test
public void totalCancelDoesntCancelSource() {
  AtomicInteger cancelled = new AtomicInteger();
  Mono<Object> cached = Mono.never()
               .doOnCancel(cancelled::incrementAndGet)
               .cache(Duration.ofMillis(200));
  Disposable d1 = cached.subscribe();
  Disposable d2 = cached.subscribe();
  d1.dispose();
  d2.dispose();
  assertThat(cancelled.get()).isEqualTo(0);
}

代码示例来源:origin: reactor/reactor-core

@Test
public void doesntResubscribeNormal() {
  AtomicInteger subCount = new AtomicInteger();
  Mono<Integer> source = Mono.defer(() -> Mono.just(subCount.incrementAndGet()));
  Mono<Integer> cached = source.cache(Duration.ofMillis(100))
                 .hide();
  StepVerifier.create(cached)
        .expectNoFusionSupport()
        .expectNext(1)
        .as("first subscription caches 1")
        .verifyComplete();
  StepVerifier.create(cached)
        .expectNext(1)
        .as("second subscription uses cache")
        .verifyComplete();
  assertThat(subCount.get()).isEqualTo(1);
}

代码示例来源:origin: reactor/reactor-core

@Test
public void expireAfterTtlNormal() {
  VirtualTimeScheduler vts = VirtualTimeScheduler.create();
  AtomicInteger subCount = new AtomicInteger();
  Mono<Integer> source = Mono.defer(() -> Mono.just(subCount.incrementAndGet()));
  Mono<Integer> cached = source.cache(Duration.ofMillis(100), vts)
                 .hide();
  StepVerifier.create(cached)
        .expectNoFusionSupport()
        .expectNext(1)
        .as("first subscription caches 1")
        .verifyComplete();
  vts.advanceTimeBy(Duration.ofMillis(110));
  StepVerifier.create(cached)
        .expectNext(2)
        .as("cached value should expire")
        .verifyComplete();
  assertThat(subCount.get()).isEqualTo(2);
}

代码示例来源:origin: reactor/reactor-core

@Test
public void doesntResubscribeConditional() {
  AtomicInteger subCount = new AtomicInteger();
  Mono<Integer> source = Mono.defer(() -> Mono.just(subCount.incrementAndGet()));
  Mono<Integer> cached = source.cache(Duration.ofMillis(100))
      .hide()
      .filter(always -> true);
  StepVerifier.create(cached)
        .expectNoFusionSupport()
        .expectNext(1)
        .as("first subscription caches 1")
        .verifyComplete();
  StepVerifier.create(cached)
        .expectNext(1)
        .as("second subscription uses cache")
        .verifyComplete();
  assertThat(subCount.get()).isEqualTo(1);
}

代码示例来源:origin: reactor/reactor-core

@Test
public void expireAfterTtlConditional() {
  VirtualTimeScheduler vts = VirtualTimeScheduler.create();
  AtomicInteger subCount = new AtomicInteger();
  Mono<Integer> source = Mono.defer(() -> Mono.just(subCount.incrementAndGet()));
  Mono<Integer> cached = source.cache(Duration.ofMillis(100), vts)
      .hide()
      .filter(always -> true);
  StepVerifier.create(cached)
        .expectNoFusionSupport()
        .expectNext(1)
        .as("first subscription caches 1")
        .verifyComplete();
  vts.advanceTimeBy(Duration.ofMillis(110));
  StepVerifier.create(cached)
        .expectNext(2)
        .as("cached value should expire")
        .verifyComplete();
  assertThat(subCount.get()).isEqualTo(2);
}

相关文章

Mono类方法