reactor.core.publisher.Mono.retryWhen()方法的使用及代码示例

x33g5p2x  于2022-01-24 转载在 其他  
字(10.6k)|赞(0)|评价(0)|浏览(692)

本文整理了Java中reactor.core.publisher.Mono.retryWhen()方法的一些代码示例,展示了Mono.retryWhen()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Mono.retryWhen()方法的具体详情如下:
包路径:reactor.core.publisher.Mono
类名称:Mono
方法名:retryWhen

Mono.retryWhen介绍

[英]Retries this Mono when a companion sequence signals an item in response to this Mono error signal

If the companion sequence signals when the Mono is active, the retry attempt is suppressed and any terminal signal will terminate the Mono source with the same signal immediately.

Note that if the companion Publisher created by the whenFactoryemits Context as trigger objects, the content of these Context will be added to the operator's own Context.
[中]当伴奏序列向一个项目发送信号以响应此单声道错误信号时,重试此单声道
如果伴生序列在单声道激活时发出信号,重试尝试将被抑制,任何终端信号将立即用相同的信号终止单声道源。
请注意,如果WhenFactoryEmit创建的伴生发布服务器将上下文作为触发器对象,则这些上下文的内容将添加到操作员自己的上下文中。

代码示例

代码示例来源:origin: codecentric/spring-boot-admin

@Override
public Mono<Instance> computeIfPresent(InstanceId id,
                    BiFunction<InstanceId, Instance, Mono<Instance>> remappingFunction) {
  return this.find(id)
        .flatMap(application -> remappingFunction.apply(id, application))
        .flatMap(this::save)
        .retryWhen(retryOnOptimisticLockException);
}

代码示例来源:origin: codecentric/spring-boot-admin

@Override
public Mono<Instance> compute(InstanceId id, BiFunction<InstanceId, Instance, Mono<Instance>> remappingFunction) {
  return this.find(id)
        .flatMap(application -> remappingFunction.apply(id, application))
        .switchIfEmpty(Mono.defer(() -> remappingFunction.apply(id, null)))
        .flatMap(this::save)
        .retryWhen(retryOnOptimisticLockException);
}

代码示例来源:origin: reactor/reactor-core

return retryWhen(FluxRetryWhen.randomExponentialBackoffFunction(numRetries, firstBackoff, maxBackoff, jitterFactor));

代码示例来源:origin: spring-projects/spring-framework

@Override
public ListenableFuture<Void> connect(TcpConnectionHandler<P> handler, ReconnectStrategy strategy) {
  Assert.notNull(handler, "TcpConnectionHandler is required");
  Assert.notNull(strategy, "ReconnectStrategy is required");
  if (this.stopping) {
    return handleShuttingDownConnectFailure(handler);
  }
  // Report first connect to the ListenableFuture
  MonoProcessor<Void> connectMono = MonoProcessor.create();
  this.tcpClient
      .handle(new ReactorNettyHandler(handler))
      .connect()
      .doOnNext(updateConnectMono(connectMono))
      .doOnError(updateConnectMono(connectMono))
      .doOnError(handler::afterConnectFailure)    // report all connect failures to the handler
      .flatMap(Connection::onDispose)             // post-connect issues
      .retryWhen(reconnectFunction(strategy))
      .repeatWhen(reconnectFunction(strategy))
      .subscribe();
  return new MonoToListenableFutureAdapter<>(connectMono);
}

代码示例来源:origin: reactor/reactor-core

Mono<String> exponentialRetryScenario() {
  AtomicInteger i = new AtomicInteger();
  return Mono.<String>create(s -> {
    if (i.incrementAndGet() == 4) {
      s.success("hey");
    }
    else {
      s.error(new RuntimeException("test " + i));
    }
  }).retryWhen(repeat -> repeat.zipWith(Flux.range(1, 3), (t1, t2) -> t2)
                 .flatMap(time -> Mono.delay(Duration.ofSeconds(time))));
}

代码示例来源:origin: spring-cloud/spring-cloud-gateway

