io.reactivex.Flowable.doOnError()方法的使用及代码示例

x33g5p2x  于2022-01-19 转载在 其他  
字(12.6k)|赞(0)|评价(0)|浏览(270)

本文整理了Java中io.reactivex.Flowable.doOnError()方法的一些代码示例,展示了Flowable.doOnError()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Flowable.doOnError()方法的具体详情如下:
包路径:io.reactivex.Flowable
类名称:Flowable
方法名:doOnError

Flowable.doOnError介绍

[英]Modifies the source Publisher so that it invokes an action if it calls onError.

In case the onError action throws, the downstream will receive a composite exception containing the original exception and the exception thrown by onError.

Backpressure: The operator doesn't interfere with backpressure which is determined by the source Publisher's backpressure behavior. Scheduler: doOnError does not operate by default on a particular Scheduler.
[中]修改源发布服务器,使其在调用onError时调用操作。
如果OneError操作抛出,下游将收到一个包含原始异常和OneError抛出的异常的复合异常。
背压:操作员不会干扰由源发布者的背压行为确定的背压。计划程序:默认情况下,doOnError不会在特定计划程序上运行。

代码示例

代码示例来源:origin: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void doOnErrorNull() {
  just1.doOnError(null);
}

代码示例来源:origin: skylot/jadx

private void searchFieldSubscribe() {
  searchEmitter = new SearchEventEmitter();
  Flowable<String> textChanges = onTextFieldChanges(searchField);
  Flowable<String> searchEvents = Flowable.merge(textChanges, searchEmitter.getFlowable());
  searchDisposable = searchEvents
      .filter(text -> text.length() > 0)
      .subscribeOn(Schedulers.single())
      .doOnNext(r -> LOG.debug("search event: {}", r))
      .switchMap(text -> prepareSearch(text)
          .doOnError(e -> LOG.error("Error prepare search: {}", e.getMessage(), e))
          .subscribeOn(Schedulers.single())
          .toList()
          .toFlowable(), 1)
      .observeOn(SwingSchedulers.edt())
      .doOnError(e -> LOG.error("Error while searching: {}", e.getMessage(), e))
      .subscribe(this::processSearchResults);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testMapWithError() {
  final List<Throwable> errors = new ArrayList<Throwable>();
  Flowable<String> w = Flowable.just("one", "fail", "two", "three", "fail");
  Flowable<String> m = w.map(new Function<String, String>() {
    @Override
    public String apply(String s) {
      if ("fail".equals(s)) {
        throw new TestException("Forced Failure");
      }
      return s;
    }
  }).doOnError(new Consumer<Throwable>() {
    @Override
    public void accept(Throwable t1) {
      errors.add(t1);
    }
  });
  m.subscribe(stringSubscriber);
  verify(stringSubscriber, times(1)).onNext("one");
  verify(stringSubscriber, never()).onNext("two");
  verify(stringSubscriber, never()).onNext("three");
  verify(stringSubscriber, never()).onComplete();
  verify(stringSubscriber, times(1)).onError(any(TestException.class));
  TestHelper.assertError(errors, 0, TestException.class, "Forced Failure");
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testDistinctUntilChangedWhenNonFatalExceptionThrownByKeySelectorIsNotReportedByUpstream() {
  Flowable<String> src = Flowable.just("a", "b", "null", "c");
  final AtomicBoolean errorOccurred = new AtomicBoolean(false);
  src
   .doOnError(new Consumer<Throwable>() {
      @Override
      public void accept(Throwable t) {
        errorOccurred.set(true);
      }
    })
   .distinctUntilChanged(THROWS_NON_FATAL)
   .subscribe(w);
  Assert.assertFalse(errorOccurred.get());
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testOnErrorCalledOnScheduler() throws Exception {
  final CountDownLatch latch = new CountDownLatch(1);
  final AtomicReference<Thread> thread = new AtomicReference<Thread>();
  Flowable.<String>error(new Exception())
      .delay(0, TimeUnit.MILLISECONDS, Schedulers.newThread())
      .doOnError(new Consumer<Throwable>() {
        @Override
        public void accept(Throwable throwable) throws Exception {
          thread.set(Thread.currentThread());
          latch.countDown();
        }
      })
      .onErrorResumeNext(Flowable.<String>empty())
      .subscribe();
  latch.await();
  assertNotEquals(Thread.currentThread(), thread.get());
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testNonFatalExceptionThrownByCombinatorForSingleSourceIsNotReportedByUpstreamOperator() {
  final AtomicBoolean errorOccurred = new AtomicBoolean(false);
  TestSubscriber<Integer> ts = TestSubscriber.create(1);
  Flowable<Integer> source = Flowable.just(1)
   // if haven't caught exception in combineLatest operator then would incorrectly
   // be picked up by this call to doOnError
   .doOnError(new Consumer<Throwable>() {
      @Override
      public void accept(Throwable t) {
        errorOccurred.set(true);
      }
    });
  Flowable
   .combineLatest(Collections.singletonList(source), THROW_NON_FATAL)
   .subscribe(ts);
  assertFalse(errorOccurred.get());
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testDoOnError() {
  final AtomicReference<Throwable> r = new AtomicReference<Throwable>();
  Throwable t = null;
  try {
    Flowable.<String> error(new RuntimeException("an error"))
    .doOnError(new Consumer<Throwable>() {
      @Override
      public void accept(Throwable v) {
        r.set(v);
      }
    }).blockingSingle();
    fail("expected exception, not a return value");
  } catch (Throwable e) {
    t = e;
  }
  assertNotNull(t);
  assertEquals(t, r.get());
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testNonFatalExceptionFromOverflowActionIsNotReportedFromUpstreamOperator() {
  final AtomicBoolean errorOccurred = new AtomicBoolean(false);
  //request 0
  TestSubscriber<Long> ts = TestSubscriber.create(0);
  //range method emits regardless of requests so should trigger onBackpressureDrop action
  range(2)
   // if haven't caught exception in onBackpressureDrop operator then would incorrectly
   // be picked up by this call to doOnError
   .doOnError(new Consumer<Throwable>() {
      @Override
      public void accept(Throwable t) {
        errorOccurred.set(true);
      }
    })
   .onBackpressureDrop(THROW_NON_FATAL)
   .subscribe(ts);
  assertFalse(errorOccurred.get());
}

代码示例来源:origin: ReactiveX/RxJava

.doOnCancel(sourceUnsubscribed)
.doOnComplete(sourceCompleted)
.doOnError(sourceError)
.subscribeOn(mockScheduler).replay();

代码示例来源:origin: ReactiveX/RxJava

@Test
public void nonFatalExceptionThrownByOnOverflowIsNotReportedByUpstream() {
   final AtomicBoolean errorOccurred = new AtomicBoolean(false);
   TestSubscriber<Long> ts = TestSubscriber.create(0);
   infinite
    .subscribeOn(Schedulers.computation())
    .doOnError(new Consumer<Throwable>() {
       @Override
       public void accept(Throwable t) {
         errorOccurred.set(true);
       }
     })
    .onBackpressureBuffer(1, THROWS_NON_FATAL)
    .subscribe(ts);
   ts.awaitTerminalEvent();
   assertFalse(errorOccurred.get());
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void doOnNextDoOnErrorFused2() {
  ConnectableFlowable<Integer> cf = Flowable.just(1)
  .doOnNext(new Consumer<Integer>() {
    @Override
    public void accept(Integer v) throws Exception {
      throw new TestException("First");
    }
  })
  .doOnError(new Consumer<Throwable>() {
    @Override
    public void accept(Throwable e) throws Exception {
      throw new TestException("Second");
    }
  })
  .doOnError(new Consumer<Throwable>() {
    @Override
    public void accept(Throwable e) throws Exception {
      throw new TestException("Third");
    }
  })
  .publish();
  TestSubscriber<Integer> ts = cf.test();
  cf.connect();
  ts.assertFailure(CompositeException.class);
  TestHelper.assertError(ts, 0, TestException.class, "First");
  TestHelper.assertError(ts, 1, TestException.class, "Second");
  TestHelper.assertError(ts, 2, TestException.class, "Third");
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void doOnNextDoOnErrorFusedConditional2() {
  ConnectableFlowable<Integer> cf = Flowable.just(1)
  .doOnNext(new Consumer<Integer>() {
    @Override
    public void accept(Integer v) throws Exception {
      throw new TestException("First");
    }
  })
  .doOnError(new Consumer<Throwable>() {
    @Override
    public void accept(Throwable e) throws Exception {
      throw new TestException("Second");
    }
  })
  .doOnError(new Consumer<Throwable>() {
    @Override
    public void accept(Throwable e) throws Exception {
      throw new TestException("Third");
    }
  })
  .filter(Functions.alwaysTrue())
  .publish();
  TestSubscriber<Integer> ts = cf.test();
  cf.connect();
  ts.assertFailure(CompositeException.class);
  TestHelper.assertError(ts, 0, TestException.class, "First");
  TestHelper.assertError(ts, 1, TestException.class, "Second");
  TestHelper.assertError(ts, 2, TestException.class, "Third");
}

代码示例来源:origin: ReactiveX/RxJava

.doOnError(new Consumer<Throwable>() {
  @Override
  public void accept(Throwable e) {

代码示例来源:origin: ReactiveX/RxJava

@Test
public void onErrorThrows() {
  TestSubscriber<Object> ts = TestSubscriber.create();
  Flowable.error(new TestException())
  .doOnError(new Consumer<Throwable>() {
    @Override
    public void accept(Throwable e) {
      throw new TestException();
    }
  }).subscribe(ts);
  ts.assertNoValues();
  ts.assertNotComplete();
  ts.assertError(CompositeException.class);
  CompositeException ex = (CompositeException)ts.errors().get(0);
  List<Throwable> exceptions = ex.getExceptions();
  assertEquals(2, exceptions.size());
  Assert.assertTrue(exceptions.get(0) instanceof TestException);
  Assert.assertTrue(exceptions.get(1) instanceof TestException);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testUsingDisposesEagerlyBeforeError() {
  final List<String> events = new ArrayList<String>();
  Callable<Resource> resourceFactory = createResourceFactory(events);
  final Consumer<Throwable> onError = createOnErrorAction(events);
  final Action unsub = createUnsubAction(events);
  Function<Resource, Flowable<String>> observableFactory = new Function<Resource, Flowable<String>>() {
    @Override
    public Flowable<String> apply(Resource resource) {
      return Flowable.fromArray(resource.getTextFromWeb().split(" "))
          .concatWith(Flowable.<String>error(new RuntimeException()));
    }
  };
  Subscriber<String> subscriber = TestHelper.mockSubscriber();
  Flowable<String> flowable = Flowable.using(resourceFactory, observableFactory,
      new DisposeAction(), true)
  .doOnCancel(unsub)
  .doOnError(onError);
  flowable.safeSubscribe(subscriber);
  assertEquals(Arrays.asList("disposed", "error"), events);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testUsingDoesNotDisposesEagerlyBeforeError() {
  final List<String> events = new ArrayList<String>();
  final Callable<Resource> resourceFactory = createResourceFactory(events);
  final Consumer<Throwable> onError = createOnErrorAction(events);
  final Action unsub = createUnsubAction(events);
  Function<Resource, Flowable<String>> observableFactory = new Function<Resource, Flowable<String>>() {
    @Override
    public Flowable<String> apply(Resource resource) {
      return Flowable.fromArray(resource.getTextFromWeb().split(" "))
          .concatWith(Flowable.<String>error(new RuntimeException()));
    }
  };
  Subscriber<String> subscriber = TestHelper.mockSubscriber();
  Flowable<String> flowable = Flowable.using(resourceFactory, observableFactory,
      new DisposeAction(), false)
  .doOnCancel(unsub)
  .doOnError(onError);
  flowable.safeSubscribe(subscriber);
  assertEquals(Arrays.asList("error", "disposed"), events);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void onErrorOnErrorCrashConditional() {
  TestSubscriber<Object> ts = Flowable.error(new TestException("Outer"))
  .doOnError(new Consumer<Throwable>() {
    @Override
    public void accept(Throwable e) throws Exception {
      throw new TestException("Inner");
    }
  })
  .filter(Functions.alwaysTrue())
  .test()
  .assertFailure(CompositeException.class);
  List<Throwable> errors = TestHelper.compositeList(ts.errors().get(0));
  TestHelper.assertError(errors, 0, TestException.class, "Outer");
  TestHelper.assertError(errors, 1, TestException.class, "Inner");
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void doOnNextDoOnErrorFused() {
  ConnectableFlowable<Integer> cf = Flowable.just(1)
  .doOnNext(new Consumer<Integer>() {
    @Override
    public void accept(Integer v) throws Exception {
      throw new TestException("First");
    }
  })
  .doOnError(new Consumer<Throwable>() {
    @Override
    public void accept(Throwable e) throws Exception {
      throw new TestException("Second");
    }
  })
  .publish();
  TestSubscriber<Integer> ts = cf.test();
  cf.connect();
  ts.assertFailure(CompositeException.class);
  TestHelper.assertError(ts, 0, TestException.class, "First");
  TestHelper.assertError(ts, 1, TestException.class, "Second");
}

代码示例来源:origin: ReactiveX/RxJava

.doOnError(new Consumer<Throwable>() {
      @Override
      public void accept(Throwable t1) {
Thread.sleep(100);
interval
.doOnError(new Consumer<Throwable>() {
  @Override
  public void accept(Throwable t1) {

代码示例来源:origin: ReactiveX/RxJava

@Test
public void doOnNextDoOnErrorFusedConditional() {
  ConnectableFlowable<Integer> cf = Flowable.just(1)
  .doOnNext(new Consumer<Integer>() {
    @Override
    public void accept(Integer v) throws Exception {
      throw new TestException("First");
    }
  })
  .doOnError(new Consumer<Throwable>() {
    @Override
    public void accept(Throwable e) throws Exception {
      throw new TestException("Second");
    }
  })
  .filter(Functions.alwaysTrue())
  .publish();
  TestSubscriber<Integer> ts = cf.test();
  cf.connect();
  ts.assertFailure(CompositeException.class);
  TestHelper.assertError(ts, 0, TestException.class, "First");
  TestHelper.assertError(ts, 1, TestException.class, "Second");
}

相关文章

Flowable类方法