reactor.core.publisher.Mono.delayUntil()方法的使用及代码示例

x33g5p2x  于2022-01-24 转载在 其他  
字(11.1k)|赞(0)|评价(0)|浏览(516)

本文整理了Java中reactor.core.publisher.Mono.delayUntil()方法的一些代码示例,展示了Mono.delayUntil()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Mono.delayUntil()方法的具体详情如下:
包路径:reactor.core.publisher.Mono
类名称:Mono
方法名:delayUntil

Mono.delayUntil介绍

[英]Subscribe to this Mono and another Publisher that is generated from this Mono's element and which will be used as a trigger for relaying said element.

That is to say, the resulting Mono delays until this Mono's element is emitted, generates a trigger Publisher and then delays again until the trigger Publisher terminates.

Note that contiguous calls to all delayUntil are fused together. The triggers are generated and subscribed to in sequence, once the previous trigger completes. Error is propagated immediately downstream. In both cases, an error in the source is immediately propagated.
[中]订阅此Mono和从该Mono元素生成的另一个发布者,该发布者将用作转发所述元素的触发器。
也就是说,生成的Mono延迟直到该Mono的元素发出,生成触发器发布器,然后再次延迟,直到触发器发布器终止。
请注意,对所有delayUntil的连续调用都融合在一起。一旦上一个触发器完成,就会按顺序生成和订阅触发器。错误会立即向下游传播。在这两种情况下,源中的错误都会立即传播。

代码示例

代码示例来源:origin: reactor/reactor-core

/**
 * Subscribe to this {@link Flux} and generate a {@link Publisher} from each of this
 * Flux elements, each acting as a trigger for relaying said element.
 * <p>
 * That is to say, the resulting {@link Flux} delays each of its emission until the
 * associated trigger Publisher terminates.
 * <p>
 * In case of an error either in the source or in a trigger, that error is propagated
 * immediately downstream.
 * Note that unlike with the {@link Mono#delayUntil(Function) Mono variant} there is
 * no fusion of subsequent calls.
 * <p>
 * <img class="marble" src="doc-files/marbles/delayUntilForFlux.svg" alt="">
 *
 * @param triggerProvider a {@link Function} that maps each element into a
 * {@link Publisher} whose termination will trigger relaying the value.
 *
 * @return this Flux, but with elements delayed until their derived publisher terminates.
 */
public final Flux<T> delayUntil(Function<? super T, ? extends Publisher<?>> triggerProvider) {
  return concatMap(v -> Mono.just(v)
               .delayUntil(triggerProvider));
}

代码示例来源:origin: reactor/reactor-core

