rx.Observable.retry()方法的使用及代码示例

x33g5p2x  于2022-01-25 转载在 其他  
字(9.1k)|赞(0)|评价(0)|浏览(244)

本文整理了Java中rx.Observable.retry()方法的一些代码示例,展示了Observable.retry()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Observable.retry()方法的具体详情如下:
包路径:rx.Observable
类名称:Observable
方法名:retry

Observable.retry介绍

[英]Returns an Observable that mirrors the source Observable, resubscribing to it if it calls onError(infinite retry count).

If the source Observable calls Observer#onError, this method will resubscribe to the source Observable rather than propagating the onError call.

Any and all items emitted by the source Observable will be emitted by the resulting Observable, even those emitted during failed subscriptions. For example, if an Observable fails at first but emits [1, 2] then succeeds the second time and emits [1, 2, 3, 4, 5] then the complete sequence of emissions and notifications would be [1, 2, 1, 2, 3, 4, 5, onCompleted]. Scheduler: retry operates by default on the trampoline Scheduler.
[中]返回一个可观察对象,该对象镜像源可观察对象,如果调用OneError(无限重试计数),则重新订阅该对象。
如果源Observable调用Observator#onError,此方法将重新订阅源Observable,而不是传播onError调用。
源可观测项发出的任何和所有项都将由结果可观测项发出,即使是在订阅失败时发出的项。例如,如果一个可观测对象一开始失败,但发出[1,2],然后第二次成功并发出[1,2,3,4,5],那么完整的发射和通知序列将是[1,2,1,2,3,4,5,未完成]。调度程序:默认情况下,retry在trampoline调度程序上运行。

代码示例

代码示例来源:origin: PipelineAI/pipeline

@Test
public void testRxRetry() throws Exception {
  // see https://github.com/Netflix/Hystrix/issues/1100
  // Since each command instance is single-use, the expectation is that applying the .retry() operator
  // results in only a single execution and propagation out of that error
  HystrixCommand<Integer> cmd = getLatentCommand(ExecutionIsolationStrategy.THREAD, AbstractTestHystrixCommand.ExecutionResult.FAILURE, 300,
      AbstractTestHystrixCommand.FallbackResult.UNIMPLEMENTED, 100);
  final CountDownLatch latch = new CountDownLatch(1);
  System.out.println(System.currentTimeMillis() + " : Starting");
  Observable<Integer> o = cmd.toObservable().retry(2);
  System.out.println(System.currentTimeMillis() + " Created retried command : " + o);
  o.subscribe(new Subscriber<Integer>() {
    @Override
    public void onCompleted() {
      System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : OnCompleted");
      latch.countDown();
    }
    @Override
    public void onError(Throwable e) {
      System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : OnError : " + e);
      latch.countDown();
    }
    @Override
    public void onNext(Integer integer) {
      System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : OnNext : " + integer);
    }
  });
  latch.await(1000, TimeUnit.MILLISECONDS);
  System.out.println(System.currentTimeMillis() + " ReqLog : " + HystrixRequestLog.getCurrentRequest().getExecutedCommandsAsString());
}

代码示例来源:origin: jhusain/learnrxjava

public static void main(String... args) {

    /*
     * retry(n) can be used to immediately retry n times
     */
    Observable.create(s -> {
      System.out.println("1) subscribing");
      s.onError(new RuntimeException("1) always fails"));
    }).retry(3).subscribe(System.out::println, t -> System.out.println("1) Error: " + t));

    System.out.println("");
    
    /*
     * retryWhen allows custom behavior on when and if a retry should be done
     */
    Observable.create(s -> {
      System.out.println("2) subscribing");
      s.onError(new RuntimeException("2) always fails"));
    }).retryWhen(attempts -> {
      return attempts.zipWith(Observable.range(1, 3), (n, i) -> i).flatMap(i -> {
        System.out.println("2) delay retry by " + i + " second(s)");
        return Observable.timer(i, TimeUnit.SECONDS);
      }).concatWith(Observable.error(new RuntimeException("Failed after 3 retries")));
    }).toBlocking().forEach(System.out::println);

  }
}

代码示例来源:origin: jhusain/learnrxjava

/**
 * The data stream fails intermittently so return the stream
 * with retry capability.
 */
public Observable<String> retry(Observable<String> data) {
  return data.retry();
}

代码示例来源:origin: ReactiveX/RxNetty

