本文整理了Java中reactor.core.publisher.Mono.never()
方法的一些代码示例,展示了Mono.never()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Mono.never()
方法的具体详情如下:
包路径:reactor.core.publisher.Mono
类名称:Mono
方法名:never
[英]Return a Mono that will never signal any data, error or completion signal, essentially running indefinitely.
[中]返回一个永远不会发出任何数据、错误或完成信号的单声道,基本上是无限期运行的。
代码示例来源:origin: reactor/reactor-core
@Test(expected = IllegalArgumentException.class)
public void timesInvalid() {
Mono.never()
.repeat(-1);
}
代码示例来源:origin: reactor/reactor-core
@Test(expected = NullPointerException.class)
public void predicateNull() {
Mono.never()
.repeat(null);
}
代码示例来源:origin: reactor/reactor-core
@Test
public void wasRequestedMono() {
PublisherProbe<Void> probe = PublisherProbe.of(Mono.never());
AtomicReference<Subscription> sub = new AtomicReference<>();
probe.mono().subscribe(null, null, null, sub::set);
assertThat(probe.wasRequested()).isFalse();
sub.get().request(3L);
assertThat(probe.wasRequested()).isTrue();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void partialCancelDoesntCancelSource() {
AtomicInteger cancelled = new AtomicInteger();
Mono<Object> cached = Mono.never()
.doOnCancel(cancelled::incrementAndGet)
.cache(Duration.ofMillis(200));
Disposable d1 = cached.subscribe();
Disposable d2 = cached.subscribe();
d1.dispose();
assertThat(cancelled.get()).isEqualTo(0);
}
代码示例来源:origin: reactor/reactor-core
@Test
public void totalCancelDoesntCancelSource() {
AtomicInteger cancelled = new AtomicInteger();
Mono<Object> cached = Mono.never()
.doOnCancel(cancelled::incrementAndGet)
.cache(Duration.ofMillis(200));
Disposable d1 = cached.subscribe();
Disposable d2 = cached.subscribe();
d1.dispose();
d2.dispose();
assertThat(cancelled.get()).isEqualTo(0);
}
代码示例来源:origin: reactor/reactor-core
@Test
public void noSignalRealTime() {
Duration verifyDuration = StepVerifier.create(Mono.never())
.expectSubscription()
.expectNoEvent(Duration.ofSeconds(1))
.thenCancel()
.verify(Duration.ofMillis(1100));
assertThat(verifyDuration.toMillis()).isGreaterThanOrEqualTo(1000L);
}
代码示例来源:origin: reactor/reactor-core
@Test
public void wasCancelledMono() {
PublisherProbe<Void> probe = PublisherProbe.of(Mono.never());
Disposable d = probe.mono().subscribe();
assertThat(probe.wasCancelled()).isFalse();
d.dispose();
assertThat(probe.wasCancelled()).isTrue();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void discardOnCancel() {
StepVerifier.create(Flux.just(1, 2, 3)
.concatWith(Mono.never())
.bufferWhen(Flux.just(1), u -> Mono.never()))
.thenAwait(Duration.ofMillis(100))
.thenCancel()
.verifyThenAssertThat()
.hasDiscardedExactly(1, 2, 3);
}
代码示例来源:origin: reactor/reactor-core
@Override
protected List<Scenario<String, List<String>>> scenarios_operatorSuccess() {
return Arrays.asList(scenario(f -> f.buffer(Mono.never()))
.receive(i -> assertThat(i).containsExactly(item(0), item(1), item(2)))
.shouldAssertPostTerminateState(false),
scenario(f -> f.buffer(Mono.just(1)))
.receiverEmpty()
.shouldAssertPostTerminateState(false)
);
}
代码示例来源:origin: reactor/reactor-core
@Test
public void neverTriggered() {
AssertSubscriber<Integer> ts = AssertSubscriber.create();
Mono.just(1)
.delaySubscription(Mono.never())
.subscribe(ts);
ts.assertNoValues()
.assertNoError()
.assertNotComplete();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void discardOnError() {
StepVerifier.create(Flux.just(1, 2, 3)
.concatWith(Mono.error(new IllegalStateException("boom")))
.bufferWhen(Mono.delay(Duration.ofSeconds(2)), u -> Mono.never()))
.expectErrorMessage("boom")
.verifyThenAssertThat()
.hasDiscardedExactly(1, 2, 3);
}
代码示例来源:origin: reactor/reactor-core
@Test
public void discardOnNextWhenNoBuffers() {
StepVerifier.create(Flux.just(1, 2, 3)
//buffer don't open in time
.bufferWhen(Mono.delay(Duration.ofSeconds(2)), u -> Mono.never()))
.expectComplete()
.verifyThenAssertThat()
.hasDiscardedExactly(1, 2, 3);
}
代码示例来源:origin: reactor/reactor-core
@Test
public void discardOnCancel() {
StepVerifier.create(Flux.just(1, 2, 3)
.concatWith(Mono.never())
.buffer(4))
.thenAwait(Duration.ofMillis(10))
.thenCancel()
.verifyThenAssertThat()
.hasDiscardedExactly(1, 2, 3);
}
代码示例来源:origin: reactor/reactor-core
@Test
public void discardOnError() {
StepVerifier.create(Flux.just(1, 2, 3)
.concatWith(Mono.error(new IllegalStateException("boom")))
.buffer(Mono.never()))
.expectErrorMessage("boom")
.verifyThenAssertThat()
.hasDiscardedExactly(1, 2, 3);
}
代码示例来源:origin: reactor/reactor-core
@Test
public void discardOnCancel() {
StepVerifier.create(Flux.just(1, 2, 3)
.concatWith(Mono.never())
.bufferUntil(i -> i > 10))
.thenAwait(Duration.ofMillis(10))
.thenCancel()
.verifyThenAssertThat()
.hasDiscardedExactly(1, 2, 3);
}
代码示例来源:origin: reactor/reactor-core
@Test
public void discardOnCancelOverlap() {
StepVerifier.create(Flux.just(1, 2, 3, 4, 5, 6)
.limitRequest(2)
.concatWith(Mono.never())
.buffer(4, 2))
.thenAwait(Duration.ofMillis(10))
.thenCancel()
.verifyThenAssertThat()
.hasDiscardedExactly(1, 2);
}
代码示例来源:origin: reactor/reactor-core
@Test
public void timeoutDurationMessageDefault() {
StepVerifier.withVirtualTime(() -> Mono.never()
.timeout(Duration.ofHours(1)))
.thenAwait(Duration.ofHours(2))
.expectErrorMessage("Did not observe any item or terminal signal within " +
"3600000ms in 'source(MonoNever)' (and no fallback has been " +
"configured)")
.verify();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void assertWasNotCancelledMono() {
PublisherProbe<Void> probe = PublisherProbe.of(Mono.never());
Disposable d = probe.mono().subscribe();
probe.assertWasNotCancelled();
d.dispose();
assertThatExceptionOfType(AssertionError.class)
.isThrownBy(probe::assertWasNotCancelled)
.withMessage("PublisherProbe should not have been cancelled but it was");
}
代码示例来源:origin: reactor/reactor-core
@Test
public void assertWasCancelledMono() {
PublisherProbe<Void> probe = PublisherProbe.of(Mono.never());
Disposable d = probe.mono().subscribe();
assertThatExceptionOfType(AssertionError.class)
.isThrownBy(probe::assertWasCancelled)
.withMessage("PublisherProbe should have been cancelled but it wasn't");
d.dispose();
probe.assertWasCancelled();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void switchOnNextDynamicallyOnNext() {
UnicastProcessor<Flux<Integer>> up = UnicastProcessor.create();
up.onNext(Flux.range(1, 3));
up.onNext(Flux.range(2, 3).concatWith(Mono.never()));
up.onNext(Flux.range(4, 3));
up.onComplete();
StepVerifier.create(Flux.switchOnNext(up))
.expectNext(1, 2, 3, 2, 3, 4, 4, 5, 6)
.verifyComplete();
}
内容来源于网络,如有侵权,请联系作者删除!