@Test
public void triggerSequenceHasSingleValueNotCancelled() {
  AtomicBoolean triggerCancelled = new AtomicBoolean();
  StepVerifier.create(Mono.just("foo")
              .delayUntil(
                  a -> Mono.just(1)
                       .doOnCancel(() -> triggerCancelled.set(true))))
        .expectNext("foo")
        .verifyComplete();
  assertThat(triggerCancelled.get()).isFalse();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void sourceHasError() {
  StepVerifier.create(Mono.<String>error(new IllegalStateException("boom"))
      .delayUntil(a -> Mono.just("foo")))
        .verifyErrorMessage("boom");
}

代码示例来源:origin: reactor/reactor-core

@Test
public void triggerHasError() {
  StepVerifier.create(Mono.just("foo")
              .delayUntil(a -> Mono.<String>error(new IllegalStateException("boom"))))
        .verifyErrorMessage("boom");
}

代码示例来源:origin: reactor/reactor-core

@Test
public void triggerSequenceHasMultipleValuesNotCancelled() {
  AtomicBoolean triggerCancelled = new AtomicBoolean();
  StepVerifier.create(Mono.just("foo")
              .delayUntil(
                  a -> Flux.just(1, 2, 3).hide()
                       .doOnCancel(() -> triggerCancelled.set(true))))
        .expectNext("foo")
        .verifyComplete();
  assertThat(triggerCancelled.get()).isFalse();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void testMonoEmptyAndPublisherVoid() {
  Publisher<Void> voidPublisher = Mono.fromRunnable(() -> { });
  StepVerifier.create(Mono.<String>empty().delayUntil(a -> voidPublisher))
        .verifyComplete();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void sourceAndTriggerHaveErrorsNotDelayed() {
  StepVerifier.create(Mono.<String>error(new IllegalStateException("boom1"))
      .delayUntil(a -> Mono.<Integer>error(new IllegalStateException("boom2"))))
        .verifyErrorMessage("boom1");
}

代码示例来源:origin: reactor/reactor-core

@Test
public void testAPIDelayUntilErrorsImmediately() {
  IllegalArgumentException boom = new IllegalArgumentException("boom");
  StepVerifier.create(Mono.error(boom)
              .delayUntil(a -> Mono.delay(Duration.ofSeconds(2))))
        .expectErrorMessage("boom")
        .verify(Duration.ofMillis(200)); //at least, less than 2s
}

代码示例来源:origin: reactor/reactor-core

@Test
public void triggerSequenceDoneFirst() {
  StepVerifier.withVirtualTime(() -> Mono.delay(Duration.ofSeconds(2))
                      .delayUntil(a -> Mono.just("foo")))
        .expectSubscription()
        .expectNoEvent(Duration.ofSeconds(2))
        .expectNext(0L)
        .verifyComplete();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void testAPIDelayUntil() {
  StepVerifier.withVirtualTime(() -> Mono.just("foo")
                      .delayUntil(a -> Mono.delay(Duration.ofSeconds(2))))
        .expectSubscription()
        .expectNoEvent(Duration.ofSeconds(2))
        .expectNext("foo")
        .verifyComplete();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void testAPIchainingCumulatesDelaysAfterValueGenerated() {
  AtomicInteger generator1Used = new AtomicInteger();
  AtomicInteger generator2Used = new AtomicInteger();
  Function<String, Mono<Long>> generator1 = a -> {
    generator1Used.incrementAndGet();
    return Mono.delay(Duration.ofMillis(400));
  };
  Function<Object, Mono<Long>> generator2 = a -> {
    generator2Used.incrementAndGet();
    return Mono.delay(Duration.ofMillis(800));
  };
  StepVerifier.withVirtualTime(() -> Mono.just("foo")
                 .delayElement(Duration.ofSeconds(3))
                 .delayUntil(generator1)
                 .delayUntil(generator2))
        .expectSubscription()
        .expectNoEvent(Duration.ofMillis(2900))
        .then(() -> assertThat(generator1Used.get()).isZero())
        .then(() -> assertThat(generator2Used.get()).isZero())
        .expectNoEvent(Duration.ofMillis(100))
        .then(() -> assertThat(generator1Used.get()).isEqualTo(1))
        .then(() -> assertThat(generator2Used.get()).isEqualTo(0))
        .expectNoEvent(Duration.ofMillis(400))
        .then(() -> assertThat(generator2Used.get()).isEqualTo(1))
        .expectNoEvent(Duration.ofMillis(800))
        .expectNext("foo")
        .verifyComplete();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void testMonoValuedAndPublisherVoid() {
  Publisher<Void> voidPublisher = Mono.fromRunnable(() -> { });
  StepVerifier.create(Mono.just("foo").delayUntil(a -> voidPublisher))
        .expectNext("foo")
        .verifyComplete();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void triggerSequenceWithDelays() {
  StepVerifier.withVirtualTime(() -> Mono.just("foo")
                      .delayUntil(a -> Flux.just(1, 2, 3)
                                .hide()
                                .delayElements(Duration.ofMillis(500))))
                  .expectSubscription()
                  .expectNoEvent(Duration.ofMillis(1400))
                  .thenAwait(Duration.ofMillis(100))
                  .expectNext("foo")
                  .verifyComplete();
}

代码示例来源:origin: reactor/reactor-core

@SuppressWarnings("unchecked")
@Test
public void testAPIchainingCombines() {
  Mono<String> source = Mono.just("foo");
  Function<String, Flux<Integer>> generator1 = a -> Flux.just(1, 2, 3);
  Function<Object, Mono<Long>> generator2 = a -> Mono.delay(Duration.ofMillis(800));
  MonoDelayUntil<String> until1 = (MonoDelayUntil<String>) source.delayUntil(generator1);
  MonoDelayUntil<String> until2 = (MonoDelayUntil<String>) until1.delayUntil(generator2);
  assertThat(until1)
      .isNotSameAs(until2)
      ;
  assertThat(until1.source)
      .isSameAs(until2.source)
      ;
  assertThat(until1.otherGenerators).containsExactly(generator1);
  assertThat(until2.otherGenerators).containsExactly(generator1, generator2);
  StepVerifier.create(until2)
        .expectSubscription()
        .expectNoEvent(Duration.ofMillis(700))
        .thenAwait(Duration.ofMillis(100))
        .expectNext("foo")
        .verifyComplete();
}

代码示例来源:origin: rsocket/rsocket-java

@Disabled(
   "NettyContext isDisposed() is not accurate\n"
     + "https://github.com/reactor/reactor-netty/issues/360")
 @DisplayName("disposes context")
 @Test
 void dispose() {
  channel
    .map(CloseableChannel::new)
    .delayUntil(
      closeable -> {
       closeable.dispose();
       return closeable.onClose().log();
      })
    .as(StepVerifier::create)
    .assertNext(closeable -> assertThat(closeable.isDisposed()).isTrue())
    .verifyComplete();
 }
}

代码示例来源:origin: io.projectreactor/reactor-core

/**
 * Subscribe to this {@link Flux} and generate a {@link Publisher} from each of this
 * Flux elements, each acting as a trigger for relaying said element.
 * <p>
 * That is to say, the resulting {@link Flux} delays each of its emission until the
 * associated trigger Publisher terminates.
 * <p>
 * In case of an error either in the source or in a trigger, that error is propagated
 * immediately downstream.
 * Note that unlike with the {@link Mono#delayUntil(Function) Mono variant} there is
 * no fusion of subsequent calls.
 * <p>
 * <img class="marble" src="doc-files/marbles/delayUntilForFlux.svg" alt="">
 *
 * @param triggerProvider a {@link Function} that maps each element into a
 * {@link Publisher} whose termination will trigger relaying the value.
 *
 * @return this Flux, but with elements delayed until their derived publisher terminates.
 */
public final Flux<T> delayUntil(Function<? super T, ? extends Publisher<?>> triggerProvider) {
  return concatMap(v -> Mono.just(v)
               .delayUntil(triggerProvider));
}

代码示例来源:origin: apache/servicemix-bundles

private Mono<CsrfToken> generateToken(ServerWebExchange exchange) {
  return this.csrfTokenRepository.generateToken(exchange)
    .delayUntil(token -> this.csrfTokenRepository.saveToken(exchange, token));
}

代码示例来源:origin: org.cloudfoundry/cloudfoundry-operations

@Override
public Mono<Void> delete(DeleteApplicationRequest request) {
  return Mono
    .zip(this.cloudFoundryClient, this.spaceId)
    .flatMap(function((cloudFoundryClient, spaceId) -> getRoutesAndApplicationId(cloudFoundryClient, request, spaceId, Optional.ofNullable(request.getDeleteRoutes()).orElse(false))
      .map(function((routes, applicationId) -> Tuples.of(cloudFoundryClient, routes, applicationId)))))
    .flatMap(function((cloudFoundryClient, routes, applicationId) -> deleteRoutes(cloudFoundryClient, request.getCompletionTimeout(), routes)
      .thenReturn(Tuples.of(cloudFoundryClient, applicationId))))
    .delayUntil(function(DefaultApplications::removeServiceBindings))
    .flatMap(function(DefaultApplications::requestDeleteApplication))
    .transform(OperationsLogging.log("Delete Application"))
    .checkpoint();
}

代码示例来源:origin: com.vmware.card-connectors/core-test

@Override
public Mono<ClientHttpResponse> connect(HttpMethod method, URI uri, Function<? super ClientHttpRequest, Mono<Void>> requestCallback) {
  int phase = phaser.getPhase();
  phaser.register();
  return reactorClientHttpConnector.connect(method, uri, requestCallback)
      .doOnSuccessOrError((response, throwable) -> phaser.arriveAndDeregister())
      .delayUntil(response -> awaitAdvance(phase));
}

代码示例来源:origin: reactor/reactor-netty

@Test
public void disableChunkImplicitDefault() {
  ConnectionProvider p = ConnectionProvider.fixed("test", 1);
  HttpClient client =
      HttpClient.create(p)
           .tcpConfiguration(tcpClient -> tcpClient.host("google.com"))
           .wiretap(true)
           .chunkedTransfer(false);
  Tuple2<HttpResponseStatus, Channel> r =
      client.get()
         .uri("/unsupportedURI")
         .responseConnection((res, conn) -> Mono.just(res.status())
                             .delayUntil(s -> conn.inbound().receive())
                             .zipWith(Mono.just(conn.channel())))
         .blockLast(Duration.ofSeconds(30));
  assertThat(r).isNotNull();
  Channel r2 =
      client.get()
         .uri("/unsupportedURI")
         .responseConnection((res, conn) -> Mono.just(conn.channel())
                             .delayUntil(s -> conn.inbound().receive()))
         .blockLast(Duration.ofSeconds(30));
  assertThat(r2).isNotNull();
  Assert.assertSame(r.getT2(), r2);
  Assert.assertEquals(r.getT1(), HttpResponseStatus.NOT_FOUND);
  p.dispose();
}

相关文章

Mono类方法