有条件指数回退的Spring无功重试

mpgws1up  于 2022-11-21  发布在  Spring
关注(0)|答案(5)|浏览(126)

使用spring reactive WebClient,我使用了一个API,如果响应状态为500,我需要使用指数回退重试。但在Mono类中,我没有看到任何使用Predicate作为输入参数的retryBackoff。
这就是我要找的功能:

public final Mono<T> retryBackoff(Predicate<? super Throwable> retryMatcher, long numRetries, Duration firstBackoff)

现在,我的实现如下(我没有带回退机制的重试):

client.sendRequest()
    .retry(e -> ((RestClientException) e).getStatus() == 500)
    .subscribe();
46scxncf

46scxncf1#

您可能需要查看reactor-addons项目中的reactor-extra模块。在Maven中,您可以执行以下操作:

<dependency>
    <groupId>io.projectreactor.addons</groupId>
    <artifactId>reactor-extra</artifactId>
    <version>3.2.3.RELEASE</version>
</dependency>

然后像这样使用它:

client.post()
    .syncBody("test")
    .retrieve()
    .bodyToMono(String.class)
    .retryWhen(Retry.onlyIf(ctx -> ctx.exception() instanceof RestClientException)
                    .exponentialBackoff(firstBackoff, maxBackoff)
                    .retryMax(maxRetries))
vh0rcniy

vh0rcniy2#

Retry.onlyIf现在已过时/删除。
如果有人对最新的解决方案感兴趣:

client.post()
      .syncBody("test")
      .retrieve()
      .bodyToMono(String.class)
      .retryWhen(Retry.backoff(maxRetries, minBackoff).filter(ctx -> {
          return ctx.exception() instanceof RestClientException && ctx.exception().statusCode == 500; 
      }))

值得一提的是,retryWhen将源异常 Package 到RetryExhaustedException中。如果您想“恢复”源异常,可以使用reactor.core.Exceptions实用程序:

.onErrorResume(throwable -> {
    if (Exceptions.isRetryExhausted(throwable)) {
        throwable = throwable.getCause();
    }
    return Mono.error(throwable);
})
mzsu5hc0

mzsu5hc03#

我不确定,您使用的是哪种 Spring 版本,在2.1.4中我有这样的:

client.post()
    .syncBody("test")
    .retrieve()
    .bodyToMono(String.class)
    .retryBackoff(numretries, firstBackoff, maxBackoff, jitterFactor);

......所以这就是你想要的,对吗?

flmtquvp

flmtquvp4#

我目前正在尝试使用Kotlin协同程序+ Spring WebFlux:
以下内容似乎不起作用:

suspend fun ClientResponse.asResponse(): ServerResponse =
    status(statusCode())
        .headers { headerConsumer -> headerConsumer.addAll(headers().asHttpHeaders()) }
        .body(bodyToMono(DataBuffer::class.java), DataBuffer::class.java)
        .retryWhen { 
            Retry.onlyIf { ctx: RetryContext<Throwable> -> (ctx.exception() as? WebClientResponseException)?.statusCode in retryableErrorCodes }
                .exponentialBackoff(ofSeconds(1), ofSeconds(5))
                .retryMax(3)
                .doOnRetry { log.error("Retry for {}", it.exception()) }
        )
        .awaitSingle()
u7up0aaq

u7up0aaq5#

AtomicInteger errorCount = new AtomicInteger();
Flux<String> flux =
Flux.<String>error(new IllegalStateException("boom"))
        .doOnError(e -> { 
            errorCount.incrementAndGet();
            System.out.println(e + " at " + LocalTime.now());
        })
        .retryWhen(Retry
                .backoff(3, Duration.ofMillis(100)).jitter(0d) 
                .doAfterRetry(rs -> System.out.println("retried at " + LocalTime.now() + ", attempt " + rs.totalRetries())) 
                .onRetryExhaustedThrow((spec, rs) -> rs.failure()) 
        );

我们将记录源发出错误的时间并对它们进行计数。
我们将指数回退重试配置为最多尝试3次且无抖动。
我们还记录重试发生的时间和重试次数(从0开始)。
默认情况下,会抛出Exceptions.retryExhausted异常,最后一次失败()作为原因。这里我们将其定制为直接将原因作为onError发出。

相关问题