reactor.core.publisher.Mono.retry()方法的使用及代码示例

x33g5p2x  于2022-01-24 转载在 其他  
字(7.8k)|赞(0)|评价(0)|浏览(501)

本文整理了Java中reactor.core.publisher.Mono.retry()方法的一些代码示例,展示了Mono.retry()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Mono.retry()方法的具体详情如下:
包路径:reactor.core.publisher.Mono
类名称:Mono
方法名:retry

Mono.retry介绍

[英]Re-subscribes to this Mono sequence if it signals any error, indefinitely.
[中]如果出现任何错误,无限期地重新订阅该单声道序列。

代码示例

代码示例来源:origin: reactor/reactor-core

/**
 * Re-subscribes to this {@link Mono} sequence if it signals any error, indefinitely.
 * <p>
 * <img class="marble" src="doc-files/marbles/retryForMono.svg" alt="">
 *
 * @return a {@link Mono} that retries on onError
 */
public final Mono<T> retry() {
  return retry(Long.MAX_VALUE);
}

代码示例来源:origin: reactor/reactor-core

/**
 * Re-subscribes to this {@link Mono} sequence up to the specified number of retries if it signals any
 * error that match the given {@link Predicate}, otherwise push the error downstream.
 *
 * <p>
 * <img class="marble" src="doc-files/marbles/retryWithAttemptsAndPredicateForMono.svg" alt="">
 *
 * @param numRetries the number of times to tolerate an error
 * @param retryMatcher the predicate to evaluate if retry should occur based on a given error signal
 *
 * @return a {@link Mono} that retries on onError up to the specified number of retry
 * attempts, only if the predicate matches.
 *
 */
public final Mono<T> retry(long numRetries, Predicate<? super Throwable> retryMatcher) {
  return defer(() -> retry(Flux.countingPredicate(retryMatcher, numRetries)));
}

代码示例来源:origin: codecentric/spring-boot-admin

public static ExchangeFilterFunction retry(int defaultRetries, Map<String, Integer> retriesPerEndpoint) {
    return (request, next) -> {
      int retries = 0;
      if (!request.method().equals(HttpMethod.DELETE) &&
        !request.method().equals(HttpMethod.PATCH) &&
        !request.method().equals(HttpMethod.POST) &&
        !request.method().equals(HttpMethod.PUT)) {
        retries = request.attribute(ATTRIBUTE_ENDPOINT).map(retriesPerEndpoint::get).orElse(defaultRetries);
      }
      return next.exchange(request).retry(retries);
    };
  }
}

代码示例来源:origin: reactor/reactor-core

@Test(expected = IllegalArgumentException.class)
public void timesInvalid() {
  Mono.never()
    .retry(-1);
}

代码示例来源:origin: reactor/reactor-core

@Test
public void lazilyEvaluatedSubscribe() {
  AtomicInteger count = new AtomicInteger();
  Mono<Object> error = Mono.error(() -> new IllegalStateException("boom" + count.incrementAndGet()));
  assertThat(count).as("no op before subscribe").hasValue(0);
  StepVerifier.create(error.retry(3))
        .verifyErrorMessage("boom4");
}

代码示例来源:origin: reactor/reactor-core

@Test
  public void twoRetryErrorSupplier() {
    AtomicInteger i = new AtomicInteger();
    AtomicBoolean bool = new AtomicBoolean(true);

    StepVerifier.create(Mono.fromCallable(i::incrementAndGet)
                .doOnNext(v -> {
                  if(v < 4) {
                    if( v > 2){
                      bool.set(false);
                    }
                    throw new RuntimeException("test");
                  }
                })
                .retry(3, e -> bool.get()))
          .verifyErrorMessage("test");
  }
}

代码示例来源:origin: reactor/reactor-core

@Test
public void twoRetryNormalSupplier() {
  AtomicInteger i = new AtomicInteger();
  AtomicBoolean bool = new AtomicBoolean(true);
  StepVerifier.create(Mono.fromCallable(i::incrementAndGet)
              .doOnNext(v -> {
                if(v < 4) {
                  throw new RuntimeException("test");
                }
                else {
                  bool.set(false);
                }
              })
              .retry(3, e -> bool.get()))
        .expectNext(4)
        .expectComplete()
        .verify();
}

代码示例来源:origin: reactor/reactor-core

@Test
  public void doOnNextFails() {
    Mono.just(1)
      .doOnNext(new Consumer<Integer>() {
        int i;

        @Override
        public void accept(Integer t) {
          if (i++ < 2) {
            throw new RuntimeException("test");
          }
        }
      })
      .retry(2)
      .subscribeWith(AssertSubscriber.create())
      .assertValues(1);
  }
}

代码示例来源:origin: reactor/reactor-core