public GatewayFilter apply(Repeat<ServerWebExchange> repeat, Retry<ServerWebExchange> retry) {
  return (exchange, chain) -> {
    trace("Entering retry-filter");
    // chain.filter returns a Mono<Void>
    Publisher<Void> publisher = chain.filter(exchange)
        //.log("retry-filter", Level.INFO)
        .doOnSuccessOrError((aVoid, throwable) -> {
          int iteration = exchange.getAttributeOrDefault(RETRY_ITERATION_KEY, -1);
          int newIteration = iteration + 1;
          trace("setting new iteration in attr %d", newIteration);
          exchange.getAttributes().put(RETRY_ITERATION_KEY, newIteration);
        });
    if (retry != null) {
      // retryWhen returns a Mono<Void>
      // retry needs to go before repeat
      publisher = ((Mono<Void>)publisher).retryWhen(retry.withApplicationContext(exchange));
    }
    if (repeat != null) {
      // repeatWhen returns a Flux<Void>
      // so this needs to be last and the variable a Publisher<Void>
      publisher = ((Mono<Void>)publisher).repeatWhen(repeat.withApplicationContext(exchange));
    }
    return Mono.fromDirect(publisher);
  };
}

代码示例来源:origin: org.springframework/spring-messaging

@Override
public ListenableFuture<Void> connect(TcpConnectionHandler<P> handler, ReconnectStrategy strategy) {
  Assert.notNull(handler, "TcpConnectionHandler is required");
  Assert.notNull(strategy, "ReconnectStrategy is required");
  if (this.stopping) {
    return handleShuttingDownConnectFailure(handler);
  }
  // Report first connect to the ListenableFuture
  MonoProcessor<Void> connectMono = MonoProcessor.create();
  this.tcpClient
      .handle(new ReactorNettyHandler(handler))
      .connect()
      .doOnNext(updateConnectMono(connectMono))
      .doOnError(updateConnectMono(connectMono))
      .doOnError(handler::afterConnectFailure)    // report all connect failures to the handler
      .flatMap(Connection::onDispose)             // post-connect issues
      .retryWhen(reconnectFunction(strategy))
      .repeatWhen(reconnectFunction(strategy))
      .subscribe();
  return new MonoToListenableFutureAdapter<>(connectMono);
}

代码示例来源:origin: com.aol.cyclops/cyclops-reactor

/**
 * @param whenFactory
 * @return
 * @see reactor.core.publisher.Mono#retryWhen(java.util.function.Function)
 */
public final Mono<T> retryWhen(Function<Flux<Throwable>, ? extends Publisher<?>> whenFactory) {
  return boxed.retryWhen(whenFactory);
}
/**

代码示例来源:origin: com.salesforce.servicelibs/reactor-grpc-stub

/**
 * Retries a streaming gRPC call, using the same semantics as {@link Flux#retryWhen(Function)}.
 *
 * For easier use, use the Retry builder from
 * <a href="https://github.com/reactor/reactor-addons/blob/master/reactor-extra/src/main/java/reactor/retry/Retry.java">Reactor Extras</a>.
 *
 * @param operation the gRPC operation to retry, typically from a generated reactive-grpc stub class
 * @param whenFactory receives a Publisher of notifications with which a user can complete or error, aborting the retry
 * @param <I>
 * @param <O>
 *
 * @see Flux#retryWhen(Function)
 */
public static <I, O> Function<? super Flux<I>, Mono<O>> retryWhen(final Function<Flux<I>, Mono<O>> operation, final Function<Flux<Throwable>, ? extends Publisher<?>> whenFactory) {
  return request -> Mono.defer(() -> operation.apply(request)).retryWhen(whenFactory);
}

代码示例来源:origin: io.projectreactor/reactor-core

return retryWhen(FluxRetryWhen.randomExponentialBackoffFunction(numRetries, firstBackoff, maxBackoff, jitterFactor));

代码示例来源:origin: org.springframework.cloud/spring-cloud-deployer-cloudfoundry

/**
 * To be used in order to retry the status operation for an application or task.
 * @param id The application id or the task id
 * @param <T> The type of status object being queried for, usually AppStatus or TaskStatus
 * @return The function that executes the retry logic around for determining App or Task Status
 */