.retry() // Retry when there is an error in timer.
.concatMap(new IdleConnectionCleanupTask())
.onErrorResumeNext(new Func1<Throwable, Observable<Void>>() {

代码示例来源:origin: jhusain/learnrxjava

public static void subscribe(Observable<String> o) {
    o = o.materialize().flatMap(n -> {
      if (n.isOnError()) {
        if (n.getThrowable() instanceof IllegalStateException) {
          return Observable.just(n);
        } else {
          return Observable.error(n.getThrowable());
        }
      } else {
        return Observable.just(n);
      }
    }).retry().dematerialize();

    o.subscribe(System.out::println, t -> t.printStackTrace());
  }
}

代码示例来源:origin: ribot/ribot-app-android

private void performCheckOutLatestEncounter() {
  Timber.i("Checking out...");
  String checkInId = getLatestEncounterCheckInId();
  if (checkInId == null) {
    Timber.e("Cannot check-out because latest encounter check-in ID is null");
    return;
  }
  if (mCheckInSubscription != null) mCheckInSubscription.unsubscribe();
  mCheckInSubscription = mDataManager.checkOut(checkInId)
      .retry(3)
      .subscribeOn(Schedulers.io())
      .subscribe(new Subscriber<CheckIn>() {
        @Override
        public void onCompleted() {
          Timber.i("Checked out successfully!");
        }
        @Override
        public void onError(Throwable e) {
          Timber.e(e, "There was an error checking out");
        }
        @Override
        public void onNext(CheckIn checkIn) {
        }
      });
}

代码示例来源:origin: gitskarios/GithubAndroidSdk

@Override
public Observable<GithubAuthorization> observable() {
 return super.observable().retry(0);
}

代码示例来源:origin: ordina-jworks/microservices-dashboard-server

private Observable<String> getServicesFromDiscoveryClient() {
  logger.info("Discovering services");
  return Observable.from(discoveryClient.getServices()).subscribeOn(Schedulers.io()).publish().autoConnect()
      .map(String::toLowerCase)
      .doOnNext(s -> logger.debug("Service discovered: " + s))
      .doOnError(e -> {
        String error = "Error retrieving services: " + e.getMessage();
        logger.error(error);
        publisher.publishEvent(new SystemEvent(error, e));
      })
      .retry();
}

代码示例来源:origin: ordina-jworks/microservices-dashboard-server

protected Observable<String> getServiceIdsFromDiscoveryClient() {
  logger.info("Discovering services for mappings");
  return Observable.from(discoveryClient.getServices()).subscribeOn(Schedulers.io()).publish().autoConnect()
      .map(id -> id.toLowerCase())
      .filter(id -> !id.equals(ZUUL))
      .doOnNext(s -> logger.debug("Service discovered: " + s))
      .doOnError(e -> errorHandler.handleSystemError("Error filtering services: " + e.getMessage(), e))
      .retry();
}

代码示例来源:origin: ordina-jworks/microservices-dashboard-server

protected Observable<String> getServiceIdsFromDiscoveryClient() {
  logger.info("Discovering services for health");
  return Observable.from(discoveryClient.getServices()).subscribeOn(Schedulers.io()).publish().autoConnect()
      .map(id -> id.toLowerCase())
      .filter(id -> !id.equals(ZUUL))
      .doOnNext(s -> logger.debug("Service discovered: " + s))
      .doOnError(e -> errorHandler.handleSystemError("Error filtering services: " + e.getMessage(), e))
      .retry();
}

代码示例来源:origin: pengrad/java-telegram-bot-api

private Observable<Document> getWebPage(String url) {
  return Observable.<Document>create(subscriber -> {
    try {
      Document document = Jsoup.connect(url).get();
      subscriber.onNext(document);
      subscriber.onCompleted();
    } catch (IOException e) {
      subscriber.onError(e);
    }
  }).retry(2);
}

代码示例来源:origin: com.netflix.spinnaker.echo/echo-pipelinetriggers

@PostConstruct
public void start() {
 if (subscription == null || subscription.isUnsubscribed()) {
  subscription = Observable.interval(pollingIntervalSeconds, SECONDS, scheduler)
   .doOnNext(this::onFront50Request)
   .flatMap(tick -> front50.getPipelines())
   .doOnError(this::onFront50Error)
   .retry()
   .subscribe(this::cachePipelines);
 }
}

代码示例来源:origin: Aptoide/aptoide-client-v8

public void setup() {
 subscriptions.add(
   Observable.interval(initialDelay, sendInterval, TimeUnit.MILLISECONDS, timerScheduler)
     .flatMap(time -> persistence.getAll()
       .first())
     .filter(events -> events.size() > 0)
     .flatMapCompletable(events -> sendEvents(new ArrayList<>(events)))
     .doOnError(throwable -> crashReport.log(throwable))
     .retry()
     .subscribe());
}

代码示例来源:origin: Petikoch/Java_MVVM_with_Swing_and_RxJava_Examples

@Override
public void connectTo(final Example_7a_Model model) {
  model.getLogs()
      .doOnError(throwable -> vm2v_log.onNext(new LogRow("!", "Unexpected error -> will retry here in ViewModel", "Error: " + throwable.getMessage())))
      .retry(5)
      .onErrorReturn(throwable -> new LogRow("!!!", "Too many unexpected errors -> stop", "Error: " + throwable.getMessage()))
      .subscribe(vm2v_log);
}

代码示例来源:origin: gitskarios/GithubAndroidSdk

public Observable<K> observable() {
 return getApiObservable(getRestAdapter()).retry(this::retry).debounce(100, TimeUnit.MILLISECONDS);
}

代码示例来源:origin: nurkiewicz/rxjava-book-examples

@Test
public void sample_281() throws Exception {
  risky()
      .timeout(1, SECONDS)
      .doOnError(th -> log.warn("Will retry", th))
      .retry()
      .subscribe(log::info);
}

代码示例来源:origin: nurkiewicz/rxjava-book-examples

@Test
public void sample_296() throws Exception {
  Observable
      .defer(() -> risky())
      .retry();
}

代码示例来源:origin: nurkiewicz/rxjava-book-examples

@Test
public void sample_303() throws Exception {
  risky()
      .timeout(1, SECONDS)
      .retry(10);
}

代码示例来源:origin: nurkiewicz/rxjava-book-examples

@Test
public void sample_291() throws Exception {
  risky().cache().retry();  //BROKEN
}

代码示例来源:origin: nurkiewicz/rxjava-book-examples

@Test
public void sample_310() throws Exception {
  risky()
      .timeout(1, SECONDS)
      .retry((attempt, e) ->
          attempt <= 10 && !(e instanceof TimeoutException));
}

相关文章

Observable类方法