@Test
public void retryInfinite() {
  AssertSubscriber<Integer> ts = AssertSubscriber.create();
  AtomicInteger i = new AtomicInteger();
  Mono.fromCallable(() -> {
    int _i = i.getAndIncrement();
    if (_i < 10) {
      throw Exceptions.propagate(new RuntimeException("forced failure"));
    }
    return _i;
  })
    .retry()
    .subscribe(ts);
  ts.assertValues(10)
   .assertComplete()
   .assertNoError();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void zeroRetryNoError() {
  AssertSubscriber<Integer> ts = AssertSubscriber.create();
  Mono.just(1)
    .retry(0)
    .subscribe(ts);
  ts.assertValues(1)
   .assertComplete()
   .assertNoError();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void oneRetry() {
  AssertSubscriber<Integer> ts = AssertSubscriber.create();
  AtomicInteger i = new AtomicInteger();
  Mono.fromCallable(() -> {
    int _i = i.getAndIncrement();
    if (_i < 1) {
      throw Exceptions.propagate(new RuntimeException("forced failure"));
    }
    return _i;
  })
    .retry(1)
    .subscribe(ts);
  ts.assertValues(1)
   .assertComplete()
   .assertNoError();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void zeroRetry() {
  AssertSubscriber<Integer> ts = AssertSubscriber.create();
  Mono.<Integer>error(new RuntimeException("forced failure")).retry(0)
                                .subscribe(ts);
  ts.assertNoValues()
   .assertNotComplete()
   .assertError(RuntimeException.class)
   .assertErrorMessage("forced failure");
}

代码示例来源:origin: io.projectreactor/reactor-core

/**
 * Re-subscribes to this {@link Mono} sequence if it signals any error, indefinitely.
 * <p>
 * <img class="marble" src="doc-files/marbles/retryForMono.svg" alt="">
 *
 * @return a {@link Mono} that retries on onError
 */
public final Mono<T> retry() {
  return retry(Long.MAX_VALUE);
}

代码示例来源:origin: com.aol.cyclops/cyclops-reactor

/**
 * @return
 * @see reactor.core.publisher.Mono#retry()
 */
public final Mono<T> retry() {
  return boxed.retry();
}
/**

代码示例来源:origin: com.aol.cyclops/cyclops-reactor

/**
 * @param numRetries
 * @return
 * @see reactor.core.publisher.Mono#retry(long)
 */
public final Mono<T> retry(long numRetries) {
  return boxed.retry(numRetries);
}
/**

代码示例来源:origin: com.aol.cyclops/cyclops-reactor

/**
 * @param retryMatcher
 * @return
 * @see reactor.core.publisher.Mono#retry(java.util.function.Predicate)
 */
public final Mono<T> retry(Predicate<Throwable> retryMatcher) {
  return boxed.retry(retryMatcher);
}
/**

代码示例来源:origin: com.aol.cyclops/cyclops-reactor

/**
 * @param numRetries
 * @param retryMatcher
 * @return
 * @see reactor.core.publisher.Mono#retry(long, java.util.function.Predicate)
 */
public final Mono<T> retry(long numRetries, Predicate<Throwable> retryMatcher) {
  return boxed.retry(numRetries, retryMatcher);
}
/**

代码示例来源:origin: io.projectreactor/reactor-core

/**
 * Re-subscribes to this {@link Mono} sequence up to the specified number of retries if it signals any
 * error that match the given {@link Predicate}, otherwise push the error downstream.
 *
 * <p>
 * <img class="marble" src="doc-files/marbles/retryWithAttemptsAndPredicateForMono.svg" alt="">
 *
 * @param numRetries the number of times to tolerate an error
 * @param retryMatcher the predicate to evaluate if retry should occur based on a given error signal
 *
 * @return a {@link Mono} that retries on onError up to the specified number of retry
 * attempts, only if the predicate matches.
 *
 */
public final Mono<T> retry(long numRetries, Predicate<? super Throwable> retryMatcher) {
  return defer(() -> retry(Flux.countingPredicate(retryMatcher, numRetries)));
}

代码示例来源:origin: io.projectreactor.ipc/reactor-netty

@Override
@SuppressWarnings("unchecked")
public void subscribe(final CoreSubscriber<? super HttpClientResponse> subscriber) {
  ReconnectableBridge bridge = new ReconnectableBridge();
  bridge.activeURI = startURI;
  Mono.defer(() -> parent.client.newHandler(new HttpClientHandler(this, bridge),
      parent.options.getRemoteAddress(bridge.activeURI),
      HttpClientOptions.isSecure(bridge.activeURI),
      bridge))
    .retry(bridge)
    .cast(HttpClientResponse.class)
    .subscribe(subscriber);
}

代码示例来源:origin: reactor/reactor-kafka

@Test
public void manualCommitRetry() throws Exception {
  consumer.addCommitException(new RetriableCommitFailedException("coordinator failed"), 2);
  int count = 10;
  receiverOptions = receiverOptions
      .commitBatchSize(0)
      .commitInterval(Duration.ofMillis(Long.MAX_VALUE))
      .maxCommitAttempts(1)
      .subscription(Collections.singletonList(topic));
  sendMessages(topic, 0, count + 10);
  receiveAndVerify(10, record -> record.receiverOffset().commit().retry().then(Mono.just(record)));
  verifyCommits(groupId, topic, 10);
}

相关文章

Mono类方法