<T> Function<Mono<T>, Mono<T>> statusRetry(String id) {
  long statusTimeout = this.deploymentProperties.getStatusTimeout();
  long requestTimeout = Math.round(statusTimeout * 0.40); // wait 500ms with default status timeout of 2000ms
  long initialRetryDelay = Math.round(statusTimeout * 0.10); // wait 200ms with status timeout of 2000ms
  if (requestTimeout < 500L) {
    logger.info("Computed statusRetry Request timeout = {} ms is below 500ms minimum value.  Setting to 500ms", requestTimeout);
    requestTimeout = 500L;
  }
  final long requestTimeoutToUse = requestTimeout;
  return m -> m.timeout(Duration.ofMillis(requestTimeoutToUse))
    .doOnError(e -> logger.warn(String.format("Error getting status for %s within %sms, Retrying operation.", id, requestTimeoutToUse)))
    .retryWhen(DelayUtils.exponentialBackOffError(
      Duration.ofMillis(initialRetryDelay), //initial retry delay
      Duration.ofMillis(statusTimeout / 2), // max retry delay
      Duration.ofMillis(statusTimeout)) // max total retry time
      .andThen(retries -> Flux.from(retries).doOnComplete(() ->
        logger.info("Successfully retried getStatus operation status [{}] for {}", id))))
    .doOnError(e -> logger.error(String.format("Retry operation on getStatus failed for %s.  Max retry time %sms", id, statusTimeout)));
}

代码示例来源:origin: kptfh/feign-reactive

@Override
 public Publisher<?> executeRequest(ReactiveHttpRequest request) {
  Mono<?> response = publisherClient.executeRequest(request);
  return response.retryWhen(retryFunction).onErrorMap(outOfRetries());
 }
}

代码示例来源:origin: io.github.reactivefeign/feign-reactive-core

@Override
public Publisher<Object> executeRequest(ReactiveHttpRequest request) {
  Publisher<Object> objectPublisher = reactiveClient.executeRequest(request);
  if (returnPublisherType == Mono.class) {
    return ((Mono<Object>) objectPublisher).retryWhen(retryFunction)
        .onErrorMap(outOfRetries());
  }
  else {
    return ((Flux<Object>) objectPublisher).retryWhen(retryFunction)
        .onErrorMap(outOfRetries());
  }
}

代码示例来源:origin: org.mule.runtime/mule-core

.retryWhen(retry)
.doOnError(e2 -> onExhausted.accept(unwrap(e2)))
.onErrorMap(RetryExhaustedException.class, e2 -> errorFunction.apply(unwrap(e2.getCause())));

代码示例来源:origin: org.apache.servicemix.bundles/org.apache.servicemix.bundles.spring-messaging

@Override
public ListenableFuture<Void> connect(TcpConnectionHandler<P> handler, ReconnectStrategy strategy) {
  Assert.notNull(handler, "TcpConnectionHandler is required");
  Assert.notNull(strategy, "ReconnectStrategy is required");
  if (this.stopping) {
    return handleShuttingDownConnectFailure(handler);
  }
  // Report first connect to the ListenableFuture
  MonoProcessor<Void> connectMono = MonoProcessor.create();
  this.tcpClient
      .handle(new ReactorNettyHandler(handler))
      .connect()
      .doOnNext(updateConnectMono(connectMono))
      .doOnError(updateConnectMono(connectMono))
      .doOnError(handler::afterConnectFailure)    // report all connect failures to the handler
      .flatMap(Connection::onDispose)             // post-connect issues
      .retryWhen(reconnectFunction(strategy))
      .repeatWhen(reconnectFunction(strategy))
      .subscribe();
  return new MonoToListenableFutureAdapter<>(connectMono);
}

代码示例来源:origin: apache/servicemix-bundles

@Override
public ListenableFuture<Void> connect(TcpConnectionHandler<P> handler, ReconnectStrategy strategy) {
  Assert.notNull(handler, "TcpConnectionHandler is required");
  Assert.notNull(strategy, "ReconnectStrategy is required");
  if (this.stopping) {
    return handleShuttingDownConnectFailure(handler);
  }
  // Report first connect to the ListenableFuture
  MonoProcessor<Void> connectMono = MonoProcessor.create();
  this.tcpClient
      .handle(new ReactorNettyHandler(handler))
      .connect()
      .doOnNext(updateConnectMono(connectMono))
      .doOnError(updateConnectMono(connectMono))
      .doOnError(handler::afterConnectFailure)    // report all connect failures to the handler
      .flatMap(Connection::onDispose)             // post-connect issues
      .retryWhen(reconnectFunction(strategy))
      .repeatWhen(reconnectFunction(strategy))
      .subscribe();
  return new MonoToListenableFutureAdapter<>(connectMono);
}

代码示例来源:origin: reactor/reactor-netty

.wiretap(true)
.connect()
.retryWhen(errors -> errors.zipWith(Flux.range(1, 4), (a, b) -> b)
             .flatMap(attempt -> {
               switch (attempt) {

相关文章

Mono类方法