reactor.core.publisher.Mono.delaySubscription()方法的使用及代码示例

x33g5p2x  于2022-01-24 转载在 其他  
字(7.1k)|赞(0)|评价(0)|浏览(287)

本文整理了Java中reactor.core.publisher.Mono.delaySubscription()方法的一些代码示例,展示了Mono.delaySubscription()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Mono.delaySubscription()方法的具体详情如下:
包路径:reactor.core.publisher.Mono
类名称:Mono
方法名:delaySubscription

Mono.delaySubscription介绍

[英]Delay the Mono#subscribe(Subscriber) to this Mono source until the given period elapses.
[中]延迟Mono#订阅(订户)到此Mono源,直到给定的时间段过去。

代码示例

代码示例来源:origin: reactor/reactor-core

/**
 * Delay the {@link Mono#subscribe(Subscriber) subscription} to this {@link Mono} source until the given
 * period elapses.
 *
 * <p>
 * <img class="marble" src="doc-files/marbles/delaySubscriptionForMono.svg" alt="">
 *
 * @param delay duration before subscribing this {@link Mono}
 *
 * @return a delayed {@link Mono}
 *
 */
public final Mono<T> delaySubscription(Duration delay) {
  return delaySubscription(delay, Schedulers.parallel());
}

代码示例来源:origin: reactor/reactor-core

/**
 * Delay the {@link Mono#subscribe(Subscriber) subscription} to this {@link Mono} source until the given
 * {@link Duration} elapses.
 *
 * <p>
 * <img class="marble" src="doc-files/marbles/delaySubscriptionForMono.svg" alt="">
 *
 * @param delay {@link Duration} before subscribing this {@link Mono}
 * @param timer a time-capable {@link Scheduler} instance to run on
 *
 * @return a delayed {@link Mono}
 *
 */
public final Mono<T> delaySubscription(Duration delay, Scheduler timer) {
  return delaySubscription(Mono.delay(delay, timer));
}

代码示例来源:origin: reactor/reactor-core

Mono<Integer> scenario_delayedTrigger(){
  return Mono.just(1)
        .delaySubscription(Duration.ofSeconds(3));
}

代码示例来源:origin: reactor/reactor-core

Mono<Integer> scenario_delayedTrigger2(){
  return Mono.just(1)
        .delaySubscription(Duration.ofMillis(50));
}

代码示例来源:origin: reactor/reactor-core

@Test(expected = NullPointerException.class)
public void otherNull() {
  Mono.never().delaySubscription((Publisher<?>)null);
}

代码示例来源:origin: reactor/reactor-core

@Test
public void emptySources() {
  AtomicBoolean cancelled = new AtomicBoolean();
  Mono<String> empty1 = Mono.empty();
  Mono<String> empty2 = Mono.empty();
  Mono<String> empty3 = Mono.<String>empty().delaySubscription(Duration.ofMillis(500))
      .doOnCancel(() -> cancelled.set(true));
  Duration d = StepVerifier.create(Mono.zip(empty1, empty2, empty3))
        .verifyComplete();
  assertThat(cancelled).isTrue();
  assertThat(d).isLessThan(Duration.ofMillis(500));
}

代码示例来源:origin: reactor/reactor-core

@Test
public void delayErrorEmptySources() {
  AtomicBoolean cancelled = new AtomicBoolean();
  Mono<String> empty1 = Mono.empty();
  Mono<String> empty2 = Mono.empty();
  Mono<String> empty3 = Mono.<String>empty().delaySubscription(Duration.ofMillis(500))
      .doOnCancel(() -> cancelled.set(true));
  StepVerifier.create(Mono.zipDelayError(empty1, empty2, empty3))
        .expectSubscription()
        .expectNoEvent(Duration.ofMillis(400))
        .verifyComplete();
  assertThat(cancelled).isFalse();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void normal() {
  AssertSubscriber<Integer> ts = AssertSubscriber.create();
  Mono.just(1)
    .delaySubscription(Mono.just(1))
    .subscribe(ts);
  ts.assertValues(1)
   .assertNoError()
   .assertComplete();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void emptyTrigger() {
  AssertSubscriber<Integer> ts = AssertSubscriber.create();
  Mono.just(1)
    .delaySubscription(Mono.empty())
    .subscribe(ts);
  ts.assertValues(1)
   .assertNoError()
   .assertComplete();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void manyTriggered() {
  AssertSubscriber<Integer> ts = AssertSubscriber.create();
  Mono.just(1)
    .delaySubscription(Flux.range(1, 10))
    .subscribe(ts);
  ts.assertValues(1)
   .assertComplete()
   .assertNoError();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void neverTriggered() {
  AssertSubscriber<Integer> ts = AssertSubscriber.create();
  Mono.just(1)
    .delaySubscription(Mono.never())
    .subscribe(ts);
  ts.assertNoValues()
   .assertNoError()
   .assertNotComplete();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void emptyTriggerBackpressured() {
  AssertSubscriber<Integer> ts = AssertSubscriber.create(0);
  Mono.just(1)
    .delaySubscription(Mono.empty())
    .subscribe(ts);
  ts.assertNoValues()
   .assertNotComplete()
   .assertNoError();
  ts.request(2);
  ts.assertValues(1)
   .assertComplete()
   .assertNoError();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void normalBackpressured() {
  AssertSubscriber<Integer> ts = AssertSubscriber.create(0);
  Mono.just(1)
    .delaySubscription(Mono.just(1))
    .subscribe(ts);
  ts.assertNoValues()
   .assertNotComplete()
   .assertNoError();
  ts.request(2);
  ts.assertValues(1)
   .assertComplete()
   .assertNoError();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void verifyVirtualTimeNoEvent() {
  StepVerifier.withVirtualTime(() -> Mono.just("foo")
                      .delaySubscription(Duration.ofDays(2)))
        .expectSubscription()
        .expectNoEvent(Duration.ofDays(2))
        .expectNext("foo")
        .expectComplete()
        .verify();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void manyTriggeredBackpressured() {
  AssertSubscriber<Integer> ts = AssertSubscriber.create(0);
  Mono.just(1)
    .delaySubscription(Flux.range(1, 10))
    .subscribe(ts);
  ts.assertNoValues()
   .assertNotComplete()
   .assertNoError();
  ts.request(2);
  ts.assertValues(1)
   .assertComplete()
   .assertNoError();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void verifyVirtualTimeNoEventError() {
  assertThatExceptionOfType(AssertionError.class)
      .isThrownBy(() -> StepVerifier.withVirtualTime(() -> Mono.just("foo")
                      .delaySubscription(Duration.ofDays(2)))
        .expectSubscription()
        .expectNoEvent(Duration.ofDays(2))
        .expectNext("foo")
        .expectNoEvent(Duration.ofDays(2))
        .expectComplete()
        .verify())
      .withMessage("Unexpected completion during a no-event expectation");
}

代码示例来源:origin: com.aol.cyclops/cyclops-reactor

/**
 * @param subscriptionDelay
 * @return
 * @see reactor.core.publisher.Mono#delaySubscription(org.reactivestreams.Publisher)
 */
public final <U> Mono<T> delaySubscription(Publisher<U> subscriptionDelay) {
  return boxed.delaySubscription(subscriptionDelay);
}
/**

代码示例来源:origin: com.aol.cyclops/cyclops-reactor

/**
 * @param delay
 * @return
 * @see reactor.core.publisher.Mono#delaySubscription(java.time.Duration)
 */
public final Mono<T> delaySubscription(Duration delay) {
  return boxed.delaySubscription(delay);
}
/**

代码示例来源:origin: rsocket/rsocket-java

@Test
void requesterNewStreamsTerminatedAfterZeroErrorFrame() {
 TestDuplexConnection conn = new TestDuplexConnection();
 RSocketClient rSocket =
   new RSocketClient(
     conn, DefaultPayload::create, err -> {}, StreamIdSupplier.clientSupplier());
 conn.addToReceivedBuffer(Frame.Error.from(0, new RejectedSetupException("error")));
 StepVerifier.create(
     rSocket
       .requestResponse(DefaultPayload.create("test"))
       .delaySubscription(Duration.ofMillis(100)))
   .expectErrorMatches(
     err -> err instanceof RejectedSetupException && "error".equals(err.getMessage()))
   .verify(Duration.ofSeconds(5));
}

代码示例来源:origin: io.projectreactor/reactor-core

/**
 * Delay the {@link Mono#subscribe(Subscriber) subscription} to this {@link Mono} source until the given
 * period elapses.
 *
 * <p>
 * <img class="marble" src="doc-files/marbles/delaySubscriptionForMono.svg" alt="">
 *
 * @param delay duration before subscribing this {@link Mono}
 *
 * @return a delayed {@link Mono}
 *
 */
public final Mono<T> delaySubscription(Duration delay) {
  return delaySubscription(delay, Schedulers.parallel());
}

相关文章

Mono类方法