reactor.core.publisher.Mono.retryBackoff()方法的使用及代码示例

x33g5p2x  于2022-01-24 转载在 其他  
字(5.8k)|赞(0)|评价(0)|浏览(265)

本文整理了Java中reactor.core.publisher.Mono.retryBackoff()方法的一些代码示例,展示了Mono.retryBackoff()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Mono.retryBackoff()方法的具体详情如下:
包路径:reactor.core.publisher.Mono
类名称:Mono
方法名:retryBackoff

Mono.retryBackoff介绍

[英]In case of error, retry this Mono up to numRetries times using a randomized exponential backoff strategy (jitter). The jitter factor is 50%but the effective backoff delay cannot be less than firstBackoff.

The randomized exponential backoff is good at preventing two typical issues with other simpler backoff strategies, namely:

  • having an exponentially growing backoff delay with a small initial delay gives the best tradeoff between not overwhelming the server and serving the client as fast as possible
  • having a jitter, or randomized backoff delay, is beneficial in avoiding "retry-storms" where eg. numerous clients would hit the server at the same time, causing it to display transient failures which would cause all clients to retry at the same backoff times, ultimately sparing no load on the server.
    [中]如果出现错误,请使用随机指数退避策略(抖动)重试此Mono最多numRetries次。抖动系数为50%,但有效退避延迟不能小于firstBackoff。
    随机指数退避有助于防止其他更简单退避策略的两个典型问题,即:
    *在初始延迟很小的情况下,采用指数增长的退避延迟,可以在不让服务器负担过重和尽可能快地为客户机提供服务之间实现最佳折衷
    *具有抖动或随机退避延迟有助于避免“重试风暴”,例如,许多客户端会同时攻击服务器,导致服务器显示瞬时故障,这将导致所有客户端在相同的退避时间重试,最终不会在服务器上节省任何负载。

代码示例

代码示例来源:origin: reactor/reactor-core

return retryBackoff(numRetries, firstBackoff, maxBackoff, 0.5d);

代码示例来源:origin: reactor/reactor-core

return retryBackoff(numRetries, firstBackoff, Duration.ofMillis(Long.MAX_VALUE), 0.5d);

代码示例来源:origin: reactor/reactor-core

@Test
public void monoRetryRandomBackoff_noRandom() {
  AtomicInteger errorCount = new AtomicInteger();
  Exception exception = new IOException("boom retry");
  List<Long> elapsedList = new ArrayList<>();
  StepVerifier.withVirtualTime(() ->
      Mono.error(exception)
        .doOnError(e -> {
          errorCount.incrementAndGet();
          elapsedList.add(Schedulers.parallel().now(TimeUnit.MILLISECONDS));
        })
        .retryBackoff(4, Duration.ofMillis(100), Duration.ofMillis(2000), 0d)
  )
        .thenAwait(Duration.ofMinutes(1)) //ensure whatever the jittered delay that we have time to fit 4 retries
        .expectErrorSatisfies(e -> assertThat(e).isInstanceOf(IllegalStateException.class)
                            .hasMessage("Retries exhausted: 4/4")
                            .hasCause(exception))
        .verify(Duration.ofSeconds(1)); //vts test shouldn't even take that long
  assertThat(errorCount).hasValue(5);
  assertThat(elapsedList.get(0)).isEqualTo(0L);
  assertThat(elapsedList.get(1) - elapsedList.get(0)).isEqualTo(100);
  assertThat(elapsedList.get(2) - elapsedList.get(1)).isEqualTo(200);
  assertThat(elapsedList.get(3) - elapsedList.get(2)).isEqualTo(400);
  assertThat(elapsedList.get(4) - elapsedList.get(3)).isEqualTo(800);
}

代码示例来源:origin: reactor/reactor-core

