io.reactivex.Observable.retry()方法的使用及代码示例

x33g5p2x  于2022-01-25 转载在 其他  
字(11.1k)|赞(0)|评价(0)|浏览(148)

本文整理了Java中io.reactivex.Observable.retry()方法的一些代码示例,展示了Observable.retry()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Observable.retry()方法的具体详情如下:
包路径:io.reactivex.Observable
类名称:Observable
方法名:retry

Observable.retry介绍

[英]Returns an Observable that mirrors the source ObservableSource, resubscribing to it if it calls onError(infinite retry count).

If the source ObservableSource calls Observer#onError, this method will resubscribe to the source ObservableSource rather than propagating the onError call.

Any and all items emitted by the source ObservableSource will be emitted by the resulting ObservableSource, even those emitted during failed subscriptions. For example, if an ObservableSource fails at first but emits [1, 2] then succeeds the second time and emits [1, 2, 3, 4, 5] then the complete sequence of emissions and notifications would be [1, 2, 1, 2, 3, 4, 5, onComplete]. Scheduler: retry does not operate by default on a particular Scheduler.
[中]返回反映源ObservableSource的Observable,如果调用OneError(无限重试计数),则重新订阅。
如果源ObserviceSource调用Observator#OneError,此方法将重新订阅源ObserviceSource,而不是传播OneError调用。
源ObserveSource发出的任何和所有项都将由生成的ObserveSource发出,即使是在订阅失败时发出的项。例如,如果一个可观测资源一开始失败,但发出[1,2],然后第二次成功并发出[1,2,3,4,5],那么完整的发射和通知序列将是[1,2,1,2,3,4,5,onComplete]。调度程序:默认情况下,重试不会在特定调度程序上运行。

代码示例

代码示例来源:origin: ReactiveX/RxJava

/**
 * Retries the current Observable if the predicate returns true.
 * <p>
 * <img width="640" height="248" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/retry.o.e.png" alt="">
 * <dl>
 *  <dt><b>Scheduler:</b></dt>
 *  <dd>{@code retry} does not operate by default on a particular {@link Scheduler}.</dd>
 * </dl>
 *
 * @param predicate the predicate that receives the failure Throwable and should return true to trigger a retry.
 * @return the new Observable instance
 */
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Observable<T> retry(Predicate<? super Throwable> predicate) {
  return retry(Long.MAX_VALUE, predicate);
}

代码示例来源:origin: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void retryFunctionNull() {
  just1.retry((BiPredicate<Integer, Throwable>)null);
}

代码示例来源:origin: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void retryPredicateNull() {
  just1.retry((Predicate<Throwable>)null);
}

代码示例来源:origin: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void retryCountFunctionNull() {
  just1.retry(1, null);
}

代码示例来源:origin: ReactiveX/RxJava

/**
 * Retries until the given stop function returns true.
 * <p>
 * <img width="640" height="261" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/retryUntil.o.png" alt="">
 * <dl>
 *  <dt><b>Scheduler:</b></dt>
 *  <dd>{@code retryUntil} does not operate by default on a particular {@link Scheduler}.</dd>
 * </dl>
 * @param stop the function that should return true to stop retrying
 * @return the new Observable instance
 */
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Observable<T> retryUntil(final BooleanSupplier stop) {
  ObjectHelper.requireNonNull(stop, "stop is null");
  return retry(Long.MAX_VALUE, Functions.predicateReverseFor(stop));
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testJustAndRetry() throws Exception {
  final AtomicBoolean throwException = new AtomicBoolean(true);
  int value = Observable.just(1).map(new Function<Integer, Integer>() {
    @Override
    public Integer apply(Integer t1) {
      if (throwException.compareAndSet(true, false)) {
        throw new TestException();
      }
      return t1;
    }
  }).retry(1).blockingSingle();
  assertEquals(1, value);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void retryLongPredicateInvalid() {
  try {
    Observable.just(1).retry(-99, new Predicate<Throwable>() {
      @Override
      public boolean test(Throwable e) throws Exception {
        return true;
      }
    });
    fail("Should have thrown");
  } catch (IllegalArgumentException ex) {
    assertEquals("times >= 0 required but it was -99", ex.getMessage());
  }
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testWithNothingToRetry() {
  Observable<Integer> source = Observable.range(0, 3);
  Observer<Integer> o = TestHelper.mockObserver();
  InOrder inOrder = inOrder(o);
  source.retry(retryTwice).subscribe(o);
  inOrder.verify(o).onNext(0);
  inOrder.verify(o).onNext(1);
  inOrder.verify(o).onNext(2);
  inOrder.verify(o).onComplete();
  verify(o, never()).onError(any(Throwable.class));
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testSourceObservableRetry1() throws InterruptedException {
  final AtomicInteger subsCount = new AtomicInteger(0);
  final TestObserver<String> to = new TestObserver<String>();
  ObservableSource<String> onSubscribe = new ObservableSource<String>() {
    @Override
    public void subscribe(Observer<? super String> observer) {
      observer.onSubscribe(Disposables.empty());
      subsCount.incrementAndGet();
      observer.onError(new RuntimeException("failed"));
    }
  };
  Observable.unsafeCreate(onSubscribe).retry(1).subscribe(to);
  assertEquals(2, subsCount.get());
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testSourceObservableRetry0() throws InterruptedException {
  final AtomicInteger subsCount = new AtomicInteger(0);
  final TestObserver<String> to = new TestObserver<String>();
  ObservableSource<String> onSubscribe = new ObservableSource<String>() {
    @Override
    public void subscribe(Observer<? super String> observer) {
      observer.onSubscribe(Disposables.empty());
      subsCount.incrementAndGet();
      observer.onError(new RuntimeException("failed"));
    }
  };
  Observable.unsafeCreate(onSubscribe).retry(0).subscribe(to);
  assertEquals(1, subsCount.get());
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testIssue2826() {
  TestObserver<Integer> to = new TestObserver<Integer>();
  final RuntimeException e = new RuntimeException("You shall not pass");
  final AtomicInteger c = new AtomicInteger();
  Observable.just(1).map(new Function<Integer, Integer>() {
    @Override
    public Integer apply(Integer t1) {
      c.incrementAndGet();
      throw e;
    }
  }).retry(retry5).subscribe(to);
  to.assertTerminated();
  assertEquals(6, c.get());
  assertEquals(Collections.singletonList(e), to.errors());
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testInfiniteRetry() {
  int numFailures = 20;
  Observer<String> observer = TestHelper.mockObserver();
  Observable<String> origin = Observable.unsafeCreate(new FuncWithErrors(numFailures));
  origin.retry().subscribe(observer);
  InOrder inOrder = inOrder(observer);
  // should show 3 attempts
  inOrder.verify(observer, times(1 + numFailures)).onNext("beginningEveryTime");
  // should have no errors
  inOrder.verify(observer, never()).onError(any(Throwable.class));
  // should have a single success
  inOrder.verify(observer, times(1)).onNext("onSuccessOnly");
  // should have a single successful onComplete
  inOrder.verify(observer, times(1)).onComplete();
  inOrder.verifyNoMoreInteractions();
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testRetrySuccess() {
  int numFailures = 1;
  Observer<String> observer = TestHelper.mockObserver();
  Observable<String> origin = Observable.unsafeCreate(new FuncWithErrors(numFailures));
  origin.retry(3).subscribe(observer);
  InOrder inOrder = inOrder(observer);
  // should show 3 attempts
  inOrder.verify(observer, times(1 + numFailures)).onNext("beginningEveryTime");
  // should have no errors
  inOrder.verify(observer, never()).onError(any(Throwable.class));
  // should have a single success
  inOrder.verify(observer, times(1)).onNext("onSuccessOnly");
  // should have a single successful onComplete
  inOrder.verify(observer, times(1)).onComplete();
  inOrder.verifyNoMoreInteractions();
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testRetryIndefinitely() {
  Observer<String> observer = TestHelper.mockObserver();
  int numRetries = 20;
  Observable<String> origin = Observable.unsafeCreate(new FuncWithErrors(numRetries));
  origin.retry().subscribe(new TestObserver<String>(observer));
  InOrder inOrder = inOrder(observer);
  // should show 3 attempts
  inOrder.verify(observer, times(numRetries + 1)).onNext("beginningEveryTime");
  // should have no errors
  inOrder.verify(observer, never()).onError(any(Throwable.class));
  // should have a single success
  inOrder.verify(observer, times(1)).onNext("onSuccessOnly");
  // should have a single successful onComplete
  inOrder.verify(observer, times(1)).onComplete();
  inOrder.verifyNoMoreInteractions();
}

代码示例来源:origin: ReactiveX/RxJava

@Test(timeout = 10000)
public void testTimeoutWithRetry() {
  Observer<Long> observer = TestHelper.mockObserver();
  // Observable that sends every 100ms (timeout fails instead)
  SlowObservable so = new SlowObservable(100, 10, "testTimeoutWithRetry");
  Observable<Long> o = Observable.unsafeCreate(so).timeout(80, TimeUnit.MILLISECONDS).retry(5);
  AsyncObserver<Long> async = new AsyncObserver<Long>(observer);
  o.subscribe(async);
  async.await();
  InOrder inOrder = inOrder(observer);
  // Should fail once
  inOrder.verify(observer, times(1)).onError(any(Throwable.class));
  inOrder.verify(observer, never()).onComplete();
  assertEquals("Start 6 threads, retry 5 then fail on 6", 6, so.efforts.get());
}

代码示例来源:origin: ReactiveX/RxJava

@Test(timeout = 10000)
public void testUnsubscribeAfterError() {
  Observer<Long> observer = TestHelper.mockObserver();
  // Observable that always fails after 100ms
  SlowObservable so = new SlowObservable(100, 0, "testUnsubscribeAfterError");
  Observable<Long> o = Observable.unsafeCreate(so).retry(5);
  AsyncObserver<Long> async = new AsyncObserver<Long>(observer);
  o.subscribe(async);
  async.await();
  InOrder inOrder = inOrder(observer);
  // Should fail once
  inOrder.verify(observer, times(1)).onError(any(Throwable.class));
  inOrder.verify(observer, never()).onComplete();
  assertEquals("Start 6 threads, retry 5 then fail on 6", 6, so.efforts.get());
  assertEquals("Only 1 active subscription", 1, so.maxActive.get());
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void retryPredicate() {
  Observable.just(1).concatWith(Observable.<Integer>error(new TestException()))
  .retry(new Predicate<Throwable>() {
    @Override
    public boolean test(Throwable v) throws Exception {
      return true;
    }
  })
  .take(5)
  .test()
  .assertResult(1, 1, 1, 1, 1);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void dontRetry() {
  Observable.error(new TestException("Outer"))
  .retry(Functions.alwaysFalse())
  .test()
  .assertFailureAndMessage(TestException.class, "Outer");
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void predicateThrows() {
  TestObserver<Object> to = Observable.error(new TestException("Outer"))
  .retry(new Predicate<Throwable>() {
    @Override
    public boolean test(Throwable e) throws Exception {
      throw new TestException("Inner");
    }
  })
  .test()
  .assertFailure(CompositeException.class);
  List<Throwable> errors = TestHelper.compositeList(to.errors().get(0));
  TestHelper.assertError(errors, 0, TestException.class, "Outer");
  TestHelper.assertError(errors, 1, TestException.class, "Inner");
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void bipredicateThrows() {
  TestObserver<Object> to = Observable.error(new TestException("Outer"))
  .retry(new BiPredicate<Integer, Throwable>() {
    @Override
    public boolean test(Integer n, Throwable e) throws Exception {
      throw new TestException("Inner");
    }
  })
  .test()
  .assertFailure(CompositeException.class);
  List<Throwable> errors = TestHelper.compositeList(to.errors().get(0));
  TestHelper.assertError(errors, 0, TestException.class, "Outer");
  TestHelper.assertError(errors, 1, TestException.class, "Inner");
}

相关文章

Observable类方法