Usage and code examples of the rx.Observable.retryWhen() method

Reposted by x33g5p2x on 2022-01-25, in Other

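This article collects Java code examples of the rx.Observable.retryWhen() method and shows how Observable.retryWhen() is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they are useful as reference material. Details of the Observable.retryWhen() method:
Package path: rx.Observable
Class name: Observable
Method name: retryWhen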

Introduction to Observable.retryWhen

Returns an Observable that emits the same values as the source Observable with the exception of an onError. An onError notification from the source results in the emission of a Throwable item to the Observable provided as an argument to the notificationHandler function. If that Observable calls onCompleted or onError, then retry will call onCompleted or onError on the child subscription. Otherwise, this Observable will resubscribe to the source Observable.

Example: This retries 3 times, each time incrementing the number of seconds it waits.

Observable.create((Subscriber<? super String> s) -> {
    System.out.println("subscribing");
    s.onError(new RuntimeException("always fails"));
}).retryWhen(attempts -> {
    return attempts.zipWith(Observable.range(1, 3), (n, i) -> i).flatMap(i -> {
        System.out.println("delay retry by " + i + " second(s)");
        return Observable.timer(i, TimeUnit.SECONDS);
    });
}).toBlocking().forEach(System.out::println);

Output is:

subscribing
delay retry by 1 second(s)
subscribing
delay retry by 2 second(s)
subscribing
delay retry by 3 second(s)
subscribing

Scheduler: retryWhen operates by default on the trampoline Scheduler.
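
To make the control flow of the example easier to follow, here is a minimal plain-Java model of it. It deliberately does not use RxJava: the class name RetryTraceModel and the injectable sleeper are assumptions made for illustration, and the loop only imitates the roles of range(1, 3) and timer(i, SECONDS) for a task that always fails.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongConsumer;

/** Plain-Java model (no RxJava) of the retry loop in the Javadoc example. */
final class RetryTraceModel {

    /**
     * Simulates a task that always fails, retrying up to maxRetries times and
     * waiting i seconds before the i-th retry. Returns the lines that would be printed.
     */
    static List<String> run(int maxRetries, LongConsumer sleepSeconds) {
        List<String> trace = new ArrayList<>();
        trace.add("subscribing");                  // initial subscription, which fails
        for (int i = 1; i <= maxRetries; i++) {    // plays the role of range(1, maxRetries)
            trace.add("delay retry by " + i + " second(s)");
            sleepSeconds.accept(i);                // plays the role of timer(i, SECONDS)
            trace.add("subscribing");              // resubscription, which fails again
        }
        return trace;                              // no retries remain after the last failure
    }

    public static void main(String[] args) {
        // A no-op sleeper prints the trace immediately instead of waiting 1 + 2 + 3 seconds.
        run(3, seconds -> { }).forEach(System.out::println);
    }
}
```

Passing a real sleeper (for example one that calls Thread.sleep) would reproduce the growing pauses between attempts; the no-op version is only meant to show the order of events.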

Code examples

Code example source: jhusain/learnrxjava

public static void main(String... args) {

    /*
     * retry(n) can be used to immediately retry n times
     */
    Observable.create(s -> {
        System.out.println("1) subscribing");
        s.onError(new RuntimeException("1) always fails"));
    }).retry(3).subscribe(System.out::println, t -> System.out.println("1) Error: " + t));

    System.out.println("");

    /*
     * retryWhen allows custom behavior on when and if a retry should be done
     */
    Observable.create(s -> {
        System.out.println("2) subscribing");
        s.onError(new RuntimeException("2) always fails"));
    }).retryWhen(attempts -> {
        return attempts.zipWith(Observable.range(1, 3), (n, i) -> i).flatMap(i -> {
            System.out.println("2) delay retry by " + i + " second(s)");
            return Observable.timer(i, TimeUnit.SECONDS);
        }).concatWith(Observable.error(new RuntimeException("Failed after 3 retries")));
    }).toBlocking().forEach(System.out::println);

}

Code example source: THEONE10211024/RxJavaSamples

@OnClick(R.id.btn_eb_retry)
public void startRetryingWithExponentialBackoffStrategy() {
  _logs = new ArrayList<>();
  _adapter.clear();
  _subscriptions.add(//
     Observable//
        .error(new RuntimeException("testing")) // always fails
        .retryWhen(new RetryWithDelay(5, 1000))//
        .doOnSubscribe(new Action0() {
          @Override
          public void call() {
            _log("Attempting the impossible 5 times in intervals of 1s");
          }
        })//
        .subscribe(new Observer<Object>() {
          @Override
          public void onCompleted() {
            Timber.d("on Completed");
          }
          @Override
          public void onError(Throwable e) {
            _log("Error: I give up!");
          }
          @Override
          public void onNext(Object aVoid) {
            Timber.d("on Next");
          }
        }));
}

Code example source: jhusain/learnrxjava

}).retryWhen(attempts -> {
  return attempts.zipWith(Observable.range(1, 3), (throwable, i) -> i)
      .flatMap(i -> {

Code example source: com.couchbase.client/java-client

/**
 * Wrap an {@link Observable} so that it will retry on some errors. The retry will occur for a maximum number of
 * attempts and with a provided {@link Delay} between each attempt represented by the {@link RetryWithDelayHandler},
 * which can also filter on errors and stop the retry cycle for certain type of errors.
 *
 * @param source the {@link Observable} to wrap.
 * @param handler the {@link RetryWithDelayHandler}, describes maximum number of attempts, delay and fatal errors.
 * @param <T> the type of items emitted by the source Observable.
 * @return the wrapped retrying Observable.
 */
public static <T> Observable<T> wrapForRetry(Observable<T> source, final RetryWithDelayHandler handler) {
  return source.retryWhen(new RetryWhenFunction(handler));
}

Code example source: com.netflix.eureka/eureka2-write-server

@Override
public void startReplication() {
  if (stateRef.compareAndSet(STATE.Idle, STATE.Replicating)) {
    // TODO better retry func?
    connection.getRetryableLifecycle()
        .retryWhen(new RetryStrategyFunc(retryWaitMillis))
        .subscribe(replicationSubscriber);
  }
}

Code example source: xiaolongonly/Ticket-Analysis

public static <T> Observable.Transformer<T, T> applySchedulersWithToken() {
  return tObservable -> tObservable
      .subscribeOn(Schedulers.io())
      .observeOn(Schedulers.io())
      .onErrorResumeNext(errorResumeFunc())
      .retryWhen(new RetryWhenNetworkException())
      .observeOn(AndroidSchedulers.mainThread());
}

Code example source: jacek-marchwicki/JavaWebsocketClient

@Nonnull
@Override
public Observable<RxObjectEvent> connection() {
  return sockets.webSocketObservable()
      .retryWhen(repeatDuration(1, TimeUnit.SECONDS));
}

Code example source: NielsMasdorp/Speculum-Android

@Override
public void loadCalendarEvents(int updateDelay, Subscriber<String> subscriber) {
  compositeSubscription.add(Observable.interval(0, updateDelay, TimeUnit.MINUTES)
      .flatMap(ignore -> googleCalendarService.getCalendarEvents())
      .retryWhen(Observables.exponentialBackoff(AMOUNT_OF_RETRIES, DELAY_IN_SECONDS, TimeUnit.SECONDS))
      .observeOn(AndroidSchedulers.mainThread())
      .subscribeOn(Schedulers.io())
      .unsubscribeOn(Schedulers.io())
      .subscribe(subscriber));
}

Code example source: NielsMasdorp/Speculum-Android

@Override
public void loadTopRedditPost(String subreddit, int updateDelay, Subscriber<RedditPost> subscriber) {
  compositeSubscription.add(Observable.interval(0, updateDelay, TimeUnit.MINUTES)
      .flatMap(ignore -> redditService.getApi().getTopRedditPostForSubreddit(subreddit, Constants.REDDIT_LIMIT))
      .flatMap(redditService::getRedditPost)
      .retryWhen(Observables.exponentialBackoff(AMOUNT_OF_RETRIES, DELAY_IN_SECONDS, TimeUnit.SECONDS))
      .observeOn(AndroidSchedulers.mainThread())
      .subscribeOn(Schedulers.io())
      .unsubscribeOn(Schedulers.io())
      .subscribe(subscriber));
}

Code example source: NielsMasdorp/Speculum-Android

@Override
public void loadWeather(String location, boolean celsius, int updateDelay, String apiKey, Subscriber<Weather> subscriber) {
  final String query = celsius ? Constants.WEATHER_QUERY_SECOND_CELSIUS : Constants.WEATHER_QUERY_SECOND_FAHRENHEIT;
  compositeSubscription.add(Observable.interval(0, updateDelay, TimeUnit.MINUTES)
      .flatMap(ignore -> forecastIOService.getApi().getCurrentWeatherConditions(apiKey, location, query))
      .flatMap(response -> forecastIOService.getCurrentWeather(response, weatherIconGenerator, application, celsius))
      .retryWhen(Observables.exponentialBackoff(AMOUNT_OF_RETRIES, DELAY_IN_SECONDS, TimeUnit.SECONDS))
      .observeOn(AndroidSchedulers.mainThread())
      .subscribeOn(Schedulers.io())
      .unsubscribeOn(Schedulers.io())
      .subscribe(subscriber));
}
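
The Observables.exponentialBackoff(...) helper used in the three Speculum-Android snippets is not shown in the source, so its exact formula is unknown. As a rough illustration of what an exponential backoff schedule commonly looks like, the plain-Java sketch below doubles a base delay on each attempt. The class name BackoffSchedule, the doubling factor, and the sample arguments are assumptions, not that project's implementation.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: computes the wait before each retry, doubling every time. */
final class BackoffSchedule {

    /** Returns the delays base, 2*base, 4*base, ... for the given number of retries. */
    static List<Long> exponentialDelays(int retries, long baseDelay) {
        if (retries < 0 || baseDelay < 0) {
            throw new IllegalArgumentException("retries and baseDelay must be non-negative");
        }
        List<Long> delays = new ArrayList<>();
        long delay = baseDelay;
        for (int attempt = 1; attempt <= retries; attempt++) {
            delays.add(delay);
            // Double for the next attempt, saturating instead of overflowing.
            delay = delay > Long.MAX_VALUE / 2 ? Long.MAX_VALUE : delay * 2;
        }
        return delays;
    }

    public static void main(String[] args) {
        // Five retries starting at 2 time units: prints [2, 4, 8, 16, 32]
        System.out.println(exponentialDelays(5, 2));
    }
}
```

Inside a retryWhen handler, each computed delay would typically be passed to Observable.timer(delay, unit), in the same way the zipWith/flatMap examples in this article turn an attempt index into a timer.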

Code example source: com.intendia.gwt.rxgwt/rxgwt

public static <T> Observable.Transformer<T, T> retryDelay(Action1<Attempt> onAttempt, int maxRetry) {
  return o -> o.retryWhen(attempts -> attempts
      .zipWith(Observable.range(1, maxRetry), (err, i) -> new Attempt(i, err))
      .flatMap((Attempt x) -> {
        if (x.idx > maxRetry) return error(x.err);
        onAttempt.call(x);
        return timer(min(x.idx * x.idx, MAX_RETRY_TIME), SECONDS);
      }));
}
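
The rxgwt snippet above waits min(idx * idx, MAX_RETRY_TIME) seconds before retry number idx. The value of MAX_RETRY_TIME is not given in the source; the plain-Java sketch below assumes a cap of 60 seconds purely for illustration and tabulates the resulting waits. The class name QuadraticDelays is likewise hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Tabulates the quadratic, capped delay used in the rxgwt retryDelay snippet. */
final class QuadraticDelays {

    /** Assumed cap in seconds; the real MAX_RETRY_TIME is not shown in the source. */
    static final int ASSUMED_MAX_RETRY_TIME = 60;

    /** Returns min(idx * idx, cap) for idx = 1..maxRetry. */
    static List<Integer> delaysInSeconds(int maxRetry) {
        List<Integer> delays = new ArrayList<>();
        for (int idx = 1; idx <= maxRetry; idx++) {
            // Widen to long before squaring so large indices cannot overflow.
            delays.add((int) Math.min((long) idx * idx, ASSUMED_MAX_RETRY_TIME));
        }
        return delays;
    }

    public static void main(String[] args) {
        // Prints [1, 4, 9, 16, 25, 36, 49, 60, 60, 60]
        System.out.println(delaysInSeconds(10));
    }
}
```

With this assumed cap, the wait grows quickly for the first seven retries and then stays flat, which keeps late retries from being spaced arbitrarily far apart.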

Code example source: com.netflix.spinnaker.echo/echo-pipelinetriggers

@Override
public void call(Pipeline pipeline) {
 if (enabled) {
  log.warn("Triggering {} due to {}", pipeline, pipeline.getTrigger());
  counter.increment("orca.requests");
  createTriggerObservable(pipeline)
   .retryWhen(new RetryWithDelay(retryCount, retryDelayMillis))
   .subscribe(this::onOrcaResponse, throwable -> onOrcaError(pipeline, throwable));
 } else {
  log.info("Would trigger {} due to {} but triggering is disabled", pipeline, pipeline.getTrigger());
 }
}

Code example source: com.netflix.eureka/eureka2-server

public void init() {
  Observable<InstanceInfo> selfInfoStream = resolve().distinctUntilChanged();
  subscription = connect(selfInfoStream).retryWhen(new RetryStrategyFunc(500)).subscribe();
}

Code example source: com.microsoft.azure/azure-documentdb-rx

private Observable<DocumentServiceResponse> doReplace(RxDocumentServiceRequest request) {
  Observable<DocumentServiceResponse> responseObservable = Observable.defer(() -> {
    try {
      return this.gatewayProxy.doReplace(request).doOnNext(response -> {
        this.captureSessionToken(request, response);
      });
    } catch (Exception e) {
      return Observable.error(e);
    }
  }).retryWhen(createExecuteRequestRetryHandler(request));
  
  
  return createPutMoreContentObservable(request, HttpConstants.HttpMethods.PUT)
      .doOnNext(r -> applySessionToken(request)).flatMap(req -> responseObservable);
}

Code example source: org.zalando.paradox/paradox-nakadi-consumer-core

private <T> Observable<T> handleRestart(final Observable<T> observable) {
  return observable.retryWhen(o -> o.compose(zipWithFlatMap("retry"))).repeatWhen(o ->
        o.compose(zipWithFlatMap("repeat")));
}

Code example source: akarnokd/akarnokd-misc

@Test
public void test() {
    Observable.error(new IOException())
        .retryWhen(new RetryWhenObservable(5, 1))
        .test()
        .awaitTerminalEvent()
        .assertFailure(IOException.class);
}

Code example source: nurkiewicz/rxjava-book-examples

@Test
public void sample_74() throws Exception {
  risky()
      .timeout(1, SECONDS)
      .retryWhen(failures -> failures
          .zipWith(Observable.range(1, ATTEMPTS), (err, attempt) ->
              attempt < ATTEMPTS ?
                  Observable.timer(1, SECONDS) :
                  Observable.error(err))
          .flatMap(x -> x)
      );
}

Code example source: nurkiewicz/rxjava-book-examples

@Test
public void sample_89() throws Exception {
  risky()
      .timeout(1, SECONDS)
      .retryWhen(failures -> failures
          .zipWith(Observable.range(1, ATTEMPTS),
              this::handleRetryAttempt)
          .flatMap(x -> x)
      );
}

Code example source: nurkiewicz/rxjava-book-examples

@Test
public void sample_66() throws Exception {
  risky()
      .timeout(1, SECONDS)
      // .retryWhen(failures -> failures.take(10))
      .retryWhen(failures -> failures.delay(1, SECONDS));
}

Code example source: nurkiewicz/rxjava-book-examples

@Test
public void sample_28() throws Exception {
  Observable<String> retried = new BlockingCmd()
      .toObservable()
      .doOnError(ex -> log.warn("Error ", ex))
      .retryWhen(ex -> ex.delay(500, MILLISECONDS))
      .timeout(3, SECONDS);
}
