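This article collects code examples for the Java method reactor.core.publisher.Mono.retryBackoff() and shows how Mono.retryBackoff() is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven. They were extracted from a selection of projects and should serve as a useful reference. Details of the Mono.retryBackoff() method are as follows: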
Package path: reactor.core.publisher.Mono
Class name: Mono
Method name: retryBackoff
In case of error, retry this Mono up to numRetries times using a randomized exponential backoff strategy (jitter). The jitter factor is 50%, but the effective backoff delay cannot be less than firstBackoff.
The randomized exponential backoff is good at preventing two typical issues with other simpler backoff strategies, namely:
Code example source: reactor/reactor-core
return retryBackoff(numRetries, firstBackoff, maxBackoff, 0.5d);
Code example source: reactor/reactor-core
return retryBackoff(numRetries, firstBackoff, Duration.ofMillis(Long.MAX_VALUE), 0.5d);
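The two overloads above only fill in defaults: a jitter factor of 0.5 and, when no cap is passed, a maximum backoff of Long.MAX_VALUE milliseconds. To make the idea of a capped, jittered exponential delay more concrete, here is a minimal plain-Java sketch. The names `JitteredBackoffSketch` and `delayMillis` are hypothetical, and this is not Reactor's implementation. It only assumes the behaviour described above: the delay doubles, a random jitter is applied, the result never drops below firstBackoff, and (an assumption suggested by the parameter name) it never exceeds maxBackoff.

```java
import java.util.Random;

final class JitteredBackoffSketch {

    // Hypothetical helper (not a Reactor API): delay in ms before retry `attempt` (0-based).
    static long delayMillis(int attempt, long firstMs, long maxMs, double jitterFactor, Random random) {
        long lower = Math.min(firstMs, maxMs);
        // Double the base delay once per previous retry, saturating at maxMs without overflowing.
        long base = lower;
        for (int i = 0; i < attempt && base < maxMs; i++) {
            base = (base > maxMs / 2) ? maxMs : base * 2;
        }
        // Random offset in roughly [-jitterFactor * base, +jitterFactor * base].
        double spread = base * jitterFactor;
        long jitter = (long) ((random.nextDouble() * 2 - 1) * spread);
        // Apply the jitter, capping at maxMs before the addition could overflow.
        long delay = (jitter > 0 && base > maxMs - jitter) ? maxMs : base + jitter;
        // Never go below the first backoff.
        return Math.max(lower, delay);
    }

    public static void main(String[] args) {
        Random random = new Random();
        for (int attempt = 0; attempt < 6; attempt++) {
            System.out.println("retry " + (attempt + 1) + " after ~"
                    + delayMillis(attempt, 100, 2000, 0.5, random) + " ms");
        }
    }
}
```

With a jitter factor of 0 the sketch degenerates to a plain doubling schedule, which is the easiest configuration to reason about when testing.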
Code example source: reactor/reactor-core
@Test
public void monoRetryRandomBackoff_noRandom() {
    AtomicInteger errorCount = new AtomicInteger();
    Exception exception = new IOException("boom retry");
    List<Long> elapsedList = new ArrayList<>();
    StepVerifier.withVirtualTime(() ->
            Mono.error(exception)
                .doOnError(e -> {
                    errorCount.incrementAndGet();
                    elapsedList.add(Schedulers.parallel().now(TimeUnit.MILLISECONDS));
                })
                .retryBackoff(4, Duration.ofMillis(100), Duration.ofMillis(2000), 0d)
    )
                .thenAwait(Duration.ofMinutes(1)) //ensure whatever the jittered delay that we have time to fit 4 retries
                .expectErrorSatisfies(e -> assertThat(e).isInstanceOf(IllegalStateException.class)
                                                        .hasMessage("Retries exhausted: 4/4")
                                                        .hasCause(exception))
                .verify(Duration.ofSeconds(1)); //vts test shouldn't even take that long
    assertThat(errorCount).hasValue(5);
    assertThat(elapsedList.get(0)).isEqualTo(0L);
    assertThat(elapsedList.get(1) - elapsedList.get(0)).isEqualTo(100);
    assertThat(elapsedList.get(2) - elapsedList.get(1)).isEqualTo(200);
    assertThat(elapsedList.get(3) - elapsedList.get(2)).isEqualTo(400);
    assertThat(elapsedList.get(4) - elapsedList.get(3)).isEqualTo(800);
}
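The assertions above encode two facts: numRetries = 4 means one initial attempt plus four retries (hence five errors), and with a jitter factor of 0 the delay simply doubles from firstBackoff (100, 200, 400, 800 ms). The plain-Java sketch below imitates those semantics with a blocking loop that records the delays instead of sleeping. `RetryLoopSketch` and `runWithRetries` are hypothetical names used only for illustration. Reactor's operator is non-blocking and is implemented differently.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

final class RetryLoopSketch {

    // Hypothetical helper: runs `task`, retrying up to numRetries times with a doubling delay.
    // Delays are appended to delaysOut instead of being slept, so the sketch runs instantly.
    static <T> T runWithRetries(Callable<T> task, int numRetries, long firstMs, long maxMs,
                                List<Long> delaysOut) throws Exception {
        long delay = Math.min(firstMs, maxMs);
        for (int retry = 0; ; retry++) {
            try {
                return task.call();
            } catch (Exception e) {
                if (retry == numRetries) {
                    // Same message shape as the one asserted in the test above.
                    throw new IllegalStateException("Retries exhausted: " + retry + "/" + numRetries, e);
                }
                delaysOut.add(delay); // a real implementation would wait here
                delay = (delay > maxMs / 2) ? maxMs : delay * 2;
            }
        }
    }

    public static void main(String[] args) {
        int[] calls = {0};
        List<Long> delays = new ArrayList<>();
        try {
            runWithRetries(() -> {
                calls[0]++;
                throw new IOException("boom retry");
            }, 4, 100, 2000, delays);
        } catch (Exception e) {
            // prints: Retries exhausted: 4/4, calls=5, delays=[100, 200, 400, 800]
            System.out.println(e.getMessage() + ", calls=" + calls[0] + ", delays=" + delays);
        }
    }
}
```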
Code example source: reactor/reactor-core
@Test
public void monoRetryRandomBackoff_minBackoffFloor() {
    for (int i = 0; i < 50; i++) {
        AtomicInteger errorCount = new AtomicInteger();
        Exception exception = new IOException("boom retry loop #" + i);
        List<Long> elapsedList = new ArrayList<>();
        StepVerifier.withVirtualTime(() ->
                Mono.error(exception)
                    .doOnError(e -> {
                        errorCount.incrementAndGet();
                        elapsedList.add(Schedulers.parallel().now(TimeUnit.MILLISECONDS));
                    })
                    .retryBackoff(1, Duration.ofMillis(100), Duration.ofMillis(2000), 0.9)
        )
                    .thenAwait(Duration.ofMinutes(1)) //ensure whatever the jittered delay that we have time to fit the single retry
                    .expectErrorSatisfies(e -> assertThat(e).isInstanceOf(IllegalStateException.class)
                                                            .hasMessage("Retries exhausted: 1/1")
                                                            .hasCause(exception))
                    .verify(Duration.ofSeconds(1)); //vts test shouldn't even take that long
        assertThat(errorCount).hasValue(2);
        assertThat(elapsedList).hasSize(2);
        assertThat(elapsedList.get(0)).isEqualTo(0L);
        assertThat(elapsedList.get(1) - elapsedList.get(0))
                .isGreaterThanOrEqualTo(100) //min backoff
                .isCloseTo(100, Percentage.withPercentage(90));
    }
}
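The loop above checks that, even with a 90% jitter factor, the first delay never falls below the 100 ms firstBackoff. One way to picture this is to compute the interval a jittered delay could be drawn from and then clip it to [firstBackoff, maxBackoff]. The sketch below does that in plain Java. `JitterBoundsSketch` and `bounds` are hypothetical names, and the clipping rule is an assumption that is consistent with the test, not a copy of Reactor's code.

```java
final class JitterBoundsSketch {

    // Hypothetical helper: returns {min, max} in ms for a jittered delay around baseMs,
    // clipped so it stays within [firstMs, maxMs].
    static long[] bounds(long baseMs, long firstMs, long maxMs, double jitterFactor) {
        long offset = Math.round(baseMs * jitterFactor);
        long low = Math.max(firstMs, baseMs - offset);
        long high = Math.min(maxMs, baseMs + offset);
        return new long[] {low, high};
    }

    public static void main(String[] args) {
        // First retry with firstBackoff = 100 ms, maxBackoff = 2000 ms, jitter = 0.9.
        long[] first = bounds(100, 100, 2000, 0.9);
        System.out.println(first[0] + " .. " + first[1]);   // prints: 100 .. 190

        // Second retry (base 200 ms) with a tight maxBackoff of 220 ms.
        long[] capped = bounds(200, 100, 220, 0.9);
        System.out.println(capped[0] + " .. " + capped[1]); // prints: 100 .. 220
    }
}
```

The first interval matches what the test asserts: at least 100 ms and within 90% of 100 ms.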
Code example source: reactor/reactor-core
        elapsedList.add(Schedulers.parallel().now(TimeUnit.MILLISECONDS));
    })
    .retryBackoff(4, Duration.ofMillis(100), Duration.ofMillis(220), 0.9)
Code example source: reactor/reactor-core
        elapsedList.add(Schedulers.parallel().now(TimeUnit.MILLISECONDS));
    })
    .retryBackoff(4, Duration.ofMillis(100))
Code example source: reactor/reactor-core
        elapsedList.add(Schedulers.parallel().now(TimeUnit.MILLISECONDS));
    })
    .retryBackoff(4, Duration.ofMillis(100), Duration.ofMillis(2000), 0.1)
Code example source: reactor/reactor-core
        elapsedList.add(Schedulers.parallel().now(TimeUnit.MILLISECONDS));
    })
    .retryBackoff(4, Duration.ofMillis(100), Duration.ofMillis(2000))
Code example source: rsocket/rsocket-java
.retryBackoff(weightedSocketRetries, weightedSocketBackOff, weightedSocketMaxBackOff)
.doOnError(
    throwable -> {
Code example source: apache/james-project
    @Override
    public Mono<Void> doRetry(Mono<Void> executionResult, Event event) {
        return executionResult
            .retryBackoff(retryBackoff.getMaxRetries(), retryBackoff.getFirstBackoff(), MAX_BACKOFF, retryBackoff.getJitterFactor())
            .doOnError(throwable -> LOGGER.error("listener {} exceeded maximum retry({}) to handle event {}",
                mailboxListener.getClass().getCanonicalName(),
                retryBackoff.getMaxRetries(),
                event.getClass().getCanonicalName(),
                throwable))
            .then();
    }
}
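The doRetry example above reads its retry parameters from a `retryBackoff` object through getMaxRetries(), getFirstBackoff(), and getJitterFactor(). The snippet does not show that object's class, so the following is only a guess at what such a settings holder might look like. `RetrySettingsSketch` is a hypothetical name, and the validation rules (non-negative retries, non-negative first backoff, jitter factor between 0 and 1) are assumptions, not the project's actual code.

```java
import java.time.Duration;

// Hypothetical immutable holder for retry settings; not the class used by apache/james-project.
final class RetrySettingsSketch {
    private final int maxRetries;
    private final Duration firstBackoff;
    private final double jitterFactor;

    RetrySettingsSketch(int maxRetries, Duration firstBackoff, double jitterFactor) {
        // Assumed validation rules, chosen so that invalid settings fail early.
        if (maxRetries < 0) {
            throw new IllegalArgumentException("maxRetries must be >= 0");
        }
        if (firstBackoff == null || firstBackoff.isNegative()) {
            throw new IllegalArgumentException("firstBackoff must be a non-negative duration");
        }
        if (jitterFactor < 0.0 || jitterFactor > 1.0) {
            throw new IllegalArgumentException("jitterFactor must be between 0 and 1");
        }
        this.maxRetries = maxRetries;
        this.firstBackoff = firstBackoff;
        this.jitterFactor = jitterFactor;
    }

    int getMaxRetries() { return maxRetries; }

    Duration getFirstBackoff() { return firstBackoff; }

    double getJitterFactor() { return jitterFactor; }

    public static void main(String[] args) {
        RetrySettingsSketch settings = new RetrySettingsSketch(3, Duration.ofMillis(100), 0.5);
        System.out.println(settings.getMaxRetries() + " retries, first backoff "
                + settings.getFirstBackoff().toMillis() + " ms, jitter " + settings.getJitterFactor());
    }
}
```

Keeping the three values in one validated object means the call site can pass them straight to retryBackoff without re-checking them.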
Code example source: reactor/reactor-kafka
private void sendMessagesSync(int startIndex, int count) throws Exception {
    CountDownLatch latch = new CountDownLatch(count);
    Flux.range(startIndex, count)
        .map(i -> createProducerRecord(i, true))
        .concatMap(record -> kafkaSender.createOutbound().send(Mono.just(record)).then()
                                        .doOnSuccess(metadata -> latch.countDown())
                                        .retryBackoff(100, Duration.ofMillis(100)))
        .subscribe();
    assertTrue("Messages not sent ", latch.await(receiveTimeoutMillis, TimeUnit.MILLISECONDS));
}