io.reactivex.Observable.onErrorResumeNext()方法的使用及代码示例

x33g5p2x  于2022-01-25 转载在 其他  
字(11.1k)|赞(0)|评价(0)|浏览(289)

本文整理了Java中io.reactivex.Observable.onErrorResumeNext()方法的一些代码示例,展示了Observable.onErrorResumeNext()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Observable.onErrorResumeNext()方法的具体详情如下:
包路径:io.reactivex.Observable
类名称:Observable
方法名:onErrorResumeNext

Observable.onErrorResumeNext介绍

[英]Instructs an ObservableSource to pass control to another ObservableSource rather than invoking Observer#onError if it encounters an error.

By default, when an ObservableSource encounters an error that prevents it from emitting the expected item to its Observer, the ObservableSource invokes its Observer's onError method, and then quits without invoking any more of its Observer's methods. The onErrorResumeNext method changes this behavior. If you pass another ObservableSource ( resumeSequence) to an ObservableSource's onErrorResumeNext method, if the original ObservableSource encounters an error, instead of invoking its Observer's onError method, it will instead relinquish control to resumeSequence which will invoke the Observer's Observer#onNext method if it is able to do so. In such a case, because no ObservableSource necessarily invokes onError, the Observer may never know that an error happened.

You can use this to prevent errors from propagating or to supply fallback data should errors be encountered. Scheduler: onErrorResumeNext does not operate by default on a particular Scheduler.
[中]指示一个ObservateSource将控制权传递给另一个ObservateSource,而不是在遇到错误时调用Observator#onError。
默认情况下,当ObservateSource遇到一个错误,阻止它向其观察者发送预期的项时,ObservateSource调用其观察者的OneError方法,然后退出,不再调用任何观察者的方法。下一个方法会更改此行为。如果您将另一个ObservateSource(resumeSequence)传递给ObservateSource的OneErrorResumeNext方法,如果原始ObservateSource遇到错误,它将放弃对resumeSequence的控制,resumeSequence将调用观察者的Observator#onNext方法(如果它能够这样做)。在这种情况下,由于ObservateSource不一定调用onError,因此观察者可能永远不会知道发生了错误。
您可以使用它来防止错误传播,或者在遇到错误时提供回退数据。调度器:OneRorResumeNext默认情况下不会在特定的调度器上运行。

代码示例

代码示例来源:origin: ReactiveX/RxJava

  1. @Test(expected = NullPointerException.class)
  2. public void onErrorResumeNextFunctionNull() {
  3. just1.onErrorResumeNext((Function<Throwable, Observable<Integer>>)null);
  4. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test(expected = NullPointerException.class)
  2. public void onErrorResumeNextObservableNull() {
  3. just1.onErrorResumeNext((Observable<Integer>)null);
  4. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Override
  2. public Object apply(Observable<Integer> o) throws Exception {
  3. return Observable.error(new IOException())
  4. .onErrorResumeNext(Functions.justFunction(o));
  5. }
  6. }, false, 1, 1, 1);

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. @Ignore("Publishers should not throw")
  3. public void testResumeNextWithFailureOnSubscribe() {
  4. Observable<String> testObservable = Observable.unsafeCreate(new ObservableSource<String>() {
  5. @Override
  6. public void subscribe(Observer<? super String> t1) {
  7. throw new RuntimeException("force failure");
  8. }
  9. });
  10. Observable<String> resume = Observable.just("resume");
  11. Observable<String> observable = testObservable.onErrorResumeNext(resume);
  12. Observer<String> observer = TestHelper.mockObserver();
  13. observable.subscribe(observer);
  14. verify(observer, Mockito.never()).onError(any(Throwable.class));
  15. verify(observer, times(1)).onComplete();
  16. verify(observer, times(1)).onNext("resume");
  17. }

