[英]Re-subscribes to this Mono sequence if it signals any error, indefinitely.
代码示例来源:origin: reactor/reactor-core
* Re-subscribes to this {@link Mono} sequence if it signals any error, indefinitely.
* <p>
* <img class="marble" src="doc-files/marbles/retryForMono.svg" alt="">
* @return a {@link Mono} that retries on onError
public final Mono<T> retry() {
return retry(Long.MAX_VALUE);
代码示例来源:origin: reactor/reactor-core
* Re-subscribes to this {@link Mono} sequence up to the specified number of retries if it signals any
* error that match the given {@link Predicate}, otherwise push the error downstream.
* <p>
* <img class="marble" src="doc-files/marbles/retryWithAttemptsAndPredicateForMono.svg" alt="">
* @param numRetries the number of times to tolerate an error
* @param retryMatcher the predicate to evaluate if retry should occur based on a given error signal
* @return a {@link Mono} that retries on onError up to the specified number of retry
* attempts, only if the predicate matches.
public final Mono<T> retry(long numRetries, Predicate<? super Throwable> retryMatcher) {
return defer(() -> retry(Flux.countingPredicate(retryMatcher, numRetries)));
代码示例来源:origin: codecentric/spring-boot-admin
public static ExchangeFilterFunction retry(int defaultRetries, Map<String, Integer> retriesPerEndpoint) {
return (request, next) -> {
int retries = 0;
if (!request.method().equals(HttpMethod.DELETE) &&
!request.method().equals(HttpMethod.PATCH) &&
!request.method().equals(HttpMethod.POST) &&
!request.method().equals(HttpMethod.PUT)) {
retries = request.attribute(ATTRIBUTE_ENDPOINT).map(retriesPerEndpoint::get).orElse(defaultRetries);
return next.exchange(request).retry(retries);
代码示例来源:origin: reactor/reactor-core
@Test(expected = IllegalArgumentException.class)
public void timesInvalid() {
代码示例来源:origin: reactor/reactor-core
public void lazilyEvaluatedSubscribe() {
AtomicInteger count = new AtomicInteger();
Mono<Object> error = Mono.error(() -> new IllegalStateException("boom" + count.incrementAndGet()));
assertThat(count).as("no op before subscribe").hasValue(0);
代码示例来源:origin: reactor/reactor-core
public void twoRetryErrorSupplier() {
AtomicInteger i = new AtomicInteger();
AtomicBoolean bool = new AtomicBoolean(true);
.doOnNext(v -> {
if(v < 4) {
if( v > 2){
throw new RuntimeException("test");
.retry(3, e -> bool.get()))
代码示例来源:origin: reactor/reactor-core
public void twoRetryNormalSupplier() {
AtomicInteger i = new AtomicInteger();
AtomicBoolean bool = new AtomicBoolean(true);
.doOnNext(v -> {
if(v < 4) {
throw new RuntimeException("test");
else {
.retry(3, e -> bool.get()))
代码示例来源:origin: reactor/reactor-core
public void doOnNextFails() {
.doOnNext(new Consumer<Integer>() {
int i;
public void accept(Integer t) {
if (i++ < 2) {
throw new RuntimeException("test");
代码示例来源:origin: reactor/reactor-core
public void retryInfinite() {
AssertSubscriber<Integer> ts = AssertSubscriber.create();
AtomicInteger i = new AtomicInteger();
Mono.fromCallable(() -> {
int _i = i.getAndIncrement();
if (_i < 10) {
throw Exceptions.propagate(new RuntimeException("forced failure"));
return _i;
代码示例来源:origin: reactor/reactor-core
public void zeroRetryNoError() {
AssertSubscriber<Integer> ts = AssertSubscriber.create();
代码示例来源:origin: reactor/reactor-core
public void oneRetry() {
AssertSubscriber<Integer> ts = AssertSubscriber.create();
AtomicInteger i = new AtomicInteger();
Mono.fromCallable(() -> {
int _i = i.getAndIncrement();
if (_i < 1) {
throw Exceptions.propagate(new RuntimeException("forced failure"));
return _i;
代码示例来源:origin: reactor/reactor-core
public void zeroRetry() {
AssertSubscriber<Integer> ts = AssertSubscriber.create();
Mono.<Integer>error(new RuntimeException("forced failure")).retry(0)
.assertErrorMessage("forced failure");
代码示例来源:origin: io.projectreactor/reactor-core
* Re-subscribes to this {@link Mono} sequence if it signals any error, indefinitely.
* <p>
* <img class="marble" src="doc-files/marbles/retryForMono.svg" alt="">
* @return a {@link Mono} that retries on onError
public final Mono<T> retry() {
return retry(Long.MAX_VALUE);
代码示例来源:origin: com.aol.cyclops/cyclops-reactor
* @return
* @see reactor.core.publisher.Mono#retry()
public final Mono<T> retry() {
return boxed.retry();
代码示例来源:origin: com.aol.cyclops/cyclops-reactor
* @param numRetries
* @return
* @see reactor.core.publisher.Mono#retry(long)
public final Mono<T> retry(long numRetries) {
return boxed.retry(numRetries);
代码示例来源:origin: com.aol.cyclops/cyclops-reactor
* @param retryMatcher
* @return
* @see reactor.core.publisher.Mono#retry(java.util.function.Predicate)
public final Mono<T> retry(Predicate<Throwable> retryMatcher) {
return boxed.retry(retryMatcher);
代码示例来源:origin: com.aol.cyclops/cyclops-reactor
* @param numRetries
* @param retryMatcher
* @return
* @see reactor.core.publisher.Mono#retry(long, java.util.function.Predicate)
public final Mono<T> retry(long numRetries, Predicate<Throwable> retryMatcher) {
return boxed.retry(numRetries, retryMatcher);
代码示例来源:origin: io.projectreactor/reactor-core
* Re-subscribes to this {@link Mono} sequence up to the specified number of retries if it signals any
* error that match the given {@link Predicate}, otherwise push the error downstream.
* <p>
* <img class="marble" src="doc-files/marbles/retryWithAttemptsAndPredicateForMono.svg" alt="">
* @param numRetries the number of times to tolerate an error
* @param retryMatcher the predicate to evaluate if retry should occur based on a given error signal
* @return a {@link Mono} that retries on onError up to the specified number of retry
* attempts, only if the predicate matches.
public final Mono<T> retry(long numRetries, Predicate<? super Throwable> retryMatcher) {
return defer(() -> retry(Flux.countingPredicate(retryMatcher, numRetries)));
代码示例来源:origin: io.projectreactor.ipc/reactor-netty
public void subscribe(final CoreSubscriber<? super HttpClientResponse> subscriber) {
ReconnectableBridge bridge = new ReconnectableBridge();
bridge.activeURI = startURI;
Mono.defer(() -> parent.client.newHandler(new HttpClientHandler(this, bridge),
代码示例来源:origin: reactor/reactor-kafka
public void manualCommitRetry() throws Exception {
consumer.addCommitException(new RetriableCommitFailedException("coordinator failed"), 2);
int count = 10;
receiverOptions = receiverOptions
sendMessages(topic, 0, count + 10);
receiveAndVerify(10, record -> record.receiverOffset().commit().retry().then(Mono.just(record)));
verifyCommits(groupId, topic, 10);