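This article collects code examples for the Java method reactor.core.publisher.Mono.retry() and shows how Mono.retry() is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should serve as a useful reference. Details of Mono.retry() are as follows: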
Package: reactor.core.publisher
Class name: Mono
Method name: retry
Description: Re-subscribes to this Mono sequence if it signals any error, indefinitely.
Code example source: reactor/reactor-core
/**
 * Re-subscribes to this {@link Mono} sequence if it signals any error, indefinitely.
 * <p>
 * <img class="marble" src="doc-files/marbles/retryForMono.svg" alt="">
 *
 * @return a {@link Mono} that retries on onError
 */
public final Mono<T> retry() {
    return retry(Long.MAX_VALUE);
}
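To make the counting concrete: retry(n) tolerates n errors, so a source that always fails is subscribed n + 1 times before the error reaches the subscriber, and retry() simply passes Long.MAX_VALUE. The following is a minimal plain-Java sketch of that behavior, assuming a blocking Callable stands in for the Mono. RetryModel and its methods are hypothetical names, and this is not how Reactor implements the operator.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;

/** Plain-Java model of retry(numRetries); an illustration, NOT Reactor's implementation. */
final class RetryModel {

    /**
     * Runs the source once, then re-runs it up to numRetries more times
     * while it keeps failing. The last failure is rethrown.
     */
    static <T> T retry(Callable<T> source, long numRetries) throws Exception {
        if (numRetries < 0) {
            throw new IllegalArgumentException("numRetries >= 0 required");
        }
        long remaining = numRetries;
        while (true) {
            try {
                return source.call();   // one attempt, the analogue of one subscription
            } catch (Exception e) {
                if (remaining-- == 0) {
                    throw e;            // retry budget exhausted: propagate the error
                }
            }
        }
    }

    /** Hypothetical helper: the source always fails with "boom" + attempt number. */
    static String lastErrorAfter(long numRetries) {
        AtomicInteger attempts = new AtomicInteger();
        try {
            retry(() -> {
                throw new IllegalStateException("boom" + attempts.incrementAndGet());
            }, numRetries);
            return null;                // unreachable here, the source never succeeds
        } catch (Exception e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(lastErrorAfter(3)); // prints "boom4": 1 initial attempt + 3 retries
    }
}
```

With a budget of 0 the first error is propagated immediately, which matches the zeroRetry test further down this page.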
Code example source: reactor/reactor-core
/**
 * Re-subscribes to this {@link Mono} sequence up to the specified number of retries if it signals any
 * error that match the given {@link Predicate}, otherwise push the error downstream.
 *
 * <p>
 * <img class="marble" src="doc-files/marbles/retryWithAttemptsAndPredicateForMono.svg" alt="">
 *
 * @param numRetries the number of times to tolerate an error
 * @param retryMatcher the predicate to evaluate if retry should occur based on a given error signal
 *
 * @return a {@link Mono} that retries on onError up to the specified number of retry
 * attempts, only if the predicate matches.
 *
 */
public final Mono<T> retry(long numRetries, Predicate<? super Throwable> retryMatcher) {
    return defer(() -> retry(Flux.countingPredicate(retryMatcher, numRetries)));
}
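The two-argument overload above combines a retry budget with an error filter. A rough plain-Java sketch of that combination follows. The class CountingPredicateSketch and its methods are hypothetical, and the body is an assumption about the behavior the Javadoc describes, not a copy of Flux.countingPredicate.

```java
import java.util.function.Predicate;

/** Illustrative sketch only; names and logic are assumptions, not Reactor source. */
final class CountingPredicateSketch {

    /** Wraps a matcher so that it is consulted at most `max` times. */
    static Predicate<Throwable> counting(Predicate<? super Throwable> matcher, long max) {
        return new Predicate<Throwable>() {
            private long used; // number of evaluations so far

            @Override
            public boolean test(Throwable error) {
                // Once the budget is spent the matcher is no longer asked.
                return used++ < max && matcher.test(error);
            }
        };
    }

    /** Hypothetical helper: counts how many consecutive errors would trigger a retry. */
    static int retriesGranted(Predicate<Throwable> gate, Throwable... errors) {
        int granted = 0;
        for (Throwable e : errors) {
            if (!gate.test(e)) {
                break; // the first rejected error would be pushed downstream
            }
            granted++;
        }
        return granted;
    }

    public static void main(String[] args) {
        Predicate<Throwable> onlyIllegalState = e -> e instanceof IllegalStateException;

        // Budget of 2: the third matching error is no longer retried.
        System.out.println(retriesGranted(counting(onlyIllegalState, 2),
                new IllegalStateException(), new IllegalStateException(),
                new IllegalStateException())); // prints 2

        // Budget of 5, but the second error does not match the filter.
        System.out.println(retriesGranted(counting(onlyIllegalState, 5),
                new IllegalStateException(), new IllegalArgumentException(),
                new IllegalStateException())); // prints 1
    }
}
```

A fresh counter per subscription matters here, which is presumably why the real method wraps the call in defer.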
Code example source: codecentric/spring-boot-admin
public static ExchangeFilterFunction retry(int defaultRetries, Map<String, Integer> retriesPerEndpoint) {
    return (request, next) -> {
        // Requests that modify state (DELETE, PATCH, POST, PUT) are never retried.
        int retries = 0;
        if (!request.method().equals(HttpMethod.DELETE) &&
            !request.method().equals(HttpMethod.PATCH) &&
            !request.method().equals(HttpMethod.POST) &&
            !request.method().equals(HttpMethod.PUT)) {
            // Use the endpoint-specific retry count if configured, else the default.
            retries = request.attribute(ATTRIBUTE_ENDPOINT).map(retriesPerEndpoint::get).orElse(defaultRetries);
        }
        return next.exchange(request).retry(retries);
    };
}
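The decision made inside the filter above can be isolated from WebClient. The sketch below is a plain-Java illustration of that lookup. RetryBudget, retriesFor, and passing the HTTP method and endpoint as strings are assumptions made for the example and are not part of spring-boot-admin.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

/** Hypothetical helper that mirrors the retry-count selection of the filter. */
final class RetryBudget {

    // Methods the filter refuses to retry.
    private static final Set<String> NOT_RETRIED =
            new HashSet<>(Arrays.asList("DELETE", "PATCH", "POST", "PUT"));

    /**
     * @param method             HTTP method name, e.g. "GET"
     * @param endpoint           endpoint id, or null when the request carries none
     * @param defaultRetries     fallback when no endpoint-specific value exists
     * @param retriesPerEndpoint endpoint id to retry count
     */
    static int retriesFor(String method, String endpoint,
                          int defaultRetries, Map<String, Integer> retriesPerEndpoint) {
        if (NOT_RETRIED.contains(method)) {
            return 0;
        }
        // Optional.map yields an empty Optional when the map has no entry,
        // so unknown endpoints fall back to the default.
        return Optional.ofNullable(endpoint)
                       .map(retriesPerEndpoint::get)
                       .orElse(defaultRetries);
    }

    public static void main(String[] args) {
        Map<String, Integer> perEndpoint = Collections.singletonMap("health", 5);
        System.out.println(retriesFor("GET", "health", 1, perEndpoint));  // prints 5
        System.out.println(retriesFor("GET", "info", 1, perEndpoint));    // prints 1
        System.out.println(retriesFor("POST", "health", 1, perEndpoint)); // prints 0
    }
}
```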
Code example source: reactor/reactor-core
@Test(expected = IllegalArgumentException.class)
public void timesInvalid() {
    // A negative retry count is rejected when the operator is assembled.
    Mono.never()
        .retry(-1);
}
Code example source: reactor/reactor-core
@Test
public void lazilyEvaluatedSubscribe() {
    AtomicInteger count = new AtomicInteger();
    Mono<Object> error = Mono.error(() -> new IllegalStateException("boom" + count.incrementAndGet()));
    assertThat(count).as("no op before subscribe").hasValue(0);
    // One initial subscription plus three retries: the fourth error is propagated.
    StepVerifier.create(error.retry(3))
                .verifyErrorMessage("boom4");
}
Code example source: reactor/reactor-core
@Test
public void twoRetryErrorSupplier() {
    AtomicInteger i = new AtomicInteger();
    AtomicBoolean bool = new AtomicBoolean(true);
    StepVerifier.create(Mono.fromCallable(i::incrementAndGet)
                            .doOnNext(v -> {
                                if (v < 4) {
                                    if (v > 2) {
                                        bool.set(false);
                                    }
                                    throw new RuntimeException("test");
                                }
                            })
                            .retry(3, e -> bool.get()))
                .verifyErrorMessage("test");
}
Code example source: reactor/reactor-core
@Test
public void twoRetryNormalSupplier() {
    AtomicInteger i = new AtomicInteger();
    AtomicBoolean bool = new AtomicBoolean(true);
    StepVerifier.create(Mono.fromCallable(i::incrementAndGet)
                            .doOnNext(v -> {
                                if (v < 4) {
                                    throw new RuntimeException("test");
                                }
                                else {
                                    bool.set(false);
                                }
                            })
                            .retry(3, e -> bool.get()))
                .expectNext(4)
                .expectComplete()
                .verify();
}
Code example source: reactor/reactor-core
@Test
public void doOnNextFails() {
    Mono.just(1)
        .doOnNext(new Consumer<Integer>() {
            int i;

            @Override
            public void accept(Integer t) {
                // Fails on the first two subscriptions, succeeds on the third.
                if (i++ < 2) {
                    throw new RuntimeException("test");
                }
            }
        })
        .retry(2)
        .subscribeWith(AssertSubscriber.create())
        .assertValues(1);
}
Code example source: reactor/reactor-core
@Test
public void retryInfinite() {
    AssertSubscriber<Integer> ts = AssertSubscriber.create();
    AtomicInteger i = new AtomicInteger();
    Mono.fromCallable(() -> {
            int _i = i.getAndIncrement();
            if (_i < 10) {
                throw Exceptions.propagate(new RuntimeException("forced failure"));
            }
            return _i;
        })
        .retry()
        .subscribe(ts);
    ts.assertValues(10)
      .assertComplete()
      .assertNoError();
}
Code example source: reactor/reactor-core
@Test
public void zeroRetryNoError() {
    AssertSubscriber<Integer> ts = AssertSubscriber.create();
    Mono.just(1)
        .retry(0)
        .subscribe(ts);
    ts.assertValues(1)
      .assertComplete()
      .assertNoError();
}
Code example source: reactor/reactor-core
@Test
public void oneRetry() {
    AssertSubscriber<Integer> ts = AssertSubscriber.create();
    AtomicInteger i = new AtomicInteger();
    Mono.fromCallable(() -> {
            int _i = i.getAndIncrement();
            if (_i < 1) {
                throw Exceptions.propagate(new RuntimeException("forced failure"));
            }
            return _i;
        })
        .retry(1)
        .subscribe(ts);
    ts.assertValues(1)
      .assertComplete()
      .assertNoError();
}
Code example source: reactor/reactor-core
@Test
public void zeroRetry() {
    AssertSubscriber<Integer> ts = AssertSubscriber.create();
    Mono.<Integer>error(new RuntimeException("forced failure")).retry(0)
        .subscribe(ts);
    ts.assertNoValues()
      .assertNotComplete()
      .assertError(RuntimeException.class)
      .assertErrorMessage("forced failure");
}
Code example source: com.aol.cyclops/cyclops-reactor
/**
 * @return
 * @see reactor.core.publisher.Mono#retry()
 */
public final Mono<T> retry() {
    return boxed.retry();
}
Code example source: com.aol.cyclops/cyclops-reactor
/**
 * @param numRetries
 * @return
 * @see reactor.core.publisher.Mono#retry(long)
 */
public final Mono<T> retry(long numRetries) {
    return boxed.retry(numRetries);
}
Code example source: com.aol.cyclops/cyclops-reactor
/**
 * @param retryMatcher
 * @return
 * @see reactor.core.publisher.Mono#retry(java.util.function.Predicate)
 */
public final Mono<T> retry(Predicate<Throwable> retryMatcher) {
    return boxed.retry(retryMatcher);
}
Code example source: com.aol.cyclops/cyclops-reactor
/**
 * @param numRetries
 * @param retryMatcher
 * @return
 * @see reactor.core.publisher.Mono#retry(long, java.util.function.Predicate)
 */
public final Mono<T> retry(long numRetries, Predicate<Throwable> retryMatcher) {
    return boxed.retry(numRetries, retryMatcher);
}
Code example source: io.projectreactor.ipc/reactor-netty
@Override
@SuppressWarnings("unchecked")
public void subscribe(final CoreSubscriber<? super HttpClientResponse> subscriber) {
    ReconnectableBridge bridge = new ReconnectableBridge();
    bridge.activeURI = startURI;
    Mono.defer(() -> parent.client.newHandler(new HttpClientHandler(this, bridge),
                    parent.options.getRemoteAddress(bridge.activeURI),
                    HttpClientOptions.isSecure(bridge.activeURI),
                    bridge))
        // The bridge is passed to retry(...) as the predicate that decides whether to retry.
        .retry(bridge)
        .cast(HttpClientResponse.class)
        .subscribe(subscriber);
}
Code example source: reactor/reactor-kafka
@Test
public void manualCommitRetry() throws Exception {
    consumer.addCommitException(new RetriableCommitFailedException("coordinator failed"), 2);
    int count = 10;
    receiverOptions = receiverOptions
            .commitBatchSize(0)
            .commitInterval(Duration.ofMillis(Long.MAX_VALUE))
            .maxCommitAttempts(1)
            .subscription(Collections.singletonList(topic));
    sendMessages(topic, 0, count + 10);
    // Each manual commit is retried indefinitely until it succeeds.
    receiveAndVerify(10, record -> record.receiverOffset().commit().retry().then(Mono.just(record)));
    verifyCommits(groupId, topic, 10);
}