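This article collects code examples for the Java method reactor.core.publisher.Mono.delaySubscription()
and shows how Mono.delaySubscription() is used in practice. The examples come mainly from GitHub,
Stack Overflow, and Maven, and were extracted from selected open-source projects, so they are useful
as a reference. Details of the Mono.delaySubscription() method are as follows: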
Package: reactor.core.publisher
Class name: Mono
Method name: delaySubscription
Description: Delay the Mono#subscribe(Subscriber) to this Mono source until the given period elapses.
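The description above can be made concrete with a minimal sketch. It assumes reactor-core 3.x is on the classpath; the class name `DelaySubscriptionBasics`, the helper method, and the 200 ms delay are illustrative choices, not part of the library. The source is built with `Mono.fromCallable`, whose body runs only once the Mono is subscribed to, so the value it produces is the time at which subscription actually happened.

```java
import java.time.Duration;

import reactor.core.publisher.Mono;

public class DelaySubscriptionBasics {

    /** Returns how many milliseconds passed before the source was subscribed to. */
    static long millisUntilSubscribed(Duration delay) {
        long start = System.nanoTime();
        // The callable body runs at subscription time, not at assembly time,
        // so it measures how long the subscription was postponed.
        Mono<Long> source = Mono.fromCallable(() -> (System.nanoTime() - start) / 1_000_000);
        return source.delaySubscription(delay).block();
    }

    public static void main(String[] args) {
        long elapsed = millisUntilSubscribed(Duration.ofMillis(200));
        System.out.println("source subscribed after about " + elapsed + " ms");
    }
}
```

Note that the delay postpones the subscription itself, so side effects in the source (here, reading the clock) do not run until the delay has elapsed.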
Code example source: reactor/reactor-core
/**
 * Delay the {@link Mono#subscribe(Subscriber) subscription} to this {@link Mono} source until the given
 * period elapses.
 *
 * <p>
 * <img class="marble" src="doc-files/marbles/delaySubscriptionForMono.svg" alt="">
 *
 * @param delay duration before subscribing this {@link Mono}
 *
 * @return a delayed {@link Mono}
 *
 */
public final Mono<T> delaySubscription(Duration delay) {
    return delaySubscription(delay, Schedulers.parallel());
}
Code example source: reactor/reactor-core
/**
 * Delay the {@link Mono#subscribe(Subscriber) subscription} to this {@link Mono} source until the given
 * {@link Duration} elapses.
 *
 * <p>
 * <img class="marble" src="doc-files/marbles/delaySubscriptionForMono.svg" alt="">
 *
 * @param delay {@link Duration} before subscribing this {@link Mono}
 * @param timer a time-capable {@link Scheduler} instance to run on
 *
 * @return a delayed {@link Mono}
 *
 */
public final Mono<T> delaySubscription(Duration delay, Scheduler timer) {
    return delaySubscription(Mono.delay(delay, timer));
}
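Because the Scheduler overload builds its trigger with `Mono.delay(delay, timer)`, the source ends up being subscribed to from a thread owned by that timer. The following is a minimal sketch of this, assuming reactor-core 3.x; the class name `DelaySubscriptionTimer` and the scheduler name "demo-timer" are hypothetical and chosen only for illustration.

```java
import java.time.Duration;

import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

public class DelaySubscriptionTimer {

    /** Returns the name of the thread on which the source was subscribed to. */
    static String subscribingThread(Duration delay) {
        // Hypothetical dedicated timer; the name "demo-timer" is arbitrary.
        Scheduler timer = Schedulers.newSingle("demo-timer");
        try {
            return Mono.fromCallable(() -> Thread.currentThread().getName())
                       .delaySubscription(delay, timer)
                       .block();
        } finally {
            timer.dispose(); // release the thread backing the scheduler
        }
    }

    public static void main(String[] args) {
        System.out.println("subscribed on: " + subscribingThread(Duration.ofMillis(50)));
    }
}
```

Passing an explicit Scheduler is mainly useful when the default parallel scheduler should not be used for timing, for example to isolate timers in tests.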
Code example source: reactor/reactor-core
Mono<Integer> scenario_delayedTrigger() {
    return Mono.just(1)
               .delaySubscription(Duration.ofSeconds(3));
}
Code example source: reactor/reactor-core
Mono<Integer> scenario_delayedTrigger2() {
    return Mono.just(1)
               .delaySubscription(Duration.ofMillis(50));
}
Code example source: reactor/reactor-core
@Test(expected = NullPointerException.class)
public void otherNull() {
    // A null trigger Publisher is rejected with a NullPointerException.
    Mono.never().delaySubscription((Publisher<?>) null);
}
Code example source: reactor/reactor-core
@Test
public void emptySources() {
    AtomicBoolean cancelled = new AtomicBoolean();
    Mono<String> empty1 = Mono.empty();
    Mono<String> empty2 = Mono.empty();
    Mono<String> empty3 = Mono.<String>empty().delaySubscription(Duration.ofMillis(500))
                                              .doOnCancel(() -> cancelled.set(true));
    // zip completes as soon as one source completes empty,
    // cancelling the third source while it is still delayed.
    Duration d = StepVerifier.create(Mono.zip(empty1, empty2, empty3))
                             .verifyComplete();
    assertThat(cancelled).isTrue();
    assertThat(d).isLessThan(Duration.ofMillis(500));
}
Code example source: reactor/reactor-core
@Test
public void delayErrorEmptySources() {
    AtomicBoolean cancelled = new AtomicBoolean();
    Mono<String> empty1 = Mono.empty();
    Mono<String> empty2 = Mono.empty();
    Mono<String> empty3 = Mono.<String>empty().delaySubscription(Duration.ofMillis(500))
                                              .doOnCancel(() -> cancelled.set(true));
    // zipDelayError waits for every source to terminate, so the delayed
    // source is not cancelled and completion arrives only after the delay.
    StepVerifier.create(Mono.zipDelayError(empty1, empty2, empty3))
                .expectSubscription()
                .expectNoEvent(Duration.ofMillis(400))
                .verifyComplete();
    assertThat(cancelled).isFalse();
}
Code example source: reactor/reactor-core
@Test
public void normal() {
    AssertSubscriber<Integer> ts = AssertSubscriber.create();
    Mono.just(1)
        .delaySubscription(Mono.just(1))
        .subscribe(ts);
    ts.assertValues(1)
      .assertNoError()
      .assertComplete();
}
Code example source: reactor/reactor-core
@Test
public void emptyTrigger() {
    AssertSubscriber<Integer> ts = AssertSubscriber.create();
    // A trigger that completes without emitting also releases the subscription.
    Mono.just(1)
        .delaySubscription(Mono.empty())
        .subscribe(ts);
    ts.assertValues(1)
      .assertNoError()
      .assertComplete();
}
Code example source: reactor/reactor-core
@Test
public void manyTriggered() {
    AssertSubscriber<Integer> ts = AssertSubscriber.create();
    // The trigger emits ten values, but the source is still subscribed to only once.
    Mono.just(1)
        .delaySubscription(Flux.range(1, 10))
        .subscribe(ts);
    ts.assertValues(1)
      .assertComplete()
      .assertNoError();
}
Code example source: reactor/reactor-core
@Test
public void neverTriggered() {
    AssertSubscriber<Integer> ts = AssertSubscriber.create();
    // The trigger never signals, so the source is never subscribed to.
    Mono.just(1)
        .delaySubscription(Mono.never())
        .subscribe(ts);
    ts.assertNoValues()
      .assertNoError()
      .assertNotComplete();
}
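The trigger-based tests above use triggers that signal immediately or never. As a complementary sketch, the trigger can also be fired manually, which makes the "before" and "after" states observable. This assumes reactor-core 3.4 or later (for the `Sinks` API); the class name `DelaySubscriptionTrigger` and its helper method are hypothetical.

```java
import java.util.concurrent.atomic.AtomicBoolean;

import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;

public class DelaySubscriptionTrigger {

    /** Returns {subscribed before the trigger fired, subscribed after it fired}. */
    static boolean[] subscribedBeforeAndAfterTrigger() {
        AtomicBoolean subscribed = new AtomicBoolean();
        // A manually controlled trigger that can only complete or fail.
        Sinks.Empty<Void> trigger = Sinks.empty();

        Mono.just(1)
            .doOnSubscribe(s -> subscribed.set(true)) // records subscription to the source
            .delaySubscription(trigger.asMono())
            .subscribe();

        boolean before = subscribed.get(); // trigger has not signalled yet
        trigger.tryEmitEmpty();            // completing the trigger releases the subscription
        boolean after = subscribed.get();
        return new boolean[] {before, after};
    }

    public static void main(String[] args) {
        boolean[] result = subscribedBeforeAndAfterTrigger();
        System.out.println("before trigger: " + result[0] + ", after trigger: " + result[1]);
    }
}
```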
Code example source: reactor/reactor-core
@Test
public void emptyTriggerBackpressured() {
    // Start with zero demand: nothing may be delivered until request() is called.
    AssertSubscriber<Integer> ts = AssertSubscriber.create(0);
    Mono.just(1)
        .delaySubscription(Mono.empty())
        .subscribe(ts);
    ts.assertNoValues()
      .assertNotComplete()
      .assertNoError();
    ts.request(2);
    ts.assertValues(1)
      .assertComplete()
      .assertNoError();
}
Code example source: reactor/reactor-core
@Test
public void normalBackpressured() {
    AssertSubscriber<Integer> ts = AssertSubscriber.create(0);
    Mono.just(1)
        .delaySubscription(Mono.just(1))
        .subscribe(ts);
    ts.assertNoValues()
      .assertNotComplete()
      .assertNoError();
    ts.request(2);
    ts.assertValues(1)
      .assertComplete()
      .assertNoError();
}
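The backpressured tests rely on `AssertSubscriber`, which is a test utility internal to the reactor-core build. A similar check can be sketched with the public `BaseSubscriber` class, assuming reactor-core 3.x; the class name `DelaySubscriptionBackpressure` and its helper method are hypothetical. The subscriber deliberately requests nothing at subscription time, so the value is held back until `request(1)` is called, even though the trigger has already completed.

```java
import java.util.ArrayList;
import java.util.List;

import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Mono;

public class DelaySubscriptionBackpressure {

    /** Returns {values received before any request, values received after request(1)}. */
    static int[] countsBeforeAndAfterRequest() {
        List<Integer> received = new ArrayList<>();
        BaseSubscriber<Integer> subscriber = new BaseSubscriber<Integer>() {
            @Override
            protected void hookOnSubscribe(Subscription subscription) {
                // Deliberately request nothing up front (the default hook requests unbounded).
            }

            @Override
            protected void hookOnNext(Integer value) {
                received.add(value);
            }
        };

        Mono.just(1)
            .delaySubscription(Mono.empty()) // trigger completes immediately
            .subscribe(subscriber);

        int before = received.size(); // no demand yet, so nothing was delivered
        subscriber.request(1);        // demand is forwarded to the source
        int after = received.size();
        return new int[] {before, after};
    }

    public static void main(String[] args) {
        int[] counts = countsBeforeAndAfterRequest();
        System.out.println("before request: " + counts[0] + ", after request: " + counts[1]);
    }
}
```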
Code example source: reactor/reactor-core
@Test
public void verifyVirtualTimeNoEvent() {
    // Virtual time lets the two-day delay elapse instantly in the test.
    StepVerifier.withVirtualTime(() -> Mono.just("foo")
                                           .delaySubscription(Duration.ofDays(2)))
                .expectSubscription()
                .expectNoEvent(Duration.ofDays(2))
                .expectNext("foo")
                .expectComplete()
                .verify();
}
Code example source: reactor/reactor-core
@Test
public void manyTriggeredBackpressured() {
    AssertSubscriber<Integer> ts = AssertSubscriber.create(0);
    Mono.just(1)
        .delaySubscription(Flux.range(1, 10))
        .subscribe(ts);
    ts.assertNoValues()
      .assertNotComplete()
      .assertNoError();
    ts.request(2);
    ts.assertValues(1)
      .assertComplete()
      .assertNoError();
}
Code example source: reactor/reactor-core
@Test
public void verifyVirtualTimeNoEventError() {
    // The second expectNoEvent fails because completion follows "foo" immediately.
    assertThatExceptionOfType(AssertionError.class)
            .isThrownBy(() -> StepVerifier.withVirtualTime(() -> Mono.just("foo")
                                                                     .delaySubscription(Duration.ofDays(2)))
                                          .expectSubscription()
                                          .expectNoEvent(Duration.ofDays(2))
                                          .expectNext("foo")
                                          .expectNoEvent(Duration.ofDays(2))
                                          .expectComplete()
                                          .verify())
            .withMessage("Unexpected completion during a no-event expectation");
}
Code example source: com.aol.cyclops/cyclops-reactor
/**
 * @param subscriptionDelay
 * @return
 * @see reactor.core.publisher.Mono#delaySubscription(org.reactivestreams.Publisher)
 */
public final <U> Mono<T> delaySubscription(Publisher<U> subscriptionDelay) {
    return boxed.delaySubscription(subscriptionDelay);
}
Code example source: com.aol.cyclops/cyclops-reactor
/**
 * @param delay
 * @return
 * @see reactor.core.publisher.Mono#delaySubscription(java.time.Duration)
 */
public final Mono<T> delaySubscription(Duration delay) {
    return boxed.delaySubscription(delay);
}
Code example source: rsocket/rsocket-java
@Test
void requesterNewStreamsTerminatedAfterZeroErrorFrame() {
    TestDuplexConnection conn = new TestDuplexConnection();
    RSocketClient rSocket =
        new RSocketClient(
            conn, DefaultPayload::create, err -> {}, StreamIdSupplier.clientSupplier());
    conn.addToReceivedBuffer(Frame.Error.from(0, new RejectedSetupException("error")));
    StepVerifier.create(
            rSocket
                .requestResponse(DefaultPayload.create("test"))
                .delaySubscription(Duration.ofMillis(100)))
        .expectErrorMatches(
            err -> err instanceof RejectedSetupException && "error".equals(err.getMessage()))
        .verify(Duration.ofSeconds(5));
}