代码示例来源:origin: ReactiveX/RxJava

  1. public final Observable<T> onErrorResumeNext(final ObservableSource<? extends T> next) {
  2. ObjectHelper.requireNonNull(next, "next is null");
  3. return onErrorResumeNext(Functions.justFunction(next));

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void testResumeNext() {
  3. Disposable upstream = mock(Disposable.class);
  4. // Trigger failure on second element
  5. TestObservable f = new TestObservable(upstream, "one", "fail", "two", "three");
  6. Observable<String> w = Observable.unsafeCreate(f);
  7. Observable<String> resume = Observable.just("twoResume", "threeResume");
  8. Observable<String> observable = w.onErrorResumeNext(resume);
  9. Observer<String> observer = TestHelper.mockObserver();
  10. observable.subscribe(observer);
  11. try {
  12. f.t.join();
  13. } catch (InterruptedException e) {
  14. fail(e.getMessage());
  15. }
  16. verify(observer, Mockito.never()).onError(any(Throwable.class));
  17. verify(observer, times(1)).onComplete();
  18. verify(observer, times(1)).onNext("one");
  19. verify(observer, Mockito.never()).onNext("two");
  20. verify(observer, Mockito.never()).onNext("three");
  21. verify(observer, times(1)).onNext("twoResume");
  22. verify(observer, times(1)).onNext("threeResume");
  23. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. @Ignore("Publishers should not throw")
  3. public void testResumeNextWithFailureOnSubscribeAsync() {
  4. Observable<String> testObservable = Observable.unsafeCreate(new ObservableSource<String>() {
  5. @Override
  6. public void subscribe(Observer<? super String> t1) {
  7. throw new RuntimeException("force failure");
  8. }
  9. });
  10. Observable<String> resume = Observable.just("resume");
  11. Observable<String> observable = testObservable.subscribeOn(Schedulers.io()).onErrorResumeNext(resume);
  12. Observer<String> observer = TestHelper.mockObserver();
  13. TestObserver<String> to = new TestObserver<String>(observer);
  14. observable.subscribe(to);
  15. to.awaitTerminalEvent();
  16. verify(observer, Mockito.never()).onError(any(Throwable.class));
  17. verify(observer, times(1)).onComplete();
  18. verify(observer, times(1)).onNext("resume");
  19. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void testOnErrorCalledOnScheduler() throws Exception {
  3. final CountDownLatch latch = new CountDownLatch(1);
  4. final AtomicReference<Thread> thread = new AtomicReference<Thread>();
  5. Observable.<String>error(new Exception())
  6. .delay(0, TimeUnit.MILLISECONDS, Schedulers.newThread())
  7. .doOnError(new Consumer<Throwable>() {
  8. @Override
  9. public void accept(Throwable throwable) throws Exception {
  10. thread.set(Thread.currentThread());
  11. latch.countDown();
  12. }
  13. })
  14. .onErrorResumeNext(Observable.<String>empty())
  15. .subscribe();
  16. latch.await();
  17. assertNotEquals(Thread.currentThread(), thread.get());
  18. }

代码示例来源:origin: ReactiveX/RxJava

  1. /**
  2. * Test that when a function throws an exception this is propagated through onError.
  3. */
  4. @Test
  5. public void testFunctionThrowsError() {
  6. Subscription s = mock(Subscription.class);
  7. TestObservable w = new TestObservable(s, "one");
  8. Function<Throwable, Observable<String>> resume = new Function<Throwable, Observable<String>>() {
  9. @Override
  10. public Observable<String> apply(Throwable t1) {
  11. throw new RuntimeException("exception from function");
  12. }
  13. };
  14. Observable<String> o = Observable.unsafeCreate(w).onErrorResumeNext(resume);
  15. Observer<String> observer = TestHelper.mockObserver();
  16. o.subscribe(observer);
  17. try {
  18. w.t.join();
  19. } catch (InterruptedException e) {
  20. fail(e.getMessage());
  21. }
  22. // we should get the "one" value before the error
  23. verify(observer, times(1)).onNext("one");
  24. // we should have received an onError call on the Observer since the resume function threw an exception
  25. verify(observer, times(1)).onError(any(Throwable.class));
  26. verify(observer, times(0)).onComplete();
  27. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void testResumeNextWithAsyncExecution() {
  3. final AtomicReference<Throwable> receivedException = new AtomicReference<Throwable>();
  4. Subscription s = mock(Subscription.class);
  5. TestObservable w = new TestObservable(s, "one");
  6. Function<Throwable, Observable<String>> resume = new Function<Throwable, Observable<String>>() {
  7. @Override
  8. public Observable<String> apply(Throwable t1) {
  9. receivedException.set(t1);
  10. return Observable.just("twoResume", "threeResume");
  11. }
  12. };
  13. Observable<String> o = Observable.unsafeCreate(w).onErrorResumeNext(resume);
  14. Observer<String> observer = TestHelper.mockObserver();
  15. o.subscribe(observer);
  16. try {
  17. w.t.join();
  18. } catch (InterruptedException e) {
  19. fail(e.getMessage());
  20. }
  21. verify(observer, Mockito.never()).onError(any(Throwable.class));
  22. verify(observer, times(1)).onComplete();
  23. verify(observer, times(1)).onNext("one");
  24. verify(observer, Mockito.never()).onNext("two");
  25. verify(observer, Mockito.never()).onNext("three");
  26. verify(observer, times(1)).onNext("twoResume");
  27. verify(observer, times(1)).onNext("threeResume");
  28. assertNotNull(receivedException.get());
  29. }

代码示例来源:origin: ReactiveX/RxJava

  1. Observable<String> observable = w.onErrorResumeNext(resume);

代码示例来源:origin: ReactiveX/RxJava

  1. Observable<String> observable = w.onErrorResumeNext(resume);

代码示例来源:origin: ReactiveX/RxJava

  1. Observable<String> o = w.onErrorResumeNext(new Function<Throwable, Observable<String>>() {

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void testBackpressure() {
  3. TestObserver<Integer> to = new TestObserver<Integer>();
  4. Observable.range(0, 100000)
  5. .onErrorResumeNext(new Function<Throwable, Observable<Integer>>() {
  6. @Override
  7. public Observable<Integer> apply(Throwable t1) {
  8. return Observable.just(1);
  9. }
  10. })
  11. .observeOn(Schedulers.computation())
  12. .map(new Function<Integer, Integer>() {
  13. int c;
  14. @Override
  15. public Integer apply(Integer t1) {
  16. if (c++ <= 1) {
  17. // slow
  18. try {
  19. Thread.sleep(500);
  20. } catch (InterruptedException e) {
  21. e.printStackTrace();
  22. }
  23. }
  24. return t1;
  25. }
  26. })
  27. .subscribe(to);
  28. to.awaitTerminalEvent();
  29. to.assertNoErrors();
  30. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void testBackpressure() {
  3. TestObserver<Integer> to = new TestObserver<Integer>();
  4. Observable.range(0, 100000)
  5. .onErrorResumeNext(Observable.just(1))
  6. .observeOn(Schedulers.computation())
  7. .map(new Function<Integer, Integer>() {
  8. int c;
  9. @Override
  10. public Integer apply(Integer t1) {
  11. if (c++ <= 1) {
  12. // slow
  13. try {
  14. Thread.sleep(500);
  15. } catch (InterruptedException e) {
  16. e.printStackTrace();
  17. }
  18. }
  19. return t1;
  20. }
  21. })
  22. .subscribe(to);
  23. to.awaitTerminalEvent();
  24. to.assertNoErrors();
  25. }
  26. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test(expected = NullPointerException.class)
  2. public void onErrorResumeNextFunctionReturnsNull() {
  3. Observable.error(new TestException()).onErrorResumeNext(new Function<Throwable, Observable<Object>>() {
  4. @Override
  5. public Observable<Object> apply(Throwable e) {
  6. return null;
  7. }
  8. }).blockingSubscribe();
  9. }

代码示例来源:origin: redisson/redisson

  1. public final Observable<T> onErrorResumeNext(final ObservableSource<? extends T> next) {
  2. ObjectHelper.requireNonNull(next, "next is null");
  3. return onErrorResumeNext(Functions.justFunction(next));

代码示例来源:origin: ReactiveX/RxJava

  1. }).onErrorResumeNext(new Function<Throwable, Observable<String>>() {

代码示例来源:origin: ReactiveX/RxJava

  1. /**
  2. * Test that we receive the onError if an exception is thrown from an operator that
  3. * does not have manual try/catch handling like map does.
  4. */
  5. @Test
  6. @Ignore("Failed operator may leave the child Observer in an inconsistent state which prevents further error delivery.")
  7. public void testOnErrorResumeReceivesErrorFromPreviousNonProtectedOperator() {
  8. TestObserver<String> to = new TestObserver<String>();
  9. Observable.just(1).lift(new ObservableOperator<String, Integer>() {
  10. @Override
  11. public Observer<? super Integer> apply(Observer<? super String> t1) {
  12. throw new RuntimeException("failed");
  13. }
  14. }).onErrorResumeNext(new Function<Throwable, Observable<String>>() {
  15. @Override
  16. public Observable<String> apply(Throwable t1) {
  17. if (t1.getMessage().equals("failed")) {
  18. return Observable.just("success");
  19. } else {
  20. return Observable.error(t1);
  21. }
  22. }
  23. }).subscribe(to);
  24. to.assertTerminated();
  25. System.out.println(to.values());
  26. to.assertValue("success");
  27. }

代码示例来源:origin: ReactiveX/RxJava

  1. .onErrorResumeNext(new Function<Throwable, Observable<String>>() {
  2. @Override
  3. public Observable<String> apply(Throwable t1) {

相关文章

Observable类方法