io.reactivex.Flowable.onErrorResumeNext()方法的使用及代码示例

x33g5p2x  于2022-01-19 转载在 其他  
字(11.6k)|赞(0)|评价(0)|浏览(216)

本文整理了Java中io.reactivex.Flowable.onErrorResumeNext()方法的一些代码示例,展示了Flowable.onErrorResumeNext()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Flowable.onErrorResumeNext()方法的具体详情如下:
包路径:io.reactivex.Flowable
类名称:Flowable
方法名:onErrorResumeNext

Flowable.onErrorResumeNext介绍

[英]Instructs a Publisher to pass control to another Publisher rather than invoking Subscriber#onError if it encounters an error.

By default, when a Publisher encounters an error that prevents it from emitting the expected item to its Subscriber, the Publisher invokes its Subscriber's onError method, and then quits without invoking any more of its Subscriber's methods. The onErrorResumeNext method changes this behavior. If you pass a function that returns a Publisher ( resumeFunction) to onErrorResumeNext, if the original Publisher encounters an error, instead of invoking its Subscriber's onError method, it will instead relinquish control to the Publisher returned from resumeFunction, which will invoke the Subscriber's Subscriber#onNext method if it is able to do so. In such a case, because no Publisher necessarily invokes onError, the Subscriber may never know that an error happened.

You can use this to prevent errors from propagating or to supply fallback data should errors be encountered. Backpressure: The operator honors backpressure from downstream. This and the resuming Publishers are expected to honor backpressure as well. If any of them violate this expectation, the operator may throw an IllegalStateException when the source Publisher completes or a MissingBackpressureException is signaled somewhere downstream. Scheduler: onErrorResumeNext does not operate by default on a particular Scheduler.
[中]指示发布服务器在遇到错误时将控制权传递给另一个发布服务器,而不是调用订阅服务器#onError。
默认情况下,当发布服务器遇到错误,无法向其订阅服务器发送预期项目时,发布服务器将调用其订阅服务器的onError方法,然后退出,不再调用任何订阅服务器的方法。下一个方法将更改此行为。如果将返回发布服务器(resumeFunction)的函数传递给onErrorResumeNext,则如果原始发布服务器遇到错误,它不会调用其订阅服务器的onError方法,而是将控制权放弃给从resumeFunction返回的发布服务器,它将调用订阅者的订阅者#onNext方法(如果它能够这样做)。在这种情况下,因为没有发布者必须调用onError,订阅者可能永远不会知道发生了错误。
您可以使用它来防止错误传播,或者在遇到错误时提供回退数据。背压:操作员接受来自下游的背压。预计这家出版社和恢复发行的出版商也将承受背压。如果其中任何一个违反此预期,则当源发布服务器完成或下游某个位置发出MissingBackpressureException信号时,运算符可能抛出非法状态异常。Scheduler:onErrorResumeNext默认情况下不会在特定计划程序上运行。

代码示例

代码示例来源:origin: ReactiveX/RxJava

@Override
  public Publisher<Object> apply(Flowable<Object> f) throws Exception {
    return f.onErrorResumeNext(f);
  }
})

代码示例来源:origin: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void onErrorResumeNextFlowableNull() {
  just1.onErrorResumeNext((Flowable<Integer>)null);
}

代码示例来源:origin: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void onErrorResumeNextPublisherNull() {
  just1.onErrorResumeNext((Publisher<Integer>)null);
}

代码示例来源:origin: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void onErrorResumeNextFunctionNull() {
  just1.onErrorResumeNext((Function<Throwable, Publisher<Integer>>)null);
}

代码示例来源:origin: ReactiveX/RxJava

@Override
  public Object apply(Flowable<Integer> f) throws Exception {
    return Flowable.error(new IOException())
        .onErrorResumeNext(Functions.justFunction(f));
  }
}, false, 1, 1, 1);

代码示例来源:origin: ReactiveX/RxJava

@Override
  public Publisher<Integer> createPublisher(long elements) {
    return
        Flowable.range(0, (int)elements).onErrorResumeNext(Flowable.<Integer>never())
    ;
  }
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testResumeNext() {
  Subscription s = mock(Subscription.class);
  // Trigger failure on second element
  TestObservable f = new TestObservable(s, "one", "fail", "two", "three");
  Flowable<String> w = Flowable.unsafeCreate(f);
  Flowable<String> resume = Flowable.just("twoResume", "threeResume");
  Flowable<String> flowable = w.onErrorResumeNext(resume);
  Subscriber<String> subscriber = TestHelper.mockSubscriber();
  flowable.subscribe(subscriber);
  try {
    f.t.join();
  } catch (InterruptedException e) {
    fail(e.getMessage());
  }
  verify(subscriber, Mockito.never()).onError(any(Throwable.class));
  verify(subscriber, times(1)).onComplete();
  verify(subscriber, times(1)).onNext("one");
  verify(subscriber, Mockito.never()).onNext("two");
  verify(subscriber, Mockito.never()).onNext("three");
  verify(subscriber, times(1)).onNext("twoResume");
  verify(subscriber, times(1)).onNext("threeResume");
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testOnErrorCalledOnScheduler() throws Exception {
  final CountDownLatch latch = new CountDownLatch(1);
  final AtomicReference<Thread> thread = new AtomicReference<Thread>();
  Flowable.<String>error(new Exception())
      .delay(0, TimeUnit.MILLISECONDS, Schedulers.newThread())
      .doOnError(new Consumer<Throwable>() {
        @Override
        public void accept(Throwable throwable) throws Exception {
          thread.set(Thread.currentThread());
          latch.countDown();
        }
      })
      .onErrorResumeNext(Flowable.<String>empty())
      .subscribe();
  latch.await();
  assertNotEquals(Thread.currentThread(), thread.get());
}

代码示例来源:origin: ReactiveX/RxJava

@Test
@Ignore("Publishers should not throw")
public void testResumeNextWithFailureOnSubscribe() {
  Flowable<String> testObservable = Flowable.unsafeCreate(new Publisher<String>() {
    @Override
    public void subscribe(Subscriber<? super String> t1) {
      throw new RuntimeException("force failure");
    }
  });
  Flowable<String> resume = Flowable.just("resume");
  Flowable<String> flowable = testObservable.onErrorResumeNext(resume);
  Subscriber<String> subscriber = TestHelper.mockSubscriber();
  flowable.subscribe(subscriber);
  verify(subscriber, Mockito.never()).onError(any(Throwable.class));
  verify(subscriber, times(1)).onComplete();
  verify(subscriber, times(1)).onNext("resume");
}

代码示例来源:origin: ReactiveX/RxJava

/**
 * Test that when a function throws an exception this is propagated through onError.
 */
@Test
public void testFunctionThrowsError() {
  Subscription s = mock(Subscription.class);
  TestFlowable w = new TestFlowable(s, "one");
  Function<Throwable, Flowable<String>> resume = new Function<Throwable, Flowable<String>>() {
    @Override
    public Flowable<String> apply(Throwable t1) {
      throw new RuntimeException("exception from function");
    }
  };
  Flowable<String> flowable = Flowable.unsafeCreate(w).onErrorResumeNext(resume);
  Subscriber<String> subscriber = TestHelper.mockSubscriber();
  flowable.subscribe(subscriber);
  try {
    w.t.join();
  } catch (InterruptedException e) {
    fail(e.getMessage());
  }
  // we should get the "one" value before the error
  verify(subscriber, times(1)).onNext("one");
  // we should have received an onError call on the Observer since the resume function threw an exception
  verify(subscriber, times(1)).onError(any(Throwable.class));
  verify(subscriber, times(0)).onComplete();
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testResumeNextWithAsyncExecution() {
  final AtomicReference<Throwable> receivedException = new AtomicReference<Throwable>();
  Subscription s = mock(Subscription.class);
  TestFlowable w = new TestFlowable(s, "one");
  Function<Throwable, Flowable<String>> resume = new Function<Throwable, Flowable<String>>() {
    @Override
    public Flowable<String> apply(Throwable t1) {
      receivedException.set(t1);
      return Flowable.just("twoResume", "threeResume");
    }
  };
  Flowable<String> flowable = Flowable.unsafeCreate(w).onErrorResumeNext(resume);
  Subscriber<String> subscriber = TestHelper.mockSubscriber();
  flowable.subscribe(subscriber);
  try {
    w.t.join();
  } catch (InterruptedException e) {
    fail(e.getMessage());
  }
  verify(subscriber, Mockito.never()).onError(any(Throwable.class));
  verify(subscriber, times(1)).onComplete();
  verify(subscriber, times(1)).onNext("one");
  verify(subscriber, Mockito.never()).onNext("two");
  verify(subscriber, Mockito.never()).onNext("three");
  verify(subscriber, times(1)).onNext("twoResume");
  verify(subscriber, times(1)).onNext("threeResume");
  assertNotNull(receivedException.get());
}

代码示例来源:origin: ReactiveX/RxJava

Flowable<String> flowable = w.onErrorResumeNext(resume);

代码示例来源:origin: ReactiveX/RxJava

@Test
@Ignore("Publishers should not throw")
public void testResumeNextWithFailureOnSubscribeAsync() {
  Flowable<String> testObservable = Flowable.unsafeCreate(new Publisher<String>() {
    @Override
    public void subscribe(Subscriber<? super String> t1) {
      throw new RuntimeException("force failure");
    }
  });
  Flowable<String> resume = Flowable.just("resume");
  Flowable<String> flowable = testObservable.subscribeOn(Schedulers.io()).onErrorResumeNext(resume);
  Subscriber<String> subscriber = TestHelper.mockSubscriber();
  TestSubscriber<String> ts = new TestSubscriber<String>(subscriber, Long.MAX_VALUE);
  flowable.subscribe(ts);
  ts.awaitTerminalEvent();
  verify(subscriber, Mockito.never()).onError(any(Throwable.class));
  verify(subscriber, times(1)).onComplete();
  verify(subscriber, times(1)).onNext("resume");
}

代码示例来源:origin: ReactiveX/RxJava

Flowable<String> flowable = w.onErrorResumeNext(new Function<Throwable, Flowable<String>>() {

代码示例来源:origin: ReactiveX/RxJava

Flowable<String> flowable = w.onErrorResumeNext(resume);

代码示例来源:origin: ReactiveX/RxJava

public final Flowable<T> onErrorResumeNext(final Publisher<? extends T> next) {
  ObjectHelper.requireNonNull(next, "next is null");
  return onErrorResumeNext(Functions.justFunction(next));

代码示例来源:origin: ReactiveX/RxJava

@Test
public void onErrorResumeNextFunctionReturnsNull() {
  try {
    Flowable.error(new TestException()).onErrorResumeNext(new Function<Throwable, Publisher<Object>>() {
      @Override
      public Publisher<Object> apply(Throwable e) {
        return null;
      }
    }).blockingSubscribe();
    fail("Should have thrown");
  } catch (CompositeException ex) {
    List<Throwable> errors = ex.getExceptions();
    TestHelper.assertError(errors, 0, TestException.class);
    TestHelper.assertError(errors, 1, NullPointerException.class);
    assertEquals(2, errors.size());
  }
}

代码示例来源:origin: ReactiveX/RxJava

/**
 * Test that we receive the onError if an exception is thrown from an operator that
 * does not have manual try/catch handling like map does.
 */
@Test
@Ignore("Failed operator may leave the child subscriber in an inconsistent state which prevents further error delivery.")
public void testOnErrorResumeReceivesErrorFromPreviousNonProtectedOperator() {
  TestSubscriber<String> ts = new TestSubscriber<String>();
  Flowable.just(1).lift(new FlowableOperator<String, Integer>() {
    @Override
    public Subscriber<? super Integer> apply(Subscriber<? super String> t1) {
      throw new RuntimeException("failed");
    }
  }).onErrorResumeNext(new Function<Throwable, Flowable<String>>() {
    @Override
    public Flowable<String> apply(Throwable t1) {
      if (t1.getMessage().equals("failed")) {
        return Flowable.just("success");
      } else {
        return Flowable.error(t1);
      }
    }
  }).subscribe(ts);
  ts.assertTerminated();
  System.out.println(ts.values());
  ts.assertValue("success");
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testBackpressure() {
  TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
  Flowable.range(0, 100000)
      .onErrorResumeNext(new Function<Throwable, Flowable<Integer>>() {
        @Override
        public Flowable<Integer> apply(Throwable t1) {
          return Flowable.just(1);
        }
      })
      .observeOn(Schedulers.computation())
      .map(new Function<Integer, Integer>() {
        int c;
        @Override
        public Integer apply(Integer t1) {
          if (c++ <= 1) {
            // slow
            try {
              Thread.sleep(500);
            } catch (InterruptedException e) {
              e.printStackTrace();
            }
          }
          return t1;
        }
      })
      .subscribe(ts);
  ts.awaitTerminalEvent();
  ts.assertNoErrors();
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testBackpressure() {
  TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
  Flowable.range(0, 100000)
      .onErrorResumeNext(Flowable.just(1))
      .observeOn(Schedulers.computation())
      .map(new Function<Integer, Integer>() {
        int c;
        @Override
        public Integer apply(Integer t1) {
          if (c++ <= 1) {
            // slow
            try {
              Thread.sleep(500);
            } catch (InterruptedException e) {
              e.printStackTrace();
            }
          }
          return t1;
        }
      })
      .subscribe(ts);
  ts.awaitTerminalEvent();
  ts.assertNoErrors();
}

相关文章

Flowable类方法