reactor.core.publisher.Mono.never()方法的使用及代码示例

x33g5p2x  于2022-01-24 转载在 其他  
字(6.3k)|赞(0)|评价(0)|浏览(376)

本文整理了Java中reactor.core.publisher.Mono.never()方法的一些代码示例,展示了Mono.never()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Mono.never()方法的具体详情如下:
包路径:reactor.core.publisher.Mono
类名称:Mono
方法名:never

Mono.never介绍

[英]Return a Mono that will never signal any data, error or completion signal, essentially running indefinitely.
[中]返回一个永远不会发出任何数据、错误或完成信号的单声道,基本上是无限期运行的。

代码示例

代码示例来源:origin: reactor/reactor-core

@Test(expected = IllegalArgumentException.class)
public void timesInvalid() {
  Mono.never()
    .repeat(-1);
}

代码示例来源:origin: reactor/reactor-core

@Test(expected = NullPointerException.class)
public void predicateNull() {
  Mono.never()
    .repeat(null);
}

代码示例来源:origin: reactor/reactor-core

@Test
public void wasRequestedMono() {
  PublisherProbe<Void> probe = PublisherProbe.of(Mono.never());
  AtomicReference<Subscription> sub = new AtomicReference<>();
  probe.mono().subscribe(null, null, null, sub::set);
  assertThat(probe.wasRequested()).isFalse();
  sub.get().request(3L);
  assertThat(probe.wasRequested()).isTrue();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void partialCancelDoesntCancelSource() {
  AtomicInteger cancelled = new AtomicInteger();
  Mono<Object> cached = Mono.never()
               .doOnCancel(cancelled::incrementAndGet)
               .cache(Duration.ofMillis(200));
  Disposable d1 = cached.subscribe();
  Disposable d2 = cached.subscribe();
  d1.dispose();
  assertThat(cancelled.get()).isEqualTo(0);
}

代码示例来源:origin: reactor/reactor-core

@Test
public void totalCancelDoesntCancelSource() {
  AtomicInteger cancelled = new AtomicInteger();
  Mono<Object> cached = Mono.never()
               .doOnCancel(cancelled::incrementAndGet)
               .cache(Duration.ofMillis(200));
  Disposable d1 = cached.subscribe();
  Disposable d2 = cached.subscribe();
  d1.dispose();
  d2.dispose();
  assertThat(cancelled.get()).isEqualTo(0);
}

代码示例来源:origin: reactor/reactor-core

@Test
public void noSignalRealTime() {
  Duration verifyDuration = StepVerifier.create(Mono.never())
                     .expectSubscription()
                     .expectNoEvent(Duration.ofSeconds(1))
                     .thenCancel()
                     .verify(Duration.ofMillis(1100));
  assertThat(verifyDuration.toMillis()).isGreaterThanOrEqualTo(1000L);
}

代码示例来源:origin: reactor/reactor-core

@Test
public void wasCancelledMono() {
  PublisherProbe<Void> probe = PublisherProbe.of(Mono.never());
  Disposable d = probe.mono().subscribe();
  assertThat(probe.wasCancelled()).isFalse();
  d.dispose();
  assertThat(probe.wasCancelled()).isTrue();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void discardOnCancel() {
  StepVerifier.create(Flux.just(1, 2, 3)
              .concatWith(Mono.never())
              .bufferWhen(Flux.just(1), u -> Mono.never()))
      .thenAwait(Duration.ofMillis(100))
      .thenCancel()
      .verifyThenAssertThat()
      .hasDiscardedExactly(1, 2, 3);
}

代码示例来源:origin: reactor/reactor-core

@Override
protected List<Scenario<String, List<String>>> scenarios_operatorSuccess() {
  return Arrays.asList(scenario(f -> f.buffer(Mono.never()))
          .receive(i -> assertThat(i).containsExactly(item(0), item(1), item(2)))
      .shouldAssertPostTerminateState(false),
      scenario(f -> f.buffer(Mono.just(1)))
          .receiverEmpty()
          .shouldAssertPostTerminateState(false)
  );
}

代码示例来源:origin: reactor/reactor-core

@Test
public void neverTriggered() {
  AssertSubscriber<Integer> ts = AssertSubscriber.create();
  Mono.just(1)
    .delaySubscription(Mono.never())
    .subscribe(ts);
  ts.assertNoValues()
   .assertNoError()
   .assertNotComplete();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void discardOnError() {
  StepVerifier.create(Flux.just(1, 2, 3)
              .concatWith(Mono.error(new IllegalStateException("boom")))
              .bufferWhen(Mono.delay(Duration.ofSeconds(2)), u -> Mono.never()))
        .expectErrorMessage("boom")
        .verifyThenAssertThat()
        .hasDiscardedExactly(1, 2, 3);
}

代码示例来源:origin: reactor/reactor-core

@Test
public void discardOnNextWhenNoBuffers() {
  StepVerifier.create(Flux.just(1, 2, 3)
              //buffer don't open in time
              .bufferWhen(Mono.delay(Duration.ofSeconds(2)), u -> Mono.never()))
        .expectComplete()
        .verifyThenAssertThat()
        .hasDiscardedExactly(1, 2, 3);
}

代码示例来源:origin: reactor/reactor-core

@Test
public void discardOnCancel() {
  StepVerifier.create(Flux.just(1, 2, 3)
              .concatWith(Mono.never())
              .buffer(4))
        .thenAwait(Duration.ofMillis(10))
        .thenCancel()
        .verifyThenAssertThat()
        .hasDiscardedExactly(1, 2, 3);
}

代码示例来源:origin: reactor/reactor-core

@Test
public void discardOnError() {
  StepVerifier.create(Flux.just(1, 2, 3)
              .concatWith(Mono.error(new IllegalStateException("boom")))
              .buffer(Mono.never()))
        .expectErrorMessage("boom")
        .verifyThenAssertThat()
        .hasDiscardedExactly(1, 2, 3);
}

代码示例来源:origin: reactor/reactor-core

@Test
public void discardOnCancel() {
  StepVerifier.create(Flux.just(1, 2, 3)
              .concatWith(Mono.never())
              .bufferUntil(i -> i > 10))
        .thenAwait(Duration.ofMillis(10))
        .thenCancel()
        .verifyThenAssertThat()
        .hasDiscardedExactly(1, 2, 3);
}

代码示例来源:origin: reactor/reactor-core

@Test
public void discardOnCancelOverlap() {
  StepVerifier.create(Flux.just(1, 2, 3, 4, 5, 6)
              .limitRequest(2)
              .concatWith(Mono.never())
              .buffer(4, 2))
        .thenAwait(Duration.ofMillis(10))
        .thenCancel()
        .verifyThenAssertThat()
        .hasDiscardedExactly(1, 2);
}

代码示例来源:origin: reactor/reactor-core

@Test
public void timeoutDurationMessageDefault() {
  StepVerifier.withVirtualTime(() -> Mono.never()
                      .timeout(Duration.ofHours(1)))
        .thenAwait(Duration.ofHours(2))
        .expectErrorMessage("Did not observe any item or terminal signal within " +
            "3600000ms in 'source(MonoNever)' (and no fallback has been " +
            "configured)")
        .verify();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void assertWasNotCancelledMono() {
  PublisherProbe<Void> probe = PublisherProbe.of(Mono.never());
  Disposable d = probe.mono().subscribe();
  probe.assertWasNotCancelled();
  d.dispose();
  assertThatExceptionOfType(AssertionError.class)
      .isThrownBy(probe::assertWasNotCancelled)
      .withMessage("PublisherProbe should not have been cancelled but it was");
}

代码示例来源:origin: reactor/reactor-core

@Test
public void assertWasCancelledMono() {
  PublisherProbe<Void> probe = PublisherProbe.of(Mono.never());
  Disposable d = probe.mono().subscribe();
  assertThatExceptionOfType(AssertionError.class)
      .isThrownBy(probe::assertWasCancelled)
      .withMessage("PublisherProbe should have been cancelled but it wasn't");
  d.dispose();
  probe.assertWasCancelled();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void switchOnNextDynamicallyOnNext() {
  UnicastProcessor<Flux<Integer>> up = UnicastProcessor.create();
  up.onNext(Flux.range(1, 3));
  up.onNext(Flux.range(2, 3).concatWith(Mono.never()));
  up.onNext(Flux.range(4, 3));
  up.onComplete();
  StepVerifier.create(Flux.switchOnNext(up))
        .expectNext(1, 2, 3, 2, 3, 4, 4, 5, 6)
        .verifyComplete();
}

相关文章

Mono类方法