Usage and code examples of the io.reactivex.Flowable.retry() method

x33g5p2x, reposted on 2022-01-19 under "Other"

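This article collects code examples for the io.reactivex.Flowable.retry() method in Java and shows how Flowable.retry() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should serve as useful references. Details of the Flowable.retry() method are as follows:
Package path: io.reactivex.Flowable
Class name: Flowable
Method name: retry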

Introduction to Flowable.retry

Returns a Flowable that mirrors the source Publisher, resubscribing to it if it calls onError (infinite retry count).

If the source Publisher calls Subscriber#onError, this method will resubscribe to the source Publisher rather than propagating the onError call.

Any and all items emitted by the source Publisher will be emitted by the resulting Publisher, even those emitted during failed subscriptions. For example, if a Publisher fails at first but emits [1, 2], then succeeds the second time and emits [1, 2, 3, 4, 5], the complete sequence of emissions and notifications would be [1, 2, 1, 2, 3, 4, 5, onComplete].

Backpressure: The operator honors downstream backpressure and expects the source Publisher to honor backpressure as well. If this expectation is violated, the operator may throw an IllegalStateException.

Scheduler: retry does not operate by default on a particular Scheduler.
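
The emission sequence described above can be reproduced with a small, dependency-free model. This is only a sketch of the resubscribe-on-error idea in plain Java, not RxJava code: the class RetryEmissionsSketch, the Source interface, and the mirrorWithRetry and flakySource helpers are all hypothetical names introduced for illustration, and the model ignores backpressure and cancellation entirely.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Plain-Java model of "resubscribe on error"; NOT RxJava code.
// Every name in this class is hypothetical and exists only for illustration.
final class RetryEmissionsSketch {

    /** A source pushes items to the consumer; throwing stands in for onError. */
    interface Source<T> {
        void subscribe(Consumer<T> onNext);
    }

    /** Subscribes again and again until one subscription finishes without an error. */
    static <T> List<String> mirrorWithRetry(Source<T> source) {
        List<String> events = new ArrayList<>();
        while (true) {
            try {
                source.subscribe(item -> events.add(String.valueOf(item)));
                events.add("onComplete");
                return events;
            } catch (RuntimeException error) {
                // The error is not propagated; items already recorded stay in the list.
            }
        }
    }

    /** Emits 1, 2 and fails on the first subscription; emits 1..5 on later ones. */
    static Source<Integer> flakySource() {
        int[] subscriptions = {0};
        return onNext -> {
            boolean firstSubscription = subscriptions[0]++ == 0;
            onNext.accept(1);
            onNext.accept(2);
            if (firstSubscription) {
                throw new IllegalStateException("first subscription fails");
            }
            onNext.accept(3);
            onNext.accept(4);
            onNext.accept(5);
        };
    }

    public static void main(String[] args) {
        // prints [1, 2, 1, 2, 3, 4, 5, onComplete]
        System.out.println(mirrorWithRetry(flakySource()));
    }
}
```

The items from the failed first attempt are kept, which is why 1 and 2 appear twice in the result.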

Code examples

Code example source: ReactiveX/RxJava

@Override
public Publisher<Integer> createPublisher(long elements) {
  return Flowable.range(0, (int) elements).retry(1);
}

Code example source: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void retryPredicateNull() {
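  // just1 is a field of the enclosing test class and is not shown here (presumably Flowable.just(1))
  just1.retry((Predicate<Throwable>)null);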
}

Code example source: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void retryFunctionNull() {
  just1.retry((BiPredicate<Integer, Throwable>)null);
}

Code example source: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void retryCountFunctionNull() {
  just1.retry(1, null);
}

Code example source: ReactiveX/RxJava

/**
 * Repeatedly re-subscribes to the current Single indefinitely if it fails with an onError.
 * <dl>
 * <dt><b>Scheduler:</b></dt>
 * <dd>{@code retry} does not operate by default on a particular {@link Scheduler}.</dd>
 * </dl>
 * @return the new Single instance
 * @since 2.0
 */
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Single<T> retry() {
  return toSingle(toFlowable().retry());
}

Code example source: ReactiveX/RxJava

/**
 * Returns a Completable that retries this Completable as long as it emits an onError event.
 * <p>
 * <img width="640" height="368" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.retry.png" alt="">
 * <dl>
 *  <dt><b>Scheduler:</b></dt>
 *  <dd>{@code retry} does not operate by default on a particular {@link Scheduler}.</dd>
 * </dl>
 * @return the new Completable instance
 */
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Completable retry() {
  return fromPublisher(toFlowable().retry());
}

Code example source: ReactiveX/RxJava

/**
 * Repeatedly re-subscribe at most the specified times to the current Single
 * if it fails with an onError.
 * <dl>
 * <dt><b>Scheduler:</b></dt>
 * <dd>{@code retry} does not operate by default on a particular {@link Scheduler}.</dd>
 * </dl>
 * @param times the number of times to resubscribe if the current Single fails
 * @return the new Single instance
 * @since 2.0
 */
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Single<T> retry(long times) {
  return toSingle(toFlowable().retry(times));
}

Code example source: ReactiveX/RxJava

@Test
public void testWithNothingToRetry() {
  Flowable<Integer> source = Flowable.range(0, 3);
  Subscriber<Integer> subscriber = TestHelper.mockSubscriber();
  InOrder inOrder = inOrder(subscriber);
  // retryTwice is a predicate field of the enclosing test class and is not shown here
  source.retry(retryTwice).subscribe(subscriber);
  inOrder.verify(subscriber).onNext(0);
  inOrder.verify(subscriber).onNext(1);
  inOrder.verify(subscriber).onNext(2);
  inOrder.verify(subscriber).onComplete();
  verify(subscriber, never()).onError(any(Throwable.class));
}

Code example source: ReactiveX/RxJava

@Test
public void testRetryIndefinitely() {
  Subscriber<String> subscriber = TestHelper.mockSubscriber();
  int numRetries = 20;
  Flowable<String> origin = Flowable.unsafeCreate(new FuncWithErrors(numRetries));
  origin.retry().subscribe(new TestSubscriber<String>(subscriber));
  InOrder inOrder = inOrder(subscriber);
  // should show numRetries + 1 attempts (the initial subscription plus every retry)
  inOrder.verify(subscriber, times(numRetries + 1)).onNext("beginningEveryTime");
  // should have no errors
  inOrder.verify(subscriber, never()).onError(any(Throwable.class));
  // should have a single success
  inOrder.verify(subscriber, times(1)).onNext("onSuccessOnly");
  // should have a single successful onComplete
  inOrder.verify(subscriber, times(1)).onComplete();
  inOrder.verifyNoMoreInteractions();
}

Code example source: ReactiveX/RxJava

@Test
public void retryLongPredicateInvalid() {
  try {
    Flowable.just(1).retry(-99, new Predicate<Throwable>() {
      @Override
      public boolean test(Throwable e) throws Exception {
        return true;
      }
    });
    fail("Should have thrown");
  } catch (IllegalArgumentException ex) {
    assertEquals("times >= 0 required but it was -99", ex.getMessage());
  }
}

Code example source: ReactiveX/RxJava

@Test
public void testSourceFlowableRetry0() throws InterruptedException {
  final AtomicInteger subsCount = new AtomicInteger(0);
  final TestSubscriber<String> ts = new TestSubscriber<String>();
  Publisher<String> onSubscribe = new Publisher<String>() {
    @Override
    public void subscribe(Subscriber<? super String> s) {
      s.onSubscribe(new BooleanSubscription());
      subsCount.incrementAndGet();
      s.onError(new RuntimeException("failed"));
    }
  };
  Flowable.unsafeCreate(onSubscribe).retry(0).subscribe(ts);
  assertEquals(1, subsCount.get());
}

Code example source: ReactiveX/RxJava

@Test
public void testSourceFlowableRetry1() throws InterruptedException {
  final AtomicInteger subsCount = new AtomicInteger(0);
  final TestSubscriber<String> ts = new TestSubscriber<String>();
  Publisher<String> onSubscribe = new Publisher<String>() {
    @Override
    public void subscribe(Subscriber<? super String> s) {
      s.onSubscribe(new BooleanSubscription());
      subsCount.incrementAndGet();
      s.onError(new RuntimeException("failed"));
    }
  };
  Flowable.unsafeCreate(onSubscribe).retry(1).subscribe(ts);
  assertEquals(2, subsCount.get());
}
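
The two tests above subscribe once with retry(0) and twice with retry(1), which suggests that a retry budget of n allows at most n + 1 subscriptions in total. The following plain-Java sketch models that counting, together with the argument check whose message is asserted in retryLongPredicateInvalid. It is not RxJava code: BoundedRetrySketch, callWithRetry, and countSubscriptions are hypothetical names, and checking the budget before consulting the predicate is an assumption made for this model.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import java.util.function.Supplier;

// Plain-Java model of a bounded retry; NOT RxJava code.
// Every name in this class is hypothetical and exists only for illustration.
final class BoundedRetrySketch {

    /** Runs the source, re-running it up to `times` more times while the predicate agrees. */
    static <T> T callWithRetry(Supplier<T> source, long times, Predicate<Throwable> shouldRetry) {
        if (times < 0) {
            throw new IllegalArgumentException("times >= 0 required but it was " + times);
        }
        long remaining = times;
        while (true) {
            try {
                return source.get();
            } catch (RuntimeException error) {
                // Assumption: the budget is checked first, then the predicate.
                if (remaining == 0 || !shouldRetry.test(error)) {
                    throw error;
                }
                remaining--;
            }
        }
    }

    /** Counts how often an always-failing source is run for a given retry budget. */
    static int countSubscriptions(long times) {
        AtomicInteger subscriptions = new AtomicInteger();
        Supplier<String> alwaysFails = () -> {
            subscriptions.incrementAndGet();
            throw new RuntimeException("failed");
        };
        try {
            callWithRetry(alwaysFails, times, error -> true);
        } catch (RuntimeException expected) {
            // The last failure is propagated once the budget is used up.
        }
        return subscriptions.get();
    }

    public static void main(String[] args) {
        System.out.println(countSubscriptions(0)); // prints 1
        System.out.println(countSubscriptions(1)); // prints 2
    }
}
```

With a budget of 5 the same model runs the source six times, which matches the "retry 5 then fail on 6" wording used in the timeout tests further down.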

Code example source: ReactiveX/RxJava

@Test//(timeout = 10000)
public void testTimeoutWithRetry() {
  Subscriber<Long> subscriber = TestHelper.mockSubscriber();
  // Flowable that sends every 100ms (timeout fails instead)
  SlowFlowable sf = new SlowFlowable(100, 10, "testTimeoutWithRetry");
  Flowable<Long> f = Flowable.unsafeCreate(sf).timeout(80, TimeUnit.MILLISECONDS).retry(5);
  AsyncSubscriber<Long> async = new AsyncSubscriber<Long>(subscriber);
  f.subscribe(async);
  async.await();
  InOrder inOrder = inOrder(subscriber);
  // Should fail once
  inOrder.verify(subscriber, times(1)).onError(any(Throwable.class));
  inOrder.verify(subscriber, never()).onComplete();
  assertEquals("Start 6 threads, retry 5 then fail on 6", 6, sf.efforts.get());
}

Code example source: ReactiveX/RxJava

@Test(timeout = 10000)
public void testUnsubscribeAfterError() {
  Subscriber<Long> subscriber = TestHelper.mockSubscriber();
  // Flowable that always fails after 100ms
  SlowFlowable so = new SlowFlowable(100, 0, "testUnsubscribeAfterError");
  Flowable<Long> f = Flowable.unsafeCreate(so).retry(5);
  AsyncSubscriber<Long> async = new AsyncSubscriber<Long>(subscriber);
  f.subscribe(async);
  async.await();
  InOrder inOrder = inOrder(subscriber);
  // Should fail once
  inOrder.verify(subscriber, times(1)).onError(any(Throwable.class));
  inOrder.verify(subscriber, never()).onComplete();
  assertEquals("Start 6 threads, retry 5 then fail on 6", 6, so.efforts.get());
  assertEquals("Only 1 active subscription", 1, so.maxActive.get());
}

Code example source: ReactiveX/RxJava

@Test
public void testJustAndRetry() throws Exception {
  final AtomicBoolean throwException = new AtomicBoolean(true);
  int value = Flowable.just(1).map(new Function<Integer, Integer>() {
    @Override
    public Integer apply(Integer t1) {
      if (throwException.compareAndSet(true, false)) {
        throw new TestException();
      }
      return t1;
    }
  }).retry(1).blockingSingle();
  assertEquals(1, value);
}

Code example source: ReactiveX/RxJava

@Test
public void testIssue2826() {
  TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
  final RuntimeException e = new RuntimeException("You shall not pass");
  final AtomicInteger c = new AtomicInteger();
  Flowable.just(1).map(new Function<Integer, Integer>() {
    @Override
    public Integer apply(Integer t1) {
      c.incrementAndGet();
      throw e;
    }
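  })
  // retry5 is a predicate field of the enclosing test class (not shown);
  // the count of 6 asserted below suggests it allows five retries
  .retry(retry5).subscribe(ts);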
  ts.assertTerminated();
  assertEquals(6, c.get());
  assertEquals(Collections.singletonList(e), ts.errors());
}

Code example source: ReactiveX/RxJava

@Test
public void dontRetry() {
  Flowable.error(new TestException("Outer"))
  .retry(Functions.alwaysFalse())
  .test()
  .assertFailureAndMessage(TestException.class, "Outer");
}

Code example source: ReactiveX/RxJava

@Test
public void predicateThrows() {
  TestSubscriber<Object> ts = Flowable.error(new TestException("Outer"))
  .retry(new Predicate<Throwable>() {
    @Override
    public boolean test(Throwable e) throws Exception {
      throw new TestException("Inner");
    }
  })
  .test()
  .assertFailure(CompositeException.class);
  List<Throwable> errors = TestHelper.compositeList(ts.errors().get(0));
  TestHelper.assertError(errors, 0, TestException.class, "Outer");
  TestHelper.assertError(errors, 1, TestException.class, "Inner");
}

Code example source: ReactiveX/RxJava

@Test
public void bipredicateThrows() {
  TestSubscriber<Object> ts = Flowable.error(new TestException("Outer"))
  .retry(new BiPredicate<Integer, Throwable>() {
    @Override
    public boolean test(Integer n, Throwable e) throws Exception {
      throw new TestException("Inner");
    }
  })
  .test()
  .assertFailure(CompositeException.class);
  List<Throwable> errors = TestHelper.compositeList(ts.errors().get(0));
  TestHelper.assertError(errors, 0, TestException.class, "Outer");
  TestHelper.assertError(errors, 1, TestException.class, "Inner");
}

Code example source: ReactiveX/RxJava

@Test
public void retryPredicate() {
  Flowable.just(1).concatWith(Flowable.<Integer>error(new TestException()))
  .retry(new Predicate<Throwable>() {
    @Override
    public boolean test(Throwable v) throws Exception {
      return true;
    }
  })
  .take(5)
  .test()
  .assertResult(1, 1, 1, 1, 1);
}
