reactor.core.publisher.Mono.takeUntilOther()方法的使用及代码示例

x33g5p2x  于2022-01-24 转载在 其他  
字(4.8k)|赞(0)|评价(0)|浏览(204)

本文整理了Java中reactor.core.publisher.Mono.takeUntilOther()方法的一些代码示例,展示了Mono.takeUntilOther()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Mono.takeUntilOther()方法的具体详情如下:
包路径:reactor.core.publisher.Mono
类名称:Mono
方法名:takeUntilOther

Mono.takeUntilOther介绍

[英]Give this Mono a chance to resolve before a companion Publisher emits. If the companion emits before any signal from the source, the resulting Mono will complete. Otherwise, it will relay signals from the source.
[中]在同伴发布之前,给这个Mono一个解决的机会。如果伴音在源信号发出之前发出,则产生的单声道将完成。否则,它将中继来自源的信号。

代码示例

代码示例来源:origin: reactor/reactor-core

/**
 * Give this Mono a chance to resolve within a specified time frame but complete if it
 * doesn't. This works a bit like {@link #timeout(Duration)} except that the resulting
 * {@link Mono} completes rather than errors when the timer expires.
 * <p>
 * <img class="marble" src="doc-files/marbles/takeWithTimespanForMono.svg" alt="">
 * <p>
 * The timeframe is evaluated using the provided {@link Scheduler}.
 *
 * @param duration the maximum duration to wait for the source Mono to resolve.
 * @param timer the {@link Scheduler} on which to measure the duration.
 *
 * @return a new {@link Mono} that will propagate the signals from the source unless
 * no signal is received for {@code duration}, in which case it completes.
 */
public final Mono<T> take(Duration duration, Scheduler timer) {
  return takeUntilOther(Mono.delay(duration, timer));
}

代码示例来源:origin: reactor/reactor-core

@Test
public void apiTakeUntilOtherShortcircuits() {
  TestPublisher<String> other = TestPublisher.create();
  StepVerifier.withVirtualTime(() ->
      Mono.delay(Duration.ofMillis(200))
        .takeUntilOther(other)
  )
        .thenAwait(Duration.ofMillis(100))
        .then(() -> other.next("go"))
        .verifyComplete();
  other.assertCancelled();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void apiTakeUntilOtherErrorBeforeOther() {
  TestPublisher<String> other = TestPublisher.create();
  StepVerifier.withVirtualTime(() ->
      Mono.delay(Duration.ofMillis(100))
        .then(Mono.error(new IllegalStateException("boom")))
        .takeUntilOther(other)
  )
        .thenAwait(Duration.ofMillis(200))
        .then(() -> other.next("go"))
        .verifyErrorMessage("boom");
  other.assertCancelled();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void apiTakeUntilOtherCompleteBeforeOther() {
  TestPublisher<String> other = TestPublisher.create();
  StepVerifier.withVirtualTime(() ->
      Mono.delay(Duration.ofMillis(100))
        .ignoreElement()
        .takeUntilOther(other)
  )
        .thenAwait(Duration.ofMillis(200))
        .then(() -> other.next("go"))
        .verifyComplete();
  other.assertCancelled();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void apiTakeUntilOtherValuedBeforeOther() {
  TestPublisher<String> other = TestPublisher.create();
  StepVerifier.withVirtualTime(() ->
      Mono.delay(Duration.ofMillis(100))
        .takeUntilOther(other)
  )
        .thenAwait(Duration.ofMillis(200))
        .then(() -> other.next("go"))
        .expectNext(0L)
        .verifyComplete();
  other.assertCancelled();
}

代码示例来源:origin: io.projectreactor/reactor-core

/**
 * Give this Mono a chance to resolve within a specified time frame but complete if it
 * doesn't. This works a bit like {@link #timeout(Duration)} except that the resulting
 * {@link Mono} completes rather than errors when the timer expires.
 * <p>
 * <img class="marble" src="doc-files/marbles/takeWithTimespanForMono.svg" alt="">
 * <p>
 * The timeframe is evaluated using the provided {@link Scheduler}.
 *
 * @param duration the maximum duration to wait for the source Mono to resolve.
 * @param timer the {@link Scheduler} on which to measure the duration.
 *
 * @return a new {@link Mono} that will propagate the signals from the source unless
 * no signal is received for {@code duration}, in which case it completes.
 */
public final Mono<T> take(Duration duration, Scheduler timer) {
  return takeUntilOther(Mono.delay(duration, timer));
}

代码示例来源:origin: scalecube/scalecube-services

@Override
public Mono<ServiceMessage> requestResponse(ServiceMessage message) {
 return rsocket
   .flatMap(
     rsocket ->
       rsocket
         .requestResponse(toPayload(message))
         .takeUntilOther(listenConnectionClose(rsocket)))
   .map(this::toMessage);
}

代码示例来源:origin: io.scalecube/rsocket-services-transport

@Override
public Mono<ServiceMessage> requestResponse(ServiceMessage message) {
 return rsocket
   .flatMap(
     rsocket ->
       rsocket
         .requestResponse(toPayload(message))
         .takeUntilOther(listenConnectionClose(rsocket)))
   .map(this::toMessage);
}

代码示例来源:origin: io.scalecube/scalecube-services-transport-rsocket

@Override
public Mono<ServiceMessage> requestResponse(ServiceMessage message) {
 return rsocket
   .flatMap(
     rsocket ->
       rsocket
         .requestResponse(toPayload(message))
         .takeUntilOther(listenConnectionClose(rsocket)))
   .map(this::toMessage);
}

代码示例来源:origin: scalecube/scalecube-services

@Override
public Mono<ClientMessage> requestResponse(ClientMessage request) {
 return Mono.defer(
   () -> {
    Payload payload = toPayload(request);
    return getOrConnect()
      .flatMap(
        rsocket ->
          rsocket
            .requestResponse(payload) //
            .takeUntilOther(listenConnectionClose(rsocket)))
      .map(this::toMessage);
   });
}

相关文章

Mono类方法