[英]Returns a Flowable that mirrors the source Publisher, resubscribing to it if it calls onError (infinite retry count).

If the source Publisher calls Subscriber#onError, this method will resubscribe to the source Publisher rather than propagating the onError call.

Any and all items emitted by the source Publisher will be emitted by the resulting Publisher, even those emitted during failed subscriptions. For example, if a Publisher fails at first but emits [1, 2], then succeeds the second time and emits [1, 2, 3, 4, 5], then the complete sequence of emissions and notifications would be [1, 2, 1, 2, 3, 4, 5, onComplete]. Backpressure: The operator honors downstream backpressure and expects the source Publisher to honor backpressure as well. If this expectation is violated, the operator may throw an IllegalStateException. Scheduler: retry does not operate by default on a particular Scheduler.


  public Publisher<Integer> createPublisher(long elements) {
        Flowable.range(0, (int)elements).retry(1)

@Test(expected = NullPointerException.class)
public void retryPredicateNull() {

@Test(expected = NullPointerException.class)
public void retryFunctionNull() {
  just1.retry((BiPredicate<Integer, Throwable>)null);

@Test(expected = NullPointerException.class)
public void retryCountFunctionNull() {
  just1.retry(1, null);

 * Repeatedly re-subscribes to the current Single indefinitely if it fails with an onError.
 * <dl>
 * <dt><b>Scheduler:</b></dt>
 * <dd>{@code retry} does not operate by default on a particular {@link Scheduler}.</dd>
 * </dl>
 * @return the new Single instance
 * @since 2.0
public final Single<T> retry() {
  return toSingle(toFlowable().retry());

 * Returns a Completable that retries this Completable as long as it emits an onError event.
 * <p>
 * <img width="640" height="368" src="" alt="">
 * <dl>
 *  <dt><b>Scheduler:</b></dt>
 *  <dd>{@code retry} does not operate by default on a particular {@link Scheduler}.</dd>
 * </dl>
 * @return the new Completable instance
public final Completable retry() {
  return fromPublisher(toFlowable().retry());

 * Repeatedly re-subscribes to the current Single, at most the specified number of times,
 * if it fails with an onError.
 * <dl>
 * <dt><b>Scheduler:</b></dt>
 * <dd>{@code retry} does not operate by default on a particular {@link Scheduler}.</dd>
 * </dl>
 * @param times the number of times to resubscribe if the current Single fails
 * @return the new Single instance
 * @since 2.0
public final Single<T> retry(long times) {
  return toSingle(toFlowable().retry(times));

public void testWithNothingToRetry() {
  Flowable<Integer> source = Flowable.range(0, 3);
  Subscriber<Integer> subscriber = TestHelper.mockSubscriber();
  InOrder inOrder = inOrder(subscriber);
  verify(subscriber, never()).onError(any(Throwable.class));

// Subscribes a mock Subscriber through the no-argument retry() to a source built from
// FuncWithErrors(numRetries) and verifies, in order, the signals the mock received.
public void testRetryIndefinitely() {
  Subscriber<String> subscriber = TestHelper.mockSubscriber();
  int numRetries = 20;
  Flowable<String> origin = Flowable.unsafeCreate(new FuncWithErrors(numRetries));
  origin.retry().subscribe(new TestSubscriber<String>(subscriber));
  InOrder inOrder = inOrder(subscriber);
  // should show numRetries + 1 (here 21) "beginningEveryTime" items --
  // presumably one per subscription attempt; confirm against FuncWithErrors
  inOrder.verify(subscriber, times(numRetries + 1)).onNext("beginningEveryTime");
  // should have no errors
  inOrder.verify(subscriber, never()).onError(any(Throwable.class));
  // should have a single success
  inOrder.verify(subscriber, times(1)).onNext("onSuccessOnly");
  // should have a single successful onComplete
  inOrder.verify(subscriber, times(1)).onComplete();

public void retryLongPredicateInvalid() {
  try {
    Flowable.just(1).retry(-99, new Predicate<Throwable>() {
      public boolean test(Throwable e) throws Exception {
        return true;
    fail("Should have thrown");
  } catch (IllegalArgumentException ex) {
    assertEquals("times >= 0 required but it was -99", ex.getMessage());

public void testSourceFlowableRetry0() throws InterruptedException {
  final AtomicInteger subsCount = new AtomicInteger(0);
  final TestSubscriber<String> ts = new TestSubscriber<String>();
  Publisher<String> onSubscribe = new Publisher<String>() {
    public void subscribe(Subscriber<? super String> s) {
      s.onSubscribe(new BooleanSubscription());
      s.onError(new RuntimeException("failed"));
  assertEquals(1, subsCount.get());

public void testSourceFlowableRetry1() throws InterruptedException {
  final AtomicInteger subsCount = new AtomicInteger(0);
  final TestSubscriber<String> ts = new TestSubscriber<String>();
  Publisher<String> onSubscribe = new Publisher<String>() {
    public void subscribe(Subscriber<? super String> s) {
      s.onSubscribe(new BooleanSubscription());
      s.onError(new RuntimeException("failed"));
  assertEquals(2, subsCount.get());

@Test//(timeout = 10000)
public void testTimeoutWithRetry() {
  Subscriber<Long> subscriber = TestHelper.mockSubscriber();
  // Flowable that sends every 100ms (timeout fails instead)
  SlowFlowable sf = new SlowFlowable(100, 10, "testTimeoutWithRetry");
  Flowable<Long> f = Flowable.unsafeCreate(sf).timeout(80, TimeUnit.MILLISECONDS).retry(5);
  AsyncSubscriber<Long> async = new AsyncSubscriber<Long>(subscriber);
  InOrder inOrder = inOrder(subscriber);
  // Should fail once
  inOrder.verify(subscriber, times(1)).onError(any(Throwable.class));
  inOrder.verify(subscriber, never()).onComplete();
  assertEquals("Start 6 threads, retry 5 then fail on 6", 6, sf.efforts.get());

@Test(timeout = 10000)
public void testUnsubscribeAfterError() {
  Subscriber<Long> subscriber = TestHelper.mockSubscriber();
  // Flowable that always fails after 100ms
  SlowFlowable so = new SlowFlowable(100, 0, "testUnsubscribeAfterError");
  Flowable<Long> f = Flowable.unsafeCreate(so).retry(5);
  AsyncSubscriber<Long> async = new AsyncSubscriber<Long>(subscriber);
  InOrder inOrder = inOrder(subscriber);
  // Should fail once
  inOrder.verify(subscriber, times(1)).onError(any(Throwable.class));
  inOrder.verify(subscriber, never()).onComplete();
  assertEquals("Start 6 threads, retry 5 then fail on 6", 6, so.efforts.get());
  assertEquals("Only 1 active subscription", 1, so.maxActive.get());

public void testJustAndRetry() throws Exception {
  final AtomicBoolean throwException = new AtomicBoolean(true);
  int value = Flowable.just(1).map(new Function<Integer, Integer>() {
    public Integer apply(Integer t1) {
      if (throwException.compareAndSet(true, false)) {
        throw new TestException();
      return t1;
  assertEquals(1, value);

public void testIssue2826() {
  TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
  final RuntimeException e = new RuntimeException("You shall not pass");
  final AtomicInteger c = new AtomicInteger();
  Flowable.just(1).map(new Function<Integer, Integer>() {
    public Integer apply(Integer t1) {
      throw e;
  assertEquals(6, c.get());
  assertEquals(Collections.singletonList(e), ts.errors());

public void dontRetry() {
  Flowable.error(new TestException("Outer"))
  .assertFailureAndMessage(TestException.class, "Outer");

public void predicateThrows() {
  TestSubscriber<Object> ts = Flowable.error(new TestException("Outer"))
  .retry(new Predicate<Throwable>() {
    public boolean test(Throwable e) throws Exception {
      throw new TestException("Inner");
  List<Throwable> errors = TestHelper.compositeList(ts.errors().get(0));
  TestHelper.assertError(errors, 0, TestException.class, "Outer");
  TestHelper.assertError(errors, 1, TestException.class, "Inner");

public void bipredicateThrows() {
  TestSubscriber<Object> ts = Flowable.error(new TestException("Outer"))
  .retry(new BiPredicate<Integer, Throwable>() {
    public boolean test(Integer n, Throwable e) throws Exception {
      throw new TestException("Inner");
  List<Throwable> errors = TestHelper.compositeList(ts.errors().get(0));
  TestHelper.assertError(errors, 0, TestException.class, "Outer");
  TestHelper.assertError(errors, 1, TestException.class, "Inner");

public void retryPredicate() {
  Flowable.just(1).concatWith(Flowable.<Integer>error(new TestException()))
  .retry(new Predicate<Throwable>() {
    public boolean test(Throwable v) throws Exception {
      return true;
  .assertResult(1, 1, 1, 1, 1);

