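This article collects code examples for the Java method reactor.core.publisher.Mono.takeUntilOther() and shows how Mono.takeUntilOther() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should serve as a useful reference. Details of the Mono.takeUntilOther() method are as follows: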
Package: reactor.core.publisher
Class name: Mono
Method name: takeUntilOther
Description: Give this Mono a chance to resolve before a companion Publisher emits. If the companion emits before any signal from the source, the resulting Mono will complete. Otherwise, it will relay signals from the source.
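The two outcomes in the description above can be tried out with a minimal sketch. It assumes reactor-core 3.x is on the classpath; the class name TakeUntilOtherDemo and the use of Mono.never() as a stand-in for a publisher that stays silent are illustrative choices, not taken from the projects quoted on this page.

```java
import reactor.core.publisher.Mono;

public class TakeUntilOtherDemo {

    // The source emits while the companion stays silent:
    // the source's value is relayed downstream.
    static String sourceFirst() {
        return Mono.just("value")
                .takeUntilOther(Mono.never())
                .block();
    }

    // The companion emits while the source stays silent:
    // the resulting Mono completes empty, which block() reports as null.
    static String companionFirst() {
        return Mono.<String>never()
                .takeUntilOther(Mono.just("stop"))
                .block();
    }

    public static void main(String[] args) {
        System.out.println("source first:    " + sourceFirst());
        System.out.println("companion first: " + companionFirst());
    }
}
```

Mono.never() keeps each case deterministic because only one side can ever signal; with real publishers the result depends on which side signals first.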
Code example source: reactor/reactor-core
/**
 * Give this Mono a chance to resolve within a specified time frame but complete if it
 * doesn't. This works a bit like {@link #timeout(Duration)} except that the resulting
 * {@link Mono} completes rather than errors when the timer expires.
 * <p>
 * <img class="marble" src="doc-files/marbles/takeWithTimespanForMono.svg" alt="">
 * <p>
 * The timeframe is evaluated using the provided {@link Scheduler}.
 *
 * @param duration the maximum duration to wait for the source Mono to resolve.
 * @param timer the {@link Scheduler} on which to measure the duration.
 *
 * @return a new {@link Mono} that will propagate the signals from the source unless
 * no signal is received for {@code duration}, in which case it completes.
 */
public final Mono<T> take(Duration duration, Scheduler timer) {
    return takeUntilOther(Mono.delay(duration, timer));
}
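The Javadoc above contrasts take(Duration, Scheduler) with timeout(Duration). A small sketch of that difference follows, assuming reactor-core 3.x; the class name TakeVersusTimeoutDemo, the 50 ms limit, and the "timed out" marker string are made up for illustration.

```java
import java.time.Duration;
import java.util.concurrent.TimeoutException;

import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class TakeVersusTimeoutDemo {

    // Illustrative limit; any short duration works for this sketch.
    private static final Duration LIMIT = Duration.ofMillis(50);

    // take(...) gives up quietly: the Mono completes empty,
    // so block() returns null.
    static String withTake() {
        return Mono.<String>never()
                .take(LIMIT, Schedulers.parallel())
                .block();
    }

    // timeout(...) gives up with a TimeoutException, which is mapped
    // to a marker value here so the two results can be compared.
    static String withTimeout() {
        return Mono.<String>never()
                .timeout(LIMIT)
                .onErrorResume(TimeoutException.class, e -> Mono.just("timed out"))
                .block();
    }

    public static void main(String[] args) {
        System.out.println("take:    " + withTake());
        System.out.println("timeout: " + withTimeout());
    }
}
```

Choosing between the two comes down to whether an expired timer should count as "no result" (take) or as a failure the caller must handle (timeout).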
Code example source: reactor/reactor-core
@Test
public void apiTakeUntilOtherShortcircuits() {
    TestPublisher<String> other = TestPublisher.create();
    StepVerifier.withVirtualTime(() ->
            Mono.delay(Duration.ofMillis(200))
                .takeUntilOther(other)
    )
                .thenAwait(Duration.ofMillis(100))
                .then(() -> other.next("go"))
                .verifyComplete();
    other.assertCancelled();
}
Code example source: reactor/reactor-core
@Test
public void apiTakeUntilOtherErrorBeforeOther() {
    TestPublisher<String> other = TestPublisher.create();
    StepVerifier.withVirtualTime(() ->
            Mono.delay(Duration.ofMillis(100))
                .then(Mono.error(new IllegalStateException("boom")))
                .takeUntilOther(other)
    )
                .thenAwait(Duration.ofMillis(200))
                .then(() -> other.next("go"))
                .verifyErrorMessage("boom");
    other.assertCancelled();
}
Code example source: reactor/reactor-core
@Test
public void apiTakeUntilOtherCompleteBeforeOther() {
    TestPublisher<String> other = TestPublisher.create();
    StepVerifier.withVirtualTime(() ->
            Mono.delay(Duration.ofMillis(100))
                .ignoreElement()
                .takeUntilOther(other)
    )
                .thenAwait(Duration.ofMillis(200))
                .then(() -> other.next("go"))
                .verifyComplete();
    other.assertCancelled();
}
Code example source: reactor/reactor-core
@Test
public void apiTakeUntilOtherValuedBeforeOther() {
    TestPublisher<String> other = TestPublisher.create();
    StepVerifier.withVirtualTime(() ->
            Mono.delay(Duration.ofMillis(100))
                .takeUntilOther(other)
    )
                .thenAwait(Duration.ofMillis(200))
                .then(() -> other.next("go"))
                .expectNext(0L)
                .verifyComplete();
    other.assertCancelled();
}
Code example source: scalecube/scalecube-services
@Override
public Mono<ServiceMessage> requestResponse(ServiceMessage message) {
    return rsocket
        .flatMap(
            rsocket ->
                rsocket
                    .requestResponse(toPayload(message))
                    .takeUntilOther(listenConnectionClose(rsocket)))
        .map(this::toMessage);
}
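The snippet above guards a pending request with a connection-close signal. The body of listenConnectionClose is not shown on this page, so the following sketch does not reproduce it; instead it uses a Sinks.One (available in reactor-core 3.4 and later) as a hypothetical stand-in for the close notification and Mono.never() as a hypothetical request whose response never arrives. The class name CloseSignalDemo is likewise illustrative.

```java
import java.util.concurrent.atomic.AtomicReference;

import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;

public class CloseSignalDemo {

    // Returns how the guarded request ended: "value", "error" or "completed".
    static String run() {
        // Hypothetical stand-in for a connection-close notification.
        Sinks.One<String> closeSignal = Sinks.one();
        // Hypothetical request whose response never arrives.
        Mono<String> pendingResponse = Mono.never();

        AtomicReference<String> outcome = new AtomicReference<>("pending");
        pendingResponse
                .takeUntilOther(closeSignal.asMono())
                .subscribe(
                        value -> outcome.set("value"),
                        error -> outcome.set("error"),
                        () -> outcome.set("completed"));

        // Simulate the connection closing while the request is still pending.
        closeSignal.tryEmitValue("closed");
        return outcome.get();
    }

    public static void main(String[] args) {
        System.out.println("request ended as: " + run());
    }
}
```

Without the takeUntilOther guard, the subscriber in this sketch would wait indefinitely, since nothing else ever terminates the pending request.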
Code example source: scalecube/scalecube-services
@Override
public Mono<ClientMessage> requestResponse(ClientMessage request) {
    return Mono.defer(
        () -> {
            Payload payload = toPayload(request);
            return getOrConnect()
                .flatMap(
                    rsocket ->
                        rsocket
                            .requestResponse(payload) //
                            .takeUntilOther(listenConnectionClose(rsocket)))
                .map(this::toMessage);
        });
}