Usage and code examples of the io.reactivex.Flowable.doOnError() method

Reposted by x33g5p2x on 2022-01-19, filed under Other

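This article collects code examples of the Java method io.reactivex.Flowable.doOnError() and shows how Flowable.doOnError() is used in practice. The examples come mainly from GitHub, Stack Overflow, Maven and similar platforms and were extracted from selected projects, so they should be useful as reference material. Details of the Flowable.doOnError() method:
Package path: io.reactivex.Flowable
Class name: Flowable
Method name: doOnError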

Introduction to Flowable.doOnError

Modifies the source Publisher so that it invokes an action if it calls onError.

In case the onError action throws, the downstream will receive a composite exception containing the original exception and the exception thrown by onError.
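
The two rules above (run the action when onError arrives, and wrap both exceptions if the action itself throws) can be modelled with the JDK's own `java.util.concurrent.Flow` interfaces. This is only a sketch of the described contract, not RxJava's implementation: `DoOnErrorSketch`, `peekError`, `deliver` and `CompositeError` are hypothetical names invented for illustration, and `CompositeError` merely stands in for RxJava's `CompositeException`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.function.Consumer;

/** Stdlib-only model of a doOnError-style operator; NOT RxJava's implementation. */
public class DoOnErrorSketch {

    /** Hypothetical stand-in for RxJava's CompositeException. */
    public static final class CompositeError extends RuntimeException {
        private final List<Throwable> exceptions;

        CompositeError(Throwable original, Throwable fromAction) {
            super("2 exceptions occurred");
            this.exceptions = List.of(original, fromAction);
        }

        public List<Throwable> getExceptions() {
            return exceptions;
        }
    }

    /** Wraps a subscriber so that the action runs just before onError is forwarded. */
    public static <T> Flow.Subscriber<T> peekError(Flow.Subscriber<T> downstream,
                                                   Consumer<Throwable> action) {
        return new Flow.Subscriber<T>() {
            @Override public void onSubscribe(Flow.Subscription s) { downstream.onSubscribe(s); }
            @Override public void onNext(T item) { downstream.onNext(item); }
            @Override public void onComplete() { downstream.onComplete(); }

            @Override public void onError(Throwable original) {
                Throwable toDeliver = original;
                try {
                    action.accept(original);
                } catch (RuntimeException fromAction) {
                    // The action failed: deliver both exceptions, original first.
                    toDeliver = new CompositeError(original, fromAction);
                }
                downstream.onError(toDeliver);
            }
        };
    }

    /** Signals one error through the wrapper and returns what the downstream received. */
    public static Throwable deliver(Throwable error, Consumer<Throwable> action) {
        List<Throwable> received = new ArrayList<>();
        Flow.Subscriber<String> downstream = new Flow.Subscriber<String>() {
            @Override public void onSubscribe(Flow.Subscription s) { }
            @Override public void onNext(String item) { }
            @Override public void onComplete() { }
            @Override public void onError(Throwable t) { received.add(t); }
        };
        Flow.Subscriber<String> wrapped = peekError(downstream, action);
        // Reactive Streams requires onSubscribe before any other signal.
        wrapped.onSubscribe(new Flow.Subscription() {
            @Override public void request(long n) { }
            @Override public void cancel() { }
        });
        wrapped.onError(error);
        return received.get(0);
    }
}
```

With a well-behaved action, `deliver` hands back the very same exception instance. With an action that throws, it hands back a `CompositeError` whose list holds the original exception followed by the one from the action. The `onErrorThrows` test from ReactiveX/RxJava among the examples in this article asserts the same two-element composite against the real operator.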

Backpressure: The operator doesn't interfere with backpressure, which is determined by the source Publisher's backpressure behavior.
Scheduler: doOnError does not operate by default on a particular Scheduler.
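
Both notes describe pass-through behaviour, which a small stdlib-only model can make concrete. The sketch below is built on `java.util.concurrent.Flow` and is not RxJava code: `PassThroughSketch`, `peekError`, `requesting`, `requestedUpstream` and `threadOfAction` are hypothetical names. It assumes that "does not interfere with backpressure" means the upstream Subscription is handed to the downstream untouched, and that "no particular Scheduler" means the action runs on whichever thread delivers onError.

```java
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

/** Stdlib-only model of the pass-through behaviour; NOT RxJava's implementation. */
public class PassThroughSketch {

    private static final Flow.Subscription NOOP = new Flow.Subscription() {
        @Override public void request(long n) { }
        @Override public void cancel() { }
    };

    /** Forwards every signal; only onError additionally triggers the action. */
    static <T> Flow.Subscriber<T> peekError(Flow.Subscriber<T> downstream,
                                            Consumer<Throwable> action) {
        return new Flow.Subscriber<T>() {
            // The same Subscription is passed on, so request(n) reaches upstream unchanged.
            @Override public void onSubscribe(Flow.Subscription s) { downstream.onSubscribe(s); }
            @Override public void onNext(T item) { downstream.onNext(item); }
            @Override public void onComplete() { downstream.onComplete(); }
            @Override public void onError(Throwable t) {
                action.accept(t);      // runs on the thread that signalled the error
                downstream.onError(t);
            }
        };
    }

    /** Downstream that requests n items as soon as it is subscribed. */
    static Flow.Subscriber<String> requesting(long n) {
        return new Flow.Subscriber<String>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(n); }
            @Override public void onNext(String item) { }
            @Override public void onComplete() { }
            @Override public void onError(Throwable t) { }
        };
    }

    /** Returns how many items the upstream Subscription was asked for. */
    public static long requestedUpstream(long n) {
        AtomicLong requested = new AtomicLong();
        Flow.Subscription upstream = new Flow.Subscription() {
            @Override public void request(long count) { requested.addAndGet(count); }
            @Override public void cancel() { }
        };
        peekError(requesting(n), err -> { }).onSubscribe(upstream);
        return requested.get();
    }

    /** Signals an error from a named thread and returns the thread the action ran on. */
    public static String threadOfAction(String threadName) {
        AtomicReference<String> seenOn = new AtomicReference<>();
        Flow.Subscriber<String> wrapped =
                peekError(requesting(1), err -> seenOn.set(Thread.currentThread().getName()));
        Thread signaller = new Thread(() -> {
            wrapped.onSubscribe(NOOP);
            wrapped.onError(new RuntimeException("boom"));
        }, threadName);
        signaller.start();
        try {
            signaller.join();
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
        }
        return seenOn.get();
    }
}
```

In this model `requestedUpstream(3)` reports that the upstream was asked for three items, and `threadOfAction` returns the name of the signalling thread. The `testOnErrorCalledOnScheduler` test among the examples in this article checks the corresponding behaviour of the real operator: after `delay(..., Schedulers.newThread())` the action runs on a thread other than the test thread.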

Code examples

Code example source: ReactiveX/RxJava

    // A null action is rejected with a NullPointerException.
    @Test(expected = NullPointerException.class)
    public void doOnErrorNull() {
        just1.doOnError(null);
    }

Code example source: skylot/jadx

    private void searchFieldSubscribe() {
        searchEmitter = new SearchEventEmitter();
        Flowable<String> textChanges = onTextFieldChanges(searchField);
        Flowable<String> searchEvents = Flowable.merge(textChanges, searchEmitter.getFlowable());
        searchDisposable = searchEvents
                .filter(text -> text.length() > 0)
                .subscribeOn(Schedulers.single())
                .doOnNext(r -> LOG.debug("search event: {}", r))
                .switchMap(text -> prepareSearch(text)
                        // log a failure of the inner search before it propagates to switchMap
                        .doOnError(e -> LOG.error("Error prepare search: {}", e.getMessage(), e))
                        .subscribeOn(Schedulers.single())
                        .toList()
                        .toFlowable(), 1)
                .observeOn(SwingSchedulers.edt())
                // log any error that reaches the outer pipeline
                .doOnError(e -> LOG.error("Error while searching: {}", e.getMessage(), e))
                .subscribe(this::processSearchResults);
    }

Code example source: ReactiveX/RxJava

    @Test
    public void testMapWithError() {
        final List<Throwable> errors = new ArrayList<Throwable>();
        Flowable<String> w = Flowable.just("one", "fail", "two", "three", "fail");
        Flowable<String> m = w.map(new Function<String, String>() {
            @Override
            public String apply(String s) {
                if ("fail".equals(s)) {
                    throw new TestException("Forced Failure");
                }
                return s;
            }
        }).doOnError(new Consumer<Throwable>() {
            // records the error thrown by map() before it reaches the subscriber
            @Override
            public void accept(Throwable t1) {
                errors.add(t1);
            }
        });
        m.subscribe(stringSubscriber);
        verify(stringSubscriber, times(1)).onNext("one");
        verify(stringSubscriber, never()).onNext("two");
        verify(stringSubscriber, never()).onNext("three");
        verify(stringSubscriber, never()).onComplete();
        verify(stringSubscriber, times(1)).onError(any(TestException.class));
        TestHelper.assertError(errors, 0, TestException.class, "Forced Failure");
    }

Code example source: ReactiveX/RxJava

    @Test
    public void testDistinctUntilChangedWhenNonFatalExceptionThrownByKeySelectorIsNotReportedByUpstream() {
        Flowable<String> src = Flowable.just("a", "b", "null", "c");
        final AtomicBoolean errorOccurred = new AtomicBoolean(false);
        src
                // sits upstream of distinctUntilChanged, so it must not see the downstream failure
                .doOnError(new Consumer<Throwable>() {
                    @Override
                    public void accept(Throwable t) {
                        errorOccurred.set(true);
                    }
                })
                .distinctUntilChanged(THROWS_NON_FATAL)
                .subscribe(w);
        Assert.assertFalse(errorOccurred.get());
    }

Code example source: ReactiveX/RxJava

    @Test
    public void testOnErrorCalledOnScheduler() throws Exception {
        final CountDownLatch latch = new CountDownLatch(1);
        final AtomicReference<Thread> thread = new AtomicReference<Thread>();
        Flowable.<String>error(new Exception())
                .delay(0, TimeUnit.MILLISECONDS, Schedulers.newThread())
                .doOnError(new Consumer<Throwable>() {
                    @Override
                    public void accept(Throwable throwable) throws Exception {
                        // capture the thread on which the error is delivered
                        thread.set(Thread.currentThread());
                        latch.countDown();
                    }
                })
                .onErrorResumeNext(Flowable.<String>empty())
                .subscribe();
        latch.await();
        assertNotEquals(Thread.currentThread(), thread.get());
    }

Code example source: ReactiveX/RxJava

    @Test
    public void testNonFatalExceptionThrownByCombinatorForSingleSourceIsNotReportedByUpstreamOperator() {
        final AtomicBoolean errorOccurred = new AtomicBoolean(false);
        TestSubscriber<Integer> ts = TestSubscriber.create(1);
        Flowable<Integer> source = Flowable.just(1)
                // if combineLatest did not catch the exception itself, it would
                // incorrectly be picked up by this call to doOnError
                .doOnError(new Consumer<Throwable>() {
                    @Override
                    public void accept(Throwable t) {
                        errorOccurred.set(true);
                    }
                });
        Flowable
                .combineLatest(Collections.singletonList(source), THROW_NON_FATAL)
                .subscribe(ts);
        assertFalse(errorOccurred.get());
    }

Code example source: ReactiveX/RxJava

    @Test
    public void testDoOnError() {
        final AtomicReference<Throwable> r = new AtomicReference<Throwable>();
        Throwable t = null;
        try {
            Flowable.<String> error(new RuntimeException("an error"))
                    .doOnError(new Consumer<Throwable>() {
                        @Override
                        public void accept(Throwable v) {
                            r.set(v);
                        }
                    }).blockingSingle();
            fail("expected exception, not a return value");
        } catch (Throwable e) {
            t = e;
        }
        assertNotNull(t);
        // the exception seen by doOnError is the one rethrown by blockingSingle()
        assertEquals(t, r.get());
    }

Code example source: ReactiveX/RxJava

    @Test
    public void testNonFatalExceptionFromOverflowActionIsNotReportedFromUpstreamOperator() {
        final AtomicBoolean errorOccurred = new AtomicBoolean(false);
        // request 0
        TestSubscriber<Long> ts = TestSubscriber.create(0);
        // range method emits regardless of requests, so it should trigger the onBackpressureDrop action
        range(2)
                // if onBackpressureDrop did not catch the exception itself, it would
                // incorrectly be picked up by this call to doOnError
                .doOnError(new Consumer<Throwable>() {
                    @Override
                    public void accept(Throwable t) {
                        errorOccurred.set(true);
                    }
                })
                .onBackpressureDrop(THROW_NON_FATAL)
                .subscribe(ts);
        assertFalse(errorOccurred.get());
    }

Code example source: ReactiveX/RxJava

    .doOnCancel(sourceUnsubscribed)
    .doOnComplete(sourceCompleted)
    .doOnError(sourceError)
    .subscribeOn(mockScheduler).replay();

Code example source: ReactiveX/RxJava

    @Test
    public void nonFatalExceptionThrownByOnOverflowIsNotReportedByUpstream() {
        final AtomicBoolean errorOccurred = new AtomicBoolean(false);
        TestSubscriber<Long> ts = TestSubscriber.create(0);
        infinite
                .subscribeOn(Schedulers.computation())
                // sits upstream of onBackpressureBuffer, so it must not see the overflow failure
                .doOnError(new Consumer<Throwable>() {
                    @Override
                    public void accept(Throwable t) {
                        errorOccurred.set(true);
                    }
                })
                .onBackpressureBuffer(1, THROWS_NON_FATAL)
                .subscribe(ts);
        ts.awaitTerminalEvent();
        assertFalse(errorOccurred.get());
    }

Code example source: ReactiveX/RxJava

    @Test
    public void doOnNextDoOnErrorFused2() {
        ConnectableFlowable<Integer> cf = Flowable.just(1)
                .doOnNext(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer v) throws Exception {
                        throw new TestException("First");
                    }
                })
                .doOnError(new Consumer<Throwable>() {
                    @Override
                    public void accept(Throwable e) throws Exception {
                        throw new TestException("Second");
                    }
                })
                .doOnError(new Consumer<Throwable>() {
                    @Override
                    public void accept(Throwable e) throws Exception {
                        throw new TestException("Third");
                    }
                })
                .publish();
        TestSubscriber<Integer> ts = cf.test();
        cf.connect();
        // all three failures arrive in a single CompositeException, in order
        ts.assertFailure(CompositeException.class);
        TestHelper.assertError(ts, 0, TestException.class, "First");
        TestHelper.assertError(ts, 1, TestException.class, "Second");
        TestHelper.assertError(ts, 2, TestException.class, "Third");
    }

Code example source: ReactiveX/RxJava

    @Test
    public void doOnNextDoOnErrorFusedConditional2() {
        ConnectableFlowable<Integer> cf = Flowable.just(1)
                .doOnNext(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer v) throws Exception {
                        throw new TestException("First");
                    }
                })
                .doOnError(new Consumer<Throwable>() {
                    @Override
                    public void accept(Throwable e) throws Exception {
                        throw new TestException("Second");
                    }
                })
                .doOnError(new Consumer<Throwable>() {
                    @Override
                    public void accept(Throwable e) throws Exception {
                        throw new TestException("Third");
                    }
                })
                .filter(Functions.alwaysTrue())
                .publish();
        TestSubscriber<Integer> ts = cf.test();
        cf.connect();
        ts.assertFailure(CompositeException.class);
        TestHelper.assertError(ts, 0, TestException.class, "First");
        TestHelper.assertError(ts, 1, TestException.class, "Second");
        TestHelper.assertError(ts, 2, TestException.class, "Third");
    }

Code example source: ReactiveX/RxJava

    .doOnError(new Consumer<Throwable>() {
        @Override
        public void accept(Throwable e) {

Code example source: ReactiveX/RxJava

    @Test
    public void onErrorThrows() {
        TestSubscriber<Object> ts = TestSubscriber.create();
        Flowable.error(new TestException())
                .doOnError(new Consumer<Throwable>() {
                    @Override
                    public void accept(Throwable e) {
                        throw new TestException();
                    }
                }).subscribe(ts);
        ts.assertNoValues();
        ts.assertNotComplete();
        ts.assertError(CompositeException.class);
        // the composite holds the original error and the one thrown by the action
        CompositeException ex = (CompositeException)ts.errors().get(0);
        List<Throwable> exceptions = ex.getExceptions();
        assertEquals(2, exceptions.size());
        Assert.assertTrue(exceptions.get(0) instanceof TestException);
        Assert.assertTrue(exceptions.get(1) instanceof TestException);
    }

Code example source: ReactiveX/RxJava

    @Test
    public void testUsingDisposesEagerlyBeforeError() {
        final List<String> events = new ArrayList<String>();
        Callable<Resource> resourceFactory = createResourceFactory(events);
        final Consumer<Throwable> onError = createOnErrorAction(events);
        final Action unsub = createUnsubAction(events);
        Function<Resource, Flowable<String>> observableFactory = new Function<Resource, Flowable<String>>() {
            @Override
            public Flowable<String> apply(Resource resource) {
                return Flowable.fromArray(resource.getTextFromWeb().split(" "))
                        .concatWith(Flowable.<String>error(new RuntimeException()));
            }
        };
        Subscriber<String> subscriber = TestHelper.mockSubscriber();
        // eager = true: the resource is disposed before the error is emitted
        Flowable<String> flowable = Flowable.using(resourceFactory, observableFactory,
                new DisposeAction(), true)
                .doOnCancel(unsub)
                .doOnError(onError);
        flowable.safeSubscribe(subscriber);
        assertEquals(Arrays.asList("disposed", "error"), events);
    }

Code example source: ReactiveX/RxJava

    @Test
    public void testUsingDoesNotDisposesEagerlyBeforeError() {
        final List<String> events = new ArrayList<String>();
        final Callable<Resource> resourceFactory = createResourceFactory(events);
        final Consumer<Throwable> onError = createOnErrorAction(events);
        final Action unsub = createUnsubAction(events);
        Function<Resource, Flowable<String>> observableFactory = new Function<Resource, Flowable<String>>() {
            @Override
            public Flowable<String> apply(Resource resource) {
                return Flowable.fromArray(resource.getTextFromWeb().split(" "))
                        .concatWith(Flowable.<String>error(new RuntimeException()));
            }
        };
        Subscriber<String> subscriber = TestHelper.mockSubscriber();
        // eager = false: the error is emitted first, then the resource is disposed
        Flowable<String> flowable = Flowable.using(resourceFactory, observableFactory,
                new DisposeAction(), false)
                .doOnCancel(unsub)
                .doOnError(onError);
        flowable.safeSubscribe(subscriber);
        assertEquals(Arrays.asList("error", "disposed"), events);
    }

Code example source: ReactiveX/RxJava

    @Test
    public void onErrorOnErrorCrashConditional() {
        TestSubscriber<Object> ts = Flowable.error(new TestException("Outer"))
                .doOnError(new Consumer<Throwable>() {
                    @Override
                    public void accept(Throwable e) throws Exception {
                        throw new TestException("Inner");
                    }
                })
                .filter(Functions.alwaysTrue())
                .test()
                .assertFailure(CompositeException.class);
        // original error first, then the exception thrown by the action
        List<Throwable> errors = TestHelper.compositeList(ts.errors().get(0));
        TestHelper.assertError(errors, 0, TestException.class, "Outer");
        TestHelper.assertError(errors, 1, TestException.class, "Inner");
    }

Code example source: ReactiveX/RxJava

    @Test
    public void doOnNextDoOnErrorFused() {
        ConnectableFlowable<Integer> cf = Flowable.just(1)
                .doOnNext(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer v) throws Exception {
                        throw new TestException("First");
                    }
                })
                .doOnError(new Consumer<Throwable>() {
                    @Override
                    public void accept(Throwable e) throws Exception {
                        throw new TestException("Second");
                    }
                })
                .publish();
        TestSubscriber<Integer> ts = cf.test();
        cf.connect();
        ts.assertFailure(CompositeException.class);
        TestHelper.assertError(ts, 0, TestException.class, "First");
        TestHelper.assertError(ts, 1, TestException.class, "Second");
    }

Code example source: ReactiveX/RxJava

    .doOnError(new Consumer<Throwable>() {
        @Override
        public void accept(Throwable t1) {
    Thread.sleep(100);
    interval
    .doOnError(new Consumer<Throwable>() {
        @Override
        public void accept(Throwable t1) {

Code example source: ReactiveX/RxJava

    @Test
    public void doOnNextDoOnErrorFusedConditional() {
        ConnectableFlowable<Integer> cf = Flowable.just(1)
                .doOnNext(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer v) throws Exception {
                        throw new TestException("First");
                    }
                })
                .doOnError(new Consumer<Throwable>() {
                    @Override
                    public void accept(Throwable e) throws Exception {
                        throw new TestException("Second");
                    }
                })
                .filter(Functions.alwaysTrue())
                .publish();
        TestSubscriber<Integer> ts = cf.test();
        cf.connect();
        ts.assertFailure(CompositeException.class);
        TestHelper.assertError(ts, 0, TestException.class, "First");
        TestHelper.assertError(ts, 1, TestException.class, "Second");
    }