@Test
public void monoRetryRandomBackoff_minBackoffFloor() {
  for (int i = 0; i < 50; i++) {
    AtomicInteger errorCount = new AtomicInteger();
    Exception exception = new IOException("boom retry loop #" + i);
    List<Long> elapsedList = new ArrayList<>();
    StepVerifier.withVirtualTime(() ->
        Mono.error(exception)
          .doOnError(e -> {
            errorCount.incrementAndGet();
            elapsedList.add(Schedulers.parallel().now(TimeUnit.MILLISECONDS));
          })
          .retryBackoff(1, Duration.ofMillis(100), Duration.ofMillis(2000), 0.9)
    )
          .thenAwait(Duration.ofMinutes(1)) //ensure whatever the jittered delay that we have time to fit 4 retries
          .expectErrorSatisfies(e -> assertThat(e).isInstanceOf(IllegalStateException.class)
                              .hasMessage("Retries exhausted: 1/1")
                              .hasCause(exception))
          .verify(Duration.ofSeconds(1)); //vts test shouldn't even take that long
    assertThat(errorCount).hasValue(2);
    assertThat(elapsedList).hasSize(2);
    assertThat(elapsedList.get(0)).isEqualTo(0L);
    assertThat(elapsedList.get(1) - elapsedList.get(0))
        .isGreaterThanOrEqualTo(100) //min backoff
        .isCloseTo(100, Percentage.withPercentage(90));
  }
}

代码示例来源:origin: reactor/reactor-core

elapsedList.add(Schedulers.parallel().now(TimeUnit.MILLISECONDS));
})
.retryBackoff(4, Duration.ofMillis(100), Duration.ofMillis(220), 0.9)

代码示例来源:origin: reactor/reactor-core

elapsedList.add(Schedulers.parallel().now(TimeUnit.MILLISECONDS));
})
.retryBackoff(4, Duration.ofMillis(100))

代码示例来源:origin: reactor/reactor-core

elapsedList.add(Schedulers.parallel().now(TimeUnit.MILLISECONDS));
})
.retryBackoff(4, Duration.ofMillis(100), Duration.ofMillis(2000), 0.1)

代码示例来源:origin: reactor/reactor-core

elapsedList.add(Schedulers.parallel().now(TimeUnit.MILLISECONDS));
})
.retryBackoff(4, Duration.ofMillis(100), Duration.ofMillis(2000))

代码示例来源:origin: rsocket/rsocket-java

.retryBackoff(weightedSocketRetries, weightedSocketBackOff, weightedSocketMaxBackOff)
.doOnError(
  throwable -> {

代码示例来源:origin: io.projectreactor/reactor-core

return retryBackoff(numRetries, firstBackoff, maxBackoff, 0.5d);

代码示例来源:origin: io.projectreactor/reactor-core

return retryBackoff(numRetries, firstBackoff, Duration.ofMillis(Long.MAX_VALUE), 0.5d);

代码示例来源:origin: apache/james-project

@Override
  public Mono<Void> doRetry(Mono<Void> executionResult, Event event) {
    return executionResult
      .retryBackoff(retryBackoff.getMaxRetries(), retryBackoff.getFirstBackoff(), MAX_BACKOFF, retryBackoff.getJitterFactor())
      .doOnError(throwable -> LOGGER.error("listener {} exceeded maximum retry({}) to handle event {}",
        mailboxListener.getClass().getCanonicalName(),
        retryBackoff.getMaxRetries(),
        event.getClass().getCanonicalName(),
        throwable))
      .then();
  }
}

代码示例来源:origin: reactor/reactor-kafka

private void sendMessagesSync(int startIndex, int count) throws Exception {
  CountDownLatch latch = new CountDownLatch(count);
  Flux.range(startIndex, count)
    .map(i -> createProducerRecord(i, true))
    .concatMap(record -> kafkaSender.createOutbound().send(Mono.just(record)).then()
                    .doOnSuccess(metadata -> latch.countDown())
                    .retryBackoff(100, Duration.ofMillis(100)))
    .subscribe();
  assertTrue("Messages not sent ", latch.await(receiveTimeoutMillis, TimeUnit.MILLISECONDS));
}

代码示例来源:origin: io.rsocket/rsocket-load-balancer

.retryBackoff(weightedSocketRetries, weightedSocketBackOff, weightedSocketMaxBackOff)
.doOnError(
  throwable -> {

相关文章

Mono类方法