io.reactivex.Flowable.retryWhen()方法的使用及代码示例

x33g5p2x  于2022-01-19 转载在 其他  
字(13.3k)|赞(0)|评价(0)|浏览(201)

本文整理了Java中io.reactivex.Flowable.retryWhen()方法的一些代码示例,展示了Flowable.retryWhen()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Flowable.retryWhen()方法的具体详情如下:
包路径:io.reactivex.Flowable
类名称:Flowable
方法名:retryWhen

Flowable.retryWhen介绍

[英]Returns a Flowable that emits the same values as the source Publisher with the exception of an onError. An onError notification from the source will result in the emission of a Throwable item to the Publisher provided as an argument to the notificationHandlerfunction. If that Publisher calls onComplete or onError then retry will call onComplete or onError on the child subscription. Otherwise, this Publisher will resubscribe to the source Publisher.

Example: This retries 3 times, each time incrementing the number of seconds it waits.

Flowable.create((FlowableEmitter<? super String> s) -> { 
System.out.println("subscribing"); 
s.onError(new RuntimeException("always fails")); 
}, BackpressureStrategy.BUFFER).retryWhen(attempts -> { 
return attempts.zipWith(Flowable.range(1, 3), (n, i) -> i).flatMap(i -> { 
System.out.println("delay retry by " + i + " second(s)"); 
return Flowable.timer(i, TimeUnit.SECONDS); 
}); 
}).blockingForEach(System.out::println);

Output is:

subscribing

Note that the inner Publisher returned by the handler function should signal either onNext, onError or onComplete in response to the received Throwable to indicate the operator should retry or terminate. If the upstream to the operator is asynchronous, signaling onNext followed by onComplete immediately may result in the sequence to be completed immediately. Similarly, if this inner Publisher signals onError or onComplete while the upstream is active, the sequence is terminated with the same signal immediately.

The following example demonstrates how to retry an asynchronous source with a delay:

Flowable.timer(1, TimeUnit.SECONDS) 
.doOnSubscribe(s -> System.out.println("subscribing")) 
.map(v -> { throw new RuntimeException(); }) 
.retryWhen(errors -> { 
AtomicInteger counter = new AtomicInteger(); 
return errors 
.takeWhile(e -> counter.getAndIncrement() != 3) 
.flatMap(e -> { 
System.out.println("delay retry by " + counter.get() + " second(s)"); 
return Flowable.timer(counter.get(), TimeUnit.SECONDS); 
}); 
}) 
.blockingSubscribe(System.out::println, System.out::println);

Backpressure: The operator honors downstream backpressure and expects both the source and inner Publishers to honor backpressure as well. If this expectation is violated, the operator may throw an IllegalStateException. Scheduler: retryWhen does not operate by default on a particular Scheduler.
[中]返回与源发布服务器发出相同值的Flowable,但onError除外。源发出的onError通知将导致向发布者发送一个可丢弃的项目,该项目作为NotificationHandler函数的参数提供。如果发布服务器调用onComplete或onError,则retry将对子订阅调用onComplete或onError。否则,此发布服务器将重新订阅源发布服务器。
示例:重试3次,每次增加等待的秒数。

Flowable.create((FlowableEmitter<? super String> s) -> { 
System.out.println("subscribing"); 
s.onError(new RuntimeException("always fails")); 
}, BackpressureStrategy.BUFFER).retryWhen(attempts -> { 
return attempts.zipWith(Flowable.range(1, 3), (n, i) -> i).flatMap(i -> { 
System.out.println("delay retry by " + i + " second(s)"); 
return Flowable.timer(i, TimeUnit.SECONDS); 
}); 
}).blockingForEach(System.out::println);

输出为:

subscribing

请注意,处理程序函数返回的内部发布程序应发出onNext、onError或onComplete信号,以响应接收到的Throwable,指示操作员应重试或终止。如果操作员的上游是异步的,则发出onNext后紧接着onComplete的信号可能会导致序列立即完成。类似地,如果此内部发布服务器在上游处于活动状态时发出onError或onComplete信号,则序列将立即以相同的信号终止。
以下示例演示如何延迟重试异步源:

Flowable.timer(1, TimeUnit.SECONDS) 
.doOnSubscribe(s -> System.out.println("subscribing")) 
.map(v -> { throw new RuntimeException(); }) 
.retryWhen(errors -> { 
AtomicInteger counter = new AtomicInteger(); 
return errors 
.takeWhile(e -> counter.getAndIncrement() != 3) 
.flatMap(e -> { 
System.out.println("delay retry by " + counter.get() + " second(s)"); 
return Flowable.timer(counter.get(), TimeUnit.SECONDS); 
}); 
}) 
.blockingSubscribe(System.out::println, System.out::println);

背压:运营商尊重下游背压,并希望源发布商和内部发布商也尊重背压。如果违反此期望,运算符可能抛出一个非法状态异常。Scheduler:retryWhen默认情况下不在特定计划程序上运行。

代码示例

代码示例来源:origin: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void retryWhenFunctionNull() {
  just1.retryWhen(null);
}

代码示例来源:origin: ReactiveX/RxJava

@SchedulerSupport(SchedulerSupport.NONE)
public final Single<T> retryWhen(Function<? super Flowable<Throwable>, ? extends Publisher<?>> handler) {
  return toSingle(toFlowable().retryWhen(handler));

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testOnErrorFromNotificationHandler() {
  Subscriber<String> subscriber = TestHelper.mockSubscriber();
  Flowable<String> origin = Flowable.unsafeCreate(new FuncWithErrors(2));
  origin.retryWhen(new Function<Flowable<? extends Throwable>, Flowable<Object>>() {
    @Override
    public Flowable<Object> apply(Flowable<? extends Throwable> t1) {
      return Flowable.error(new RuntimeException());
    }
  }).subscribe(subscriber);
  InOrder inOrder = inOrder(subscriber);
  inOrder.verify(subscriber).onSubscribe((Subscription)notNull());
  inOrder.verify(subscriber, never()).onNext("beginningEveryTime");
  inOrder.verify(subscriber, never()).onNext("onSuccessOnly");
  inOrder.verify(subscriber, never()).onComplete();
  inOrder.verify(subscriber, times(1)).onError(any(RuntimeException.class));
  inOrder.verifyNoMoreInteractions();
}

代码示例来源:origin: ReactiveX/RxJava

@SchedulerSupport(SchedulerSupport.NONE)
public final Completable retryWhen(Function<? super Flowable<Throwable>, ? extends Publisher<?>> handler) {
  return fromPublisher(toFlowable().retryWhen(handler));

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testOnCompletedFromNotificationHandler() {
  Subscriber<String> subscriber = TestHelper.mockSubscriber();
  Flowable<String> origin = Flowable.unsafeCreate(new FuncWithErrors(1));
  TestSubscriber<String> ts = new TestSubscriber<String>(subscriber);
  origin.retryWhen(new Function<Flowable<? extends Throwable>, Flowable<Object>>() {
    @Override
    public Flowable<Object> apply(Flowable<? extends Throwable> t1) {
      return Flowable.empty();
    }
  }).subscribe(ts);
  InOrder inOrder = inOrder(subscriber);
  inOrder.verify(subscriber).onSubscribe((Subscription)notNull());
  inOrder.verify(subscriber, never()).onNext("beginningEveryTime");
  inOrder.verify(subscriber, never()).onNext("onSuccessOnly");
  inOrder.verify(subscriber, times(1)).onComplete();
  inOrder.verify(subscriber, never()).onError(any(Exception.class));
  inOrder.verifyNoMoreInteractions();
}

代码示例来源:origin: kaushikgopal/RxJava-Android-Samples

@OnClick(R.id.btn_eb_retry)
public void startRetryingWithExponentialBackoffStrategy() {
 _logs = new ArrayList<>();
 _adapter.clear();
 DisposableSubscriber<Object> disposableSubscriber =
   new DisposableSubscriber<Object>() {
    @Override
    public void onNext(Object aVoid) {
     Timber.d("on Next");
    }
    @Override
    public void onComplete() {
     Timber.d("on Completed");
    }
    @Override
    public void onError(Throwable e) {
     _log("Error: I give up!");
    }
   };
 Flowable.error(new RuntimeException("testing")) // always fails
   .retryWhen(new RetryWithDelay(5, 1000)) // notice this is called only onError (onNext
   // values sent are ignored)
   .doOnSubscribe(subscription -> _log("Attempting the impossible 5 times in intervals of 1s"))
   .subscribe(disposableSubscriber);
 _disposables.add(disposableSubscriber);
}

代码示例来源:origin: ReactiveX/RxJava

public final Maybe<T> retryWhen(
    final Function<? super Flowable<Throwable>, ? extends Publisher<?>> handler) {
  return toFlowable().retryWhen(handler).singleElement();

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testOnNextFromNotificationHandler() {
  Subscriber<String> subscriber = TestHelper.mockSubscriber();
  int numRetries = 2;
  Flowable<String> origin = Flowable.unsafeCreate(new FuncWithErrors(numRetries));
  origin.retryWhen(new Function<Flowable<? extends Throwable>, Flowable<Object>>() {
    @Override
    public Flowable<Object> apply(Flowable<? extends Throwable> t1) {
      return t1.map(new Function<Throwable, Integer>() {
        @Override
        public Integer apply(Throwable t1) {
          return 0;
        }
      }).startWith(0).cast(Object.class);
    }
  }).subscribe(subscriber);
  InOrder inOrder = inOrder(subscriber);
  // should show 3 attempts
  inOrder.verify(subscriber, times(numRetries + 1)).onNext("beginningEveryTime");
  // should have no errors
  inOrder.verify(subscriber, never()).onError(any(Throwable.class));
  // should have a single success
  inOrder.verify(subscriber, times(1)).onNext("onSuccessOnly");
  // should have a single successful onComplete
  inOrder.verify(subscriber, times(1)).onComplete();
  inOrder.verifyNoMoreInteractions();
}

代码示例来源:origin: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void retryWhenFunctionReturnsNull() {
  Flowable.error(new TestException()).retryWhen(new Function<Flowable<? extends Throwable>, Publisher<Object>>() {
    @Override
    public Publisher<Object> apply(Flowable<? extends Throwable> f) {
      return null;
    }
  }).blockingSubscribe();
}

代码示例来源:origin: redisson/redisson

@SchedulerSupport(SchedulerSupport.NONE)
public final Completable retryWhen(Function<? super Flowable<Throwable>, ? extends Publisher<?>> handler) {
  return fromPublisher(toFlowable().retryWhen(handler));

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testSingleSubscriptionOnFirst() throws Exception {
  final AtomicInteger inc = new AtomicInteger(0);
  Publisher<Integer> onSubscribe = new Publisher<Integer>() {
    @Override
    public void subscribe(Subscriber<? super Integer> subscriber) {
      subscriber.onSubscribe(new BooleanSubscription());
      final int emit = inc.incrementAndGet();
      subscriber.onNext(emit);
      subscriber.onComplete();
    }
  };
  int first = Flowable.unsafeCreate(onSubscribe)
      .retryWhen(new Function<Flowable<? extends Throwable>, Flowable<Object>>() {
        @Override
        public Flowable<Object> apply(Flowable<? extends Throwable> attempt) {
          return attempt.zipWith(Flowable.just(1), new BiFunction<Throwable, Integer, Object>() {
            @Override
            public Object apply(Throwable o, Integer integer) {
              return 0;
            }
          });
        }
      })
      .blockingFirst();
  assertEquals("Observer did not receive the expected output", 1, first);
  assertEquals("Subscribe was not called once", 1, inc.get());
}

代码示例来源:origin: redisson/redisson

@SchedulerSupport(SchedulerSupport.NONE)
public final Single<T> retryWhen(Function<? super Flowable<Throwable>, ? extends Publisher<?>> handler) {
  return toSingle(toFlowable().retryWhen(handler));

代码示例来源:origin: ReactiveX/RxJava

@Test
public void noCancelPreviousRepeatWhen2() {
  final AtomicInteger counter = new AtomicInteger();
  final AtomicInteger times = new AtomicInteger();
  Flowable<Integer> source = Flowable.<Integer>error(new TestException())
      .doOnCancel(new Action() {
    @Override
    public void run() throws Exception {
      counter.getAndIncrement();
    }
  });
  source.retryWhen(new Function<Flowable<Throwable>, Flowable<?>>() {
    @Override
    public Flowable<?> apply(Flowable<Throwable> e) throws Exception {
      return e.takeWhile(new Predicate<Object>() {
        @Override
        public boolean test(Object v) throws Exception {
          return times.getAndIncrement() < 4;
        }
      });
    }
  })
  .test()
  .assertResult();
  assertEquals(0, counter.get());
}

代码示例来源:origin: ReactiveX/RxJava

producer.retryWhen(new Function<Flowable<? extends Throwable>, Flowable<Object>>() {

代码示例来源:origin: ReactiveX/RxJava

source.retryWhen(new Function<Flowable<Throwable>, Flowable<?>>() {
  @Override
  public Flowable<?> apply(Flowable<Throwable> e) throws Exception {

代码示例来源:origin: ReactiveX/RxJava

@Test
public void shouldDisposeInnerFlowable() {
 final PublishProcessor<Object> processor = PublishProcessor.create();
 final Disposable disposable = Flowable.error(new RuntimeException("Leak"))
   .retryWhen(new Function<Flowable<Throwable>, Flowable<Object>>() {
    @Override
    public Flowable<Object> apply(Flowable<Throwable> errors) throws Exception {
      return errors.switchMap(new Function<Throwable, Flowable<Object>>() {
        @Override
        public Flowable<Object> apply(Throwable ignore) throws Exception {
          return processor;
        }
      });
    }
  })
   .subscribe();
 assertTrue(processor.hasSubscribers());
 disposable.dispose();
 assertFalse(processor.hasSubscribers());
}

代码示例来源:origin: ReactiveX/RxJava

Flowable<String> origin = Flowable.unsafeCreate(new FuncWithErrors(numRetries));
TestSubscriber<String> ts = new TestSubscriber<String>(subscriber);
origin.retryWhen(new Function<Flowable<? extends Throwable>, Flowable<Object>>() {
  @Override
  public Flowable<Object> apply(Flowable<? extends Throwable> t1) {

代码示例来源:origin: ReactiveX/RxJava

@SuppressWarnings({ "rawtypes", "unchecked" })
@Test
public void retryWhenDefaultScheduler() {
  TestSubscriber<Integer> ts = TestSubscriber.create();
  Flowable.just(1)
  .concatWith(Flowable.<Integer>error(new TestException()))
  .retryWhen((Function)new Function<Flowable, Flowable>() {
    @Override
    public Flowable apply(Flowable f) {
      return f.take(2);
    }
  }).subscribe(ts);
  ts.assertValues(1, 1);
  ts.assertNoErrors();
  ts.assertComplete();
}

代码示例来源:origin: ReactiveX/RxJava

.retryWhen(new Function<Flowable<Throwable>, Flowable<Integer>>() {
  @Override
  public Flowable<Integer> apply(Flowable<Throwable> v)

代码示例来源:origin: ReactiveX/RxJava

@SuppressWarnings({ "rawtypes", "unchecked" })
@Test
public void retryWhenTrampolineScheduler() {
  TestSubscriber<Integer> ts = TestSubscriber.create();
  Flowable.just(1)
  .concatWith(Flowable.<Integer>error(new TestException()))
  .subscribeOn(Schedulers.trampoline())
  .retryWhen((Function)new Function<Flowable, Flowable>() {
    @Override
    public Flowable apply(Flowable f) {
      return f.take(2);
    }
  }).subscribe(ts);
  ts.assertValues(1, 1);
  ts.assertNoErrors();
  ts.assertComplete();
}

相关文章

Flowable类方法