Usage and code examples of the reactor.core.publisher.Mono.repeat() method

x33g5p2x, reposted on 2022-01-24 under Other

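This article collects code examples for the Java method reactor.core.publisher.Mono.repeat() and shows how it is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, extracted from selected projects, and are intended as a practical reference. Details of Mono.repeat():
Fully qualified class: reactor.core.publisher.Mono
Class name: Mono
Method name: repeat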

Introduction to Mono.repeat

Repeatedly and indefinitely subscribe to the source upon completion of the previous subscription. Because the source may be subscribed to more than once, the method returns a Flux rather than a Mono.
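
To make the resubscription semantics concrete without depending on Reactor, the following plain-Java sketch models a cold source as a `Supplier` that is invoked again on every "subscription". The class `RepeatModel` and its method `repeatAndTake` are hypothetical names used only for illustration; they are not part of Reactor, and the real operator is asynchronous and backpressure-aware, which this loop ignores.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical model, not Reactor code: a "source" is a Supplier that
// produces one value each time it is "subscribed" (invoked).
final class RepeatModel {

    // Models source.repeat().take(limit): invoke the source again after
    // each completion until `limit` values have been collected.
    static <T> List<T> repeatAndTake(Supplier<T> source, int limit) {
        List<T> out = new ArrayList<>();
        while (out.size() < limit) {
            out.add(source.get()); // one subscription yields one value, then completes
        }
        return out;
    }

    public static void main(String[] args) {
        AtomicInteger i = new AtomicInteger();
        List<Integer> values = repeatAndTake(i::incrementAndGet, 9);
        System.out.println(values);
    }
}
```

Under these assumptions the model prints `[1, 2, 3, 4, 5, 6, 7, 8, 9]`: each resubscription re-runs the callable, so the counter advances once per repeat.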

Code examples

Code example source: reactor/reactor-core

/**
 * Repeatedly and indefinitely subscribe to the source upon completion of the
 * previous subscription.
 *
 * <p>
 * <img class="marble" src="doc-files/marbles/repeatForMono.svg" alt="">
 *
 * @return an indefinitely repeated {@link Flux} on onComplete
 */
public final Flux<T> repeat() {
  return repeat(Flux.ALWAYS_BOOLEAN_SUPPLIER);
}

Code example source: reactor/reactor-core

/**
 * Repeatedly subscribe to the source if the predicate returns true after completion of the previous
 * subscription. A specified maximum of repeat will limit the number of re-subscribe.
 *
 * <p>
 * <img class="marble" src="doc-files/marbles/repeatWithAttemptsAndPredicateForMono.svg" alt="">
 *
 * @param numRepeat the number of times to re-subscribe on complete (positive, or 0 for original sequence only)
 * @param predicate the boolean to evaluate on onComplete
 *
 * @return a {@link Flux} that repeats on onComplete while the predicate matches,
 * up to the specified number of repetitions
 */
public final Flux<T> repeat(long numRepeat, BooleanSupplier predicate) {
  if (numRepeat < 0L) {
    throw new IllegalArgumentException("numRepeat >= 0 required");
  }
  if (numRepeat == 0) {
    return this.flux();
  }
  return Flux.defer(() -> repeat(Flux.countingBooleanSupplier(predicate, numRepeat)));
}
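
The `numRepeat` bound above is enforced by wrapping the predicate in a counting supplier. `Flux.countingBooleanSupplier` is internal to Reactor and its source is not shown here, so the following is only a plain-Java sketch of how such a wrapper could work; `CountingSupplierSketch`, `counting`, and `emissions` are hypothetical names, not Reactor APIs.

```java
import java.util.function.BooleanSupplier;

// Hypothetical sketch, not Reactor's actual implementation.
final class CountingSupplierSketch {

    // Returns a supplier that yields true at most `max` times, and only
    // while the wrapped predicate also returns true.
    static BooleanSupplier counting(BooleanSupplier predicate, long max) {
        return new BooleanSupplier() {
            private long calls;

            @Override
            public boolean getAsBoolean() {
                return calls++ < max && predicate.getAsBoolean();
            }
        };
    }

    // Models how many values a one-element source emits under
    // repeat(numRepeat, predicate): one initial subscription, plus one
    // more each time the counting supplier allows a resubscribe.
    static int emissions(long numRepeat, BooleanSupplier predicate) {
        BooleanSupplier shouldRepeat = counting(predicate, numRepeat);
        int emitted = 1;
        while (shouldRepeat.getAsBoolean()) {
            emitted++;
        }
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println(emissions(0, () -> true));
        System.out.println(emissions(2, () -> true));
        System.out.println(emissions(2, () -> false));
    }
}
```

In this model the three calls print 1, 3, and 1. A repeat count of n with an always-true predicate therefore gives n + 1 emissions in total, and a false predicate stops the sequence after the original subscription.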

Code example source: reactor/reactor-core

@Test(expected = IllegalArgumentException.class)
public void timesInvalid() {
  Mono.never()
    .repeat(-1);
}

Code example source: reactor/reactor-core

@Test(expected = NullPointerException.class)
public void predicateNull() {
  Mono.never()
    .repeat(null);
}

Code example source: reactor/reactor-core

@Test
public void nZero() {
  StepVerifier.create(Mono.just(3)
              .repeat(0, () -> true))
        .expectNext(3)
        .verifyComplete();
}

Code example source: reactor/reactor-core

@Test
public void nMinusOne() {
  Mono<Integer> source = Mono.just(3);
  assertThatIllegalArgumentException()
       .isThrownBy(() -> source.repeat(-1, () -> true))
       .withMessage("numRepeat >= 0 required");
}

Code example source: reactor/reactor-core

@Override
public Publisher<Integer> createPublisher(long elements) {
  if (elements <= Integer.MAX_VALUE) {
    return Flux.range(1, (int) elements)
          .filter(integer -> true)
          .map(integer -> integer)
          .transform(this::transformFlux);
  }
  else {
    final Random random = new Random();
    return Mono.fromCallable(random::nextInt)
          .repeat()
          .map(Math::abs)
          .transform(this::transformFlux);
  }
}

Code example source: reactor/reactor-core

@Test
public void nOne() {
  StepVerifier.create(Mono.just(3)
              .repeat(1, () -> true))
        .expectNext(3)
        .expectNext(3)
        .verifyComplete();
}

Code example source: reactor/reactor-core

@Test
public void twoRepeatNormalSupplier() {
  AtomicInteger i = new AtomicInteger();
  AtomicBoolean bool = new AtomicBoolean(true);
  StepVerifier.create(Mono.fromCallable(i::incrementAndGet)
              .repeat(2, bool::get))
        .expectNext(1, 2)
        .expectNext(3)
        .then(() -> bool.set(false))
        .expectComplete()
        .verify();
}

Code example source: reactor/reactor-core

@Test
public void zeroRepeat() {
  AtomicInteger i = new AtomicInteger();
  StepVerifier.create(Mono.fromCallable(i::incrementAndGet)
              .repeat(0))
        .expectNext(1)
        .verifyComplete();
}

Code example source: reactor/reactor-core

@Test
public void oneRepeat() {
  AtomicInteger i = new AtomicInteger();
  StepVerifier.create(Mono.fromCallable(i::incrementAndGet)
              .repeat(1))
        .expectNext(1, 2)
        .verifyComplete();
}

Code example source: reactor/reactor-core

@Test
public void twoRepeat() {
  AtomicInteger i = new AtomicInteger();
  StepVerifier.create(Mono.fromCallable(i::incrementAndGet)
              .repeat(2))
        .expectNext(1, 2, 3)
        .verifyComplete();
}

Code example source: reactor/reactor-core

@Test
public void nTwo() {
  StepVerifier.create(Mono.just(3)
              .repeat(2, () -> true))
        .expectNext(3)
        .expectNext(3)
        .expectNext(3)
        .verifyComplete();
}

Code example source: reactor/reactor-core

@Test
public void oneRepeatBackpressured() {
  AtomicInteger i = new AtomicInteger();
  StepVerifier.create(Mono.fromCallable(i::incrementAndGet)
              .repeat(1), 0)
        .expectSubscription()
        .expectNoEvent(Duration.ofMillis(100))
        .thenRequest(3)
        .expectNext(1, 2)
        .verifyComplete();
}

Code example source: reactor/reactor-core

@Test
public void twoRepeatNormal() {
  AtomicInteger i = new AtomicInteger();
  StepVerifier.create(Mono.fromCallable(i::incrementAndGet)
              .repeat(2)
              .count())
        .expectNext(3L)
        .expectComplete()
        .verify();
}

Code example source: reactor/reactor-core

@Test
public void twoRepeatBackpressured() {
  AtomicInteger i = new AtomicInteger();
  StepVerifier.create(Mono.fromCallable(i::incrementAndGet)
              .repeat(2), 0)
        .expectSubscription()
        .expectNoEvent(Duration.ofMillis(100))
        .thenRequest(2)
        .expectNext(1, 2)
        .thenRequest(3)
        .expectNext(3)
        .verifyComplete();
}

Code example source: reactor/reactor-core

@Test
public void repeatInfinite() {
  AssertSubscriber<Integer> ts = AssertSubscriber.create();

  AtomicInteger i = new AtomicInteger();
  // repeat() alone never completes; take(9) bounds the sequence
  Mono.fromCallable(i::incrementAndGet)
    .repeat()
    .take(9)
    .subscribe(ts);

  ts.assertValues(1, 2, 3, 4, 5, 6, 7, 8, 9)
    .assertComplete()
    .assertNoError();
}

Code example source: reactor/reactor-core

.zipWith(Mono.fromCallable(System::currentTimeMillis).repeat(), (t1, t2) ->
String.format("%s : %s", t1, t2));

Code example source: com.aol.cyclops/cyclops-reactor

/**
 * @param numRepeat
 * @param predicate
 * @return
 * @see reactor.core.publisher.Mono#repeat(long, java.util.function.BooleanSupplier)
 */
public final Flux<T> repeat(long numRepeat, BooleanSupplier predicate) {
  return boxed.repeat(numRepeat, predicate);
}