[英]Retries this Mono when a companion sequence signals an item in response to this Mono's error signal.

If the companion sequence signals when the Mono is active, the retry attempt is suppressed and any terminal signal will terminate the Mono source with the same signal immediately.

Note that if the companion Publisher created by the whenFactory emits Context as trigger objects, the content of these Context objects will be added to the operator's own Context.


/**
 * Looks up the instance for {@code id} via {@code find} and, when a value is emitted,
 * hands the id and that value to {@code remappingFunction}.
 * The remapping function is only invoked through {@code flatMap}, i.e. for an emitted value.
 *
 * NOTE(review): this excerpt stops before the chain's terminating {@code ;} and the
 * method's closing brace; any operators that follow are not visible here.
 *
 * @param id identifier passed to {@code find} and forwarded to the remapping function
 * @param remappingFunction produces the replacement {@code Mono<Instance>} from the id and the found instance
 * @return the chain that starts at {@code this.find(id)}
 */
public Mono<Instance> computeIfPresent(InstanceId id,
                    BiFunction<InstanceId, Instance, Mono<Instance>> remappingFunction) {
  return this.find(id)
        .flatMap(application -> remappingFunction.apply(id, application))

/**
 * Looks up the instance for {@code id} via {@code find} and applies {@code remappingFunction}
 * either to the found instance or, when the lookup is empty, to {@code null}.
 *
 * NOTE(review): this excerpt stops before the chain's terminating {@code ;} and the
 * method's closing brace; any operators that follow are not visible here.
 *
 * @param id identifier passed to {@code find} and forwarded to the remapping function
 * @param remappingFunction receives the id and the found instance, or the id and {@code null} in the empty case
 * @return the chain that starts at {@code this.find(id)}
 */
public Mono<Instance> compute(InstanceId id, BiFunction<InstanceId, Instance, Mono<Instance>> remappingFunction) {
  return this.find(id)
        .flatMap(application -> remappingFunction.apply(id, application))
        // Deferred so the remapping function is only called for the empty case once that fallback is subscribed.
        .switchIfEmpty(Mono.defer(() -> remappingFunction.apply(id, null)))

// Delegates to retryWhen(...) using the companion function built by
// FluxRetryWhen.randomExponentialBackoffFunction from the retry count, the first and
// maximum backoff, and the jitter factor.
// NOTE(review): the enclosing method signature is not part of this excerpt.
return retryWhen(FluxRetryWhen.randomExponentialBackoffFunction(numRetries, firstBackoff, maxBackoff, jitterFactor));

/**
 * Validates the arguments and returns a {@code ListenableFuture} backed by a
 * {@code MonoProcessor} created for this call. When {@code this.stopping} is set,
 * the call returns early through {@code handleShuttingDownConnectFailure}.
 *
 * NOTE(review): lines are missing from this excerpt (closing braces and the start of the
 * reactive chain that {@code .handle(...)} belongs to), so how {@code strategy} and
 * {@code connectMono} are wired into that chain cannot be seen here.
 *
 * @param handler required; wrapped in a {@code ReactorNettyHandler} and notified of connect failures via {@code afterConnectFailure}
 * @param strategy required; only null-checked in the visible lines
 * @return a {@code MonoToListenableFutureAdapter} over {@code connectMono}, or the result of {@code handleShuttingDownConnectFailure} when stopping
 */
public ListenableFuture<Void> connect(TcpConnectionHandler<P> handler, ReconnectStrategy strategy) {
  Assert.notNull(handler, "TcpConnectionHandler is required");
  Assert.notNull(strategy, "ReconnectStrategy is required");
  if (this.stopping) {
    return handleShuttingDownConnectFailure(handler);
  // Report first connect to the ListenableFuture
  MonoProcessor<Void> connectMono = MonoProcessor.create();
      .handle(new ReactorNettyHandler(handler))
      .doOnError(handler::afterConnectFailure)    // report all connect failures to the handler
      .flatMap(Connection::onDispose)             // post-connect issues
  return new MonoToListenableFutureAdapter<>(connectMono);

/**
 * Builds a {@code Mono<String>} whose source counts its subscriptions and signals a
 * {@code RuntimeException("test " + i)} unless the counter has just reached 4.
 * Each error is paired with the next value of {@code Flux.range(1, 3)} and the retry is
 * triggered after a delay of that many seconds.
 *
 * NOTE(review): the delays therefore grow linearly (1s, 2s, 3s) despite the method name.
 * NOTE(review): the body of the {@code == 4} branch and several closing braces are
 * missing from this excerpt.
 *
 * @return the Mono with the delayed-retry companion attached
 */
Mono<String> exponentialRetryScenario() {
  AtomicInteger i = new AtomicInteger();
  return Mono.<String>create(s -> {
    if (i.incrementAndGet() == 4) {
    else {
      s.error(new RuntimeException("test " + i));
  // zipWith keeps only the range value (t2), which becomes the delay in seconds.
  }).retryWhen(repeat -> repeat.zipWith(Flux.range(1, 3), (t1, t2) -> t2)
                 .flatMap(time -> Mono.delay(Duration.ofSeconds(time))));

/**
 * Creates a {@code GatewayFilter} that runs the remainder of the filter chain and, when
 * supplied, attaches the retry companion and then the repeat companion to it.
 * After each run of the chain (success or error) the exchange attribute
 * {@code RETRY_ITERATION_KEY} is incremented, starting from -1 when absent.
 *
 * NOTE(review): several closing braces are missing from this excerpt.
 *
 * @param repeat repeat companion; skipped when {@code null}
 * @param retry retry companion; skipped when {@code null}
 * @return a filter whose result is {@code Mono.fromDirect} of the decorated publisher
 */
public GatewayFilter apply(Repeat<ServerWebExchange> repeat, Retry<ServerWebExchange> retry) {
  return (exchange, chain) -> {
    trace("Entering retry-filter");
    // chain.filter returns a Mono<Void>
    Publisher<Void> publisher = chain.filter(exchange)
        //.log("retry-filter", Level.INFO)
        .doOnSuccessOrError((aVoid, throwable) -> {
          // Track how many times the chain has run for this exchange (first run stores 0).
          int iteration = exchange.getAttributeOrDefault(RETRY_ITERATION_KEY, -1);
          int newIteration = iteration + 1;
          trace("setting new iteration in attr %d", newIteration);
          exchange.getAttributes().put(RETRY_ITERATION_KEY, newIteration);
    if (retry != null) {
      // retryWhen returns a Mono<Void>
      // retry needs to go before repeat
      publisher = ((Mono<Void>)publisher).retryWhen(retry.withApplicationContext(exchange));
    if (repeat != null) {
      // repeatWhen returns a Flux<Void>
      // so this needs to be last and the variable a Publisher<Void>
      publisher = ((Mono<Void>)publisher).repeatWhen(repeat.withApplicationContext(exchange));
    return Mono.fromDirect(publisher);

/**
 * Validates the arguments and returns a {@code ListenableFuture} backed by a
 * {@code MonoProcessor} created for this call. When {@code this.stopping} is set,
 * the call returns early through {@code handleShuttingDownConnectFailure}.
 *
 * NOTE(review): lines are missing from this excerpt (closing braces and the start of the
 * reactive chain that {@code .handle(...)} belongs to), so how {@code strategy} and
 * {@code connectMono} are wired into that chain cannot be seen here.
 *
 * @param handler required; wrapped in a {@code ReactorNettyHandler} and notified of connect failures via {@code afterConnectFailure}
 * @param strategy required; only null-checked in the visible lines
 * @return a {@code MonoToListenableFutureAdapter} over {@code connectMono}, or the result of {@code handleShuttingDownConnectFailure} when stopping
 */
public ListenableFuture<Void> connect(TcpConnectionHandler<P> handler, ReconnectStrategy strategy) {
  Assert.notNull(handler, "TcpConnectionHandler is required");
  Assert.notNull(strategy, "ReconnectStrategy is required");
  if (this.stopping) {
    return handleShuttingDownConnectFailure(handler);
  // Report first connect to the ListenableFuture
  MonoProcessor<Void> connectMono = MonoProcessor.create();
      .handle(new ReactorNettyHandler(handler))
      .doOnError(handler::afterConnectFailure)    // report all connect failures to the handler
      .flatMap(Connection::onDispose)             // post-connect issues
  return new MonoToListenableFutureAdapter<>(connectMono);


 * @param whenFactory
 * @return
 * @see reactor.core.publisher.Mono#retryWhen(java.util.function.Function)
// Thin wrapper: forwards whenFactory unchanged to boxed.retryWhen and returns its result.
// NOTE(review): the declaration of "boxed" and this method's closing brace are outside this excerpt.
public final Mono<T> retryWhen(Function<Flux<Throwable>, ? extends Publisher<?>> whenFactory) {
  return boxed.retryWhen(whenFactory);

 * Retries a streaming gRPC call, using the same semantics as {@link Flux#retryWhen(Function)}.
 * For easier use, use the Retry builder from
 * <a href="">Reactor Extras</a>.
 * @param operation the gRPC operation to retry, typically from a generated reactive-grpc stub class
 * @param whenFactory receives a Publisher of notifications with which a user can complete or error, aborting the retry
 * @param <I>
 * @param <O>
 * @see Flux#retryWhen(Function)
// Returns a function that, for a given request Flux, wraps operation.apply(request) in
// Mono.defer and attaches retryWhen(whenFactory) to it, so the operation is applied again
// on each resubscription triggered by the retry companion.
// NOTE(review): the method's closing brace is outside this excerpt.
public static <I, O> Function<? super Flux<I>, Mono<O>> retryWhen(final Function<Flux<I>, Mono<O>> operation, final Function<Flux<Throwable>, ? extends Publisher<?>> whenFactory) {
  return request -> Mono.defer(() -> operation.apply(request)).retryWhen(whenFactory);

// Delegates to retryWhen(...) using the companion function built by
// FluxRetryWhen.randomExponentialBackoffFunction from the retry count, the first and
// maximum backoff, and the jitter factor.
// NOTE(review): the enclosing method signature is not part of this excerpt.
return retryWhen(FluxRetryWhen.randomExponentialBackoffFunction(numRetries, firstBackoff, maxBackoff, jitterFactor));


 * To be used in order to retry the status operation for an application or task.
 * @param id The application id or the task id
 * @param <T> The type of status object being queried for, usually AppStatus or TaskStatus
 * @return The function that executes the retry logic around for determining App or Task Status
// NOTE(review): lines are missing from this excerpt (closing braces, the call that should
// receive the log message on the "if" line and in doOnComplete, and the operator that
// consumes the three Durations), so it does not compile as shown. The comments below
// describe only what is visible.
<T> Function<Mono<T>, Mono<T>> statusRetry(String id) {
  long statusTimeout = this.deploymentProperties.getStatusTimeout();
  // Per-request timeout is 40% of the status timeout, raised to 500ms below if smaller.
  // NOTE(review): the earlier comment claimed 500ms for a 2000ms status timeout, but
  // 0.40 * 2000ms = 800ms — confirm which value is intended.
  long requestTimeout = Math.round(statusTimeout * 0.40);
  long initialRetryDelay = Math.round(statusTimeout * 0.10); // wait 200ms with status timeout of 2000ms
  if (requestTimeout < 500L) {"Computed statusRetry Request timeout = {} ms is below 500ms minimum value.  Setting to 500ms", requestTimeout);
    requestTimeout = 500L;
  // Copy into a final local so the lambdas below can capture it.
  final long requestTimeoutToUse = requestTimeout;
  return m -> m.timeout(Duration.ofMillis(requestTimeoutToUse))
    .doOnError(e -> logger.warn(String.format("Error getting status for %s within %sms, Retrying operation.", id, requestTimeoutToUse)))
      Duration.ofMillis(initialRetryDelay), //initial retry delay
      Duration.ofMillis(statusTimeout / 2), // max retry delay
      Duration.ofMillis(statusTimeout)) // max total retry time
      .andThen(retries -> Flux.from(retries).doOnComplete(() ->"Successfully retried getStatus operation status [{}] for {}", id))))
    // Logged when the error still propagates past the retry stage above.
    .doOnError(e -> logger.error(String.format("Retry operation on getStatus failed for %s.  Max retry time %sms", id, statusTimeout)));

 // Executes the request through publisherClient, attaches retryFunction as the retryWhen
 // companion, and maps any remaining error with the function returned by outOfRetries().
 // NOTE(review): the method's closing brace is outside this excerpt.
 public Publisher<?> executeRequest(ReactiveHttpRequest request) {
  Mono<?> response = publisherClient.executeRequest(request);
  return response.retryWhen(retryFunction).onErrorMap(outOfRetries());

// Executes the request through reactiveClient and attaches retryFunction as the retryWhen
// companion, casting the publisher to Mono when returnPublisherType is Mono.class and to
// Flux otherwise.
// NOTE(review): both return statements are unterminated and the closing braces are missing
// in this excerpt, so any operators chained after retryWhen are not visible here.
public Publisher<Object> executeRequest(ReactiveHttpRequest request) {
  Publisher<Object> objectPublisher = reactiveClient.executeRequest(request);
  if (returnPublisherType == Mono.class) {
    return ((Mono<Object>) objectPublisher).retryWhen(retryFunction)
  else {
    return ((Flux<Object>) objectPublisher).retryWhen(retryFunction)

// Tail of a reactive chain whose head is not part of this excerpt.
// First passes the unwrapped error to onExhausted, then replaces a RetryExhaustedException
// with the result of errorFunction applied to its unwrapped cause.
.doOnError(e2 -> onExhausted.accept(unwrap(e2)))
.onErrorMap(RetryExhaustedException.class, e2 -> errorFunction.apply(unwrap(e2.getCause())));

/**
 * Validates the arguments and returns a {@code ListenableFuture} backed by a
 * {@code MonoProcessor} created for this call. When {@code this.stopping} is set,
 * the call returns early through {@code handleShuttingDownConnectFailure}.
 *
 * NOTE(review): lines are missing from this excerpt (closing braces and the start of the
 * reactive chain that {@code .handle(...)} belongs to), so how {@code strategy} and
 * {@code connectMono} are wired into that chain cannot be seen here.
 *
 * @param handler required; wrapped in a {@code ReactorNettyHandler} and notified of connect failures via {@code afterConnectFailure}
 * @param strategy required; only null-checked in the visible lines
 * @return a {@code MonoToListenableFutureAdapter} over {@code connectMono}, or the result of {@code handleShuttingDownConnectFailure} when stopping
 */
public ListenableFuture<Void> connect(TcpConnectionHandler<P> handler, ReconnectStrategy strategy) {
  Assert.notNull(handler, "TcpConnectionHandler is required");
  Assert.notNull(strategy, "ReconnectStrategy is required");
  if (this.stopping) {
    return handleShuttingDownConnectFailure(handler);
  // Report first connect to the ListenableFuture
  MonoProcessor<Void> connectMono = MonoProcessor.create();
      .handle(new ReactorNettyHandler(handler))
      .doOnError(handler::afterConnectFailure)    // report all connect failures to the handler
      .flatMap(Connection::onDispose)             // post-connect issues
  return new MonoToListenableFutureAdapter<>(connectMono);

/**
 * Validates the arguments and returns a {@code ListenableFuture} backed by a
 * {@code MonoProcessor} created for this call. When {@code this.stopping} is set,
 * the call returns early through {@code handleShuttingDownConnectFailure}.
 *
 * NOTE(review): lines are missing from this excerpt (closing braces and the start of the
 * reactive chain that {@code .handle(...)} belongs to), so how {@code strategy} and
 * {@code connectMono} are wired into that chain cannot be seen here.
 *
 * @param handler required; wrapped in a {@code ReactorNettyHandler} and notified of connect failures via {@code afterConnectFailure}
 * @param strategy required; only null-checked in the visible lines
 * @return a {@code MonoToListenableFutureAdapter} over {@code connectMono}, or the result of {@code handleShuttingDownConnectFailure} when stopping
 */
public ListenableFuture<Void> connect(TcpConnectionHandler<P> handler, ReconnectStrategy strategy) {
  Assert.notNull(handler, "TcpConnectionHandler is required");
  Assert.notNull(strategy, "ReconnectStrategy is required");
  if (this.stopping) {
    return handleShuttingDownConnectFailure(handler);
  // Report first connect to the ListenableFuture
  MonoProcessor<Void> connectMono = MonoProcessor.create();
      .handle(new ReactorNettyHandler(handler))
      .doOnError(handler::afterConnectFailure)    // report all connect failures to the handler
      .flatMap(Connection::onDispose)             // post-connect issues
  return new MonoToListenableFutureAdapter<>(connectMono);

.retryWhen(errors -> errors.zipWith(Flux.range(1, 4), (a, b) -> b)
             .flatMap(attempt -> {
               switch (attempt) {

