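This article collects code examples for the Java method io.reactivex.Flowable.doOnError() and shows how Flowable.doOnError() is used in practice. The examples were extracted from selected projects found mainly on GitHub, Stack Overflow, and Maven, so they should serve as a useful reference. Details of Flowable.doOnError() are as follows: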
Package path: io.reactivex.Flowable
Class name: Flowable
Method name: doOnError
Modifies the source Publisher so that it invokes an action if it calls onError.
If the onError action itself throws, the downstream receives a composite exception containing the original exception and the exception thrown by the action.
Backpressure: The operator doesn't interfere with backpressure, which is determined by the source Publisher's backpressure behavior.
Scheduler: doOnError does not operate by default on a particular Scheduler.
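The composite-exception rule described above can be made concrete with a small stand-in that needs only the JDK. The sketch below is not RxJava's implementation: `DoOnErrorModel`, `CompositeError`, `PeekErrorSubscriber`, and `deliver` are hypothetical names, and `java.util.concurrent.Flow` (JDK 9+) is used in place of the Reactive Streams interfaces. It only mirrors the documented behavior: the callback sees the error first, the error is then forwarded, and a throwing callback yields a composite holding both exceptions in order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.function.Consumer;

// Hypothetical JDK-only model of the documented doOnError contract; not RxJava code.
final class DoOnErrorModel {

    /** Stand-in for a composite exception: keeps the wrapped errors in order. */
    static final class CompositeError extends RuntimeException {
        final List<Throwable> errors;

        CompositeError(Throwable... errors) {
            super(errors.length + " exceptions occurred");
            this.errors = Collections.unmodifiableList(new ArrayList<>(Arrays.asList(errors)));
        }
    }

    /** Forwards every signal unchanged, but runs an action before forwarding onError. */
    static final class PeekErrorSubscriber<T> implements Flow.Subscriber<T> {
        private final Flow.Subscriber<? super T> downstream;
        private final Consumer<? super Throwable> action;

        PeekErrorSubscriber(Flow.Subscriber<? super T> downstream, Consumer<? super Throwable> action) {
            this.downstream = downstream;
            this.action = action;
        }

        // The subscription is handed through untouched, so requests are not altered.
        @Override public void onSubscribe(Flow.Subscription s) { downstream.onSubscribe(s); }
        @Override public void onNext(T item) { downstream.onNext(item); }
        @Override public void onComplete() { downstream.onComplete(); }

        @Override
        public void onError(Throwable original) {
            Throwable toSignal = original;
            try {
                action.accept(original);
            } catch (Throwable fromAction) {
                // The action itself failed: report both, original first.
                // (This model does not special-case fatal errors.)
                toSignal = new CompositeError(original, fromAction);
            }
            downstream.onError(toSignal);
        }
    }

    /** Sends one error through the peeking subscriber and returns what downstream received. */
    static Throwable deliver(Throwable original, Consumer<? super Throwable> action) {
        final Throwable[] received = new Throwable[1];
        Flow.Subscriber<Object> recorder = new Flow.Subscriber<Object>() {
            @Override public void onSubscribe(Flow.Subscription s) { }
            @Override public void onNext(Object item) { }
            @Override public void onComplete() { }
            @Override public void onError(Throwable t) { received[0] = t; }
        };
        new PeekErrorSubscriber<Object>(recorder, action).onError(original);
        return received[0];
    }

    public static void main(String[] args) {
        Throwable original = new IllegalStateException("boom");
        // A well-behaved action: downstream gets the original error unchanged.
        System.out.println(deliver(original, e -> { }) == original);
        // A throwing action: downstream gets a composite of both errors.
        Throwable t = deliver(original, e -> { throw new IllegalArgumentException("callback failed"); });
        System.out.println(((CompositeError) t).errors.size());
    }
}
```

In real code the same contract is what makes doOnError suitable for logging or metrics: the callback observes the failure, but it cannot swallow it.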
Code example source: ReactiveX/RxJava
@Test(expected = NullPointerException.class)
public void doOnErrorNull() {
    // A null callback is rejected immediately
    just1.doOnError(null);
}
Code example source: skylot/jadx
private void searchFieldSubscribe() {
    searchEmitter = new SearchEventEmitter();
    Flowable<String> textChanges = onTextFieldChanges(searchField);
    Flowable<String> searchEvents = Flowable.merge(textChanges, searchEmitter.getFlowable());
    searchDisposable = searchEvents
            .filter(text -> text.length() > 0)
            .subscribeOn(Schedulers.single())
            .doOnNext(r -> LOG.debug("search event: {}", r))
            .switchMap(text -> prepareSearch(text)
                    // logs failures of the inner search Flowable
                    .doOnError(e -> LOG.error("Error prepare search: {}", e.getMessage(), e))
                    .subscribeOn(Schedulers.single())
                    .toList()
                    .toFlowable(), 1)
            .observeOn(SwingSchedulers.edt())
            // logs failures of the outer pipeline
            .doOnError(e -> LOG.error("Error while searching: {}", e.getMessage(), e))
            .subscribe(this::processSearchResults);
}
Code example source: ReactiveX/RxJava
@Test
public void testMapWithError() {
    final List<Throwable> errors = new ArrayList<Throwable>();
    Flowable<String> w = Flowable.just("one", "fail", "two", "three", "fail");
    Flowable<String> m = w.map(new Function<String, String>() {
        @Override
        public String apply(String s) {
            if ("fail".equals(s)) {
                throw new TestException("Forced Failure");
            }
            return s;
        }
    }).doOnError(new Consumer<Throwable>() {
        @Override
        public void accept(Throwable t1) {
            errors.add(t1);
        }
    });
    m.subscribe(stringSubscriber);
    verify(stringSubscriber, times(1)).onNext("one");
    verify(stringSubscriber, never()).onNext("two");
    verify(stringSubscriber, never()).onNext("three");
    verify(stringSubscriber, never()).onComplete();
    verify(stringSubscriber, times(1)).onError(any(TestException.class));
    TestHelper.assertError(errors, 0, TestException.class, "Forced Failure");
}
Code example source: ReactiveX/RxJava
@Test
public void testDistinctUntilChangedWhenNonFatalExceptionThrownByKeySelectorIsNotReportedByUpstream() {
    Flowable<String> src = Flowable.just("a", "b", "null", "c");
    final AtomicBoolean errorOccurred = new AtomicBoolean(false);
    src
            .doOnError(new Consumer<Throwable>() {
                @Override
                public void accept(Throwable t) {
                    errorOccurred.set(true);
                }
            })
            .distinctUntilChanged(THROWS_NON_FATAL)
            .subscribe(w);
    Assert.assertFalse(errorOccurred.get());
}
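The test above, like several others in this list, depends on direction: an error raised by an operator below doOnError travels downstream only, so a callback registered above that operator never fires. A minimal JDK-only sketch of that rule follows. It is a simplified model rather than RxJava code: `UpstreamPeekModel`, `Sink`, `Peek`, `FailingMap`, and `run` are hypothetical names, and subscription handling and upstream cancellation are left out for brevity.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical model of signal direction in an operator chain; not RxJava code.
final class UpstreamPeekModel {

    /** The three signals a stage can receive (subscription handling omitted). */
    interface Sink<T> {
        void onNext(T item);
        void onError(Throwable t);
        void onComplete();
    }

    /** Models doOnError: runs the action only when an error arrives from upstream. */
    static final class Peek<T> implements Sink<T> {
        private final Sink<T> downstream;
        private final Consumer<Throwable> action;

        Peek(Sink<T> downstream, Consumer<Throwable> action) {
            this.downstream = downstream;
            this.action = action;
        }

        @Override public void onNext(T item) { downstream.onNext(item); }
        @Override public void onError(Throwable t) { action.accept(t); downstream.onError(t); }
        @Override public void onComplete() { downstream.onComplete(); }
    }

    /** Models a mapping operator whose function may throw. */
    static final class FailingMap<T, R> implements Sink<T> {
        private final Sink<R> downstream;
        private final Function<T, R> mapper;
        private boolean done;

        FailingMap(Sink<R> downstream, Function<T, R> mapper) {
            this.downstream = downstream;
            this.mapper = mapper;
        }

        @Override
        public void onNext(T item) {
            if (done) {
                return; // ignore signals after the terminal event
            }
            R result;
            try {
                result = mapper.apply(item);
            } catch (RuntimeException ex) {
                done = true;
                downstream.onError(ex); // the failure is signalled downstream only
                return;
            }
            downstream.onNext(result);
        }

        @Override public void onError(Throwable t) { if (!done) { done = true; downstream.onError(t); } }
        @Override public void onComplete() { if (!done) { done = true; downstream.onComplete(); } }
    }

    /** Pushes "one", "fail", "two" through the chain and returns the recorded events. */
    static List<String> run(boolean peekAfterMap) {
        final List<String> events = new ArrayList<>();
        Sink<String> recorder = new Sink<String>() {
            @Override public void onNext(String item) { events.add("next:" + item); }
            @Override public void onError(Throwable t) { events.add("error:" + t.getMessage()); }
            @Override public void onComplete() { events.add("complete"); }
        };
        Consumer<Throwable> action = t -> events.add("peek:" + t.getMessage());
        Function<String, String> mapper = s -> {
            if ("fail".equals(s)) {
                throw new IllegalStateException("Forced Failure");
            }
            return s;
        };
        Sink<String> head = peekAfterMap
                ? new FailingMap<String, String>(new Peek<String>(recorder, action), mapper)
                : new Peek<String>(new FailingMap<String, String>(recorder, mapper), action);
        for (String s : new String[] {"one", "fail", "two"}) {
            head.onNext(s);
        }
        head.onComplete();
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run(false)); // peek placed above the failing stage
        System.out.println(run(true));  // peek placed below the failing stage
    }
}
```

The practical consequence is that a doOnError call intended to observe an operator's failures has to be chained after that operator, not before it.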
Code example source: ReactiveX/RxJava
@Test
public void testOnErrorCalledOnScheduler() throws Exception {
    final CountDownLatch latch = new CountDownLatch(1);
    final AtomicReference<Thread> thread = new AtomicReference<Thread>();
    Flowable.<String>error(new Exception())
            .delay(0, TimeUnit.MILLISECONDS, Schedulers.newThread())
            .doOnError(new Consumer<Throwable>() {
                @Override
                public void accept(Throwable throwable) throws Exception {
                    // record the thread on which the error is delivered
                    thread.set(Thread.currentThread());
                    latch.countDown();
                }
            })
            .onErrorResumeNext(Flowable.<String>empty())
            .subscribe();
    latch.await();
    assertNotEquals(Thread.currentThread(), thread.get());
}
Code example source: ReactiveX/RxJava
@Test
public void testNonFatalExceptionThrownByCombinatorForSingleSourceIsNotReportedByUpstreamOperator() {
    final AtomicBoolean errorOccurred = new AtomicBoolean(false);
    TestSubscriber<Integer> ts = TestSubscriber.create(1);
    Flowable<Integer> source = Flowable.just(1)
            // if combineLatest did not catch the exception, it would incorrectly
            // be picked up by this call to doOnError
            .doOnError(new Consumer<Throwable>() {
                @Override
                public void accept(Throwable t) {
                    errorOccurred.set(true);
                }
            });
    Flowable
            .combineLatest(Collections.singletonList(source), THROW_NON_FATAL)
            .subscribe(ts);
    assertFalse(errorOccurred.get());
}
Code example source: ReactiveX/RxJava
@Test
public void testDoOnError() {
    final AtomicReference<Throwable> r = new AtomicReference<Throwable>();
    Throwable t = null;
    try {
        Flowable.<String> error(new RuntimeException("an error"))
                .doOnError(new Consumer<Throwable>() {
                    @Override
                    public void accept(Throwable v) {
                        r.set(v);
                    }
                }).blockingSingle();
        fail("expected exception, not a return value");
    } catch (Throwable e) {
        t = e;
    }
    assertNotNull(t);
    assertEquals(t, r.get());
}
Code example source: ReactiveX/RxJava
@Test
public void testNonFatalExceptionFromOverflowActionIsNotReportedFromUpstreamOperator() {
    final AtomicBoolean errorOccurred = new AtomicBoolean(false);
    // request 0
    TestSubscriber<Long> ts = TestSubscriber.create(0);
    // range method emits regardless of requests so should trigger onBackpressureDrop action
    range(2)
            // if onBackpressureDrop did not catch the exception, it would incorrectly
            // be picked up by this call to doOnError
            .doOnError(new Consumer<Throwable>() {
                @Override
                public void accept(Throwable t) {
                    errorOccurred.set(true);
                }
            })
            .onBackpressureDrop(THROW_NON_FATAL)
            .subscribe(ts);
    assertFalse(errorOccurred.get());
}
Code example source: ReactiveX/RxJava
.doOnCancel(sourceUnsubscribed)
.doOnComplete(sourceCompleted)
.doOnError(sourceError)
.subscribeOn(mockScheduler).replay();
Code example source: ReactiveX/RxJava
@Test
public void nonFatalExceptionThrownByOnOverflowIsNotReportedByUpstream() {
    final AtomicBoolean errorOccurred = new AtomicBoolean(false);
    TestSubscriber<Long> ts = TestSubscriber.create(0);
    infinite
            .subscribeOn(Schedulers.computation())
            .doOnError(new Consumer<Throwable>() {
                @Override
                public void accept(Throwable t) {
                    errorOccurred.set(true);
                }
            })
            .onBackpressureBuffer(1, THROWS_NON_FATAL)
            .subscribe(ts);
    ts.awaitTerminalEvent();
    assertFalse(errorOccurred.get());
}
Code example source: ReactiveX/RxJava
@Test
public void doOnNextDoOnErrorFused2() {
    ConnectableFlowable<Integer> cf = Flowable.just(1)
            .doOnNext(new Consumer<Integer>() {
                @Override
                public void accept(Integer v) throws Exception {
                    throw new TestException("First");
                }
            })
            .doOnError(new Consumer<Throwable>() {
                @Override
                public void accept(Throwable e) throws Exception {
                    throw new TestException("Second");
                }
            })
            .doOnError(new Consumer<Throwable>() {
                @Override
                public void accept(Throwable e) throws Exception {
                    throw new TestException("Third");
                }
            })
            .publish();
    TestSubscriber<Integer> ts = cf.test();
    cf.connect();
    // each throwing callback adds its exception to the composite, in order
    ts.assertFailure(CompositeException.class);
    TestHelper.assertError(ts, 0, TestException.class, "First");
    TestHelper.assertError(ts, 1, TestException.class, "Second");
    TestHelper.assertError(ts, 2, TestException.class, "Third");
}
Code example source: ReactiveX/RxJava
@Test
public void doOnNextDoOnErrorFusedConditional2() {
    ConnectableFlowable<Integer> cf = Flowable.just(1)
            .doOnNext(new Consumer<Integer>() {
                @Override
                public void accept(Integer v) throws Exception {
                    throw new TestException("First");
                }
            })
            .doOnError(new Consumer<Throwable>() {
                @Override
                public void accept(Throwable e) throws Exception {
                    throw new TestException("Second");
                }
            })
            .doOnError(new Consumer<Throwable>() {
                @Override
                public void accept(Throwable e) throws Exception {
                    throw new TestException("Third");
                }
            })
            .filter(Functions.alwaysTrue())
            .publish();
    TestSubscriber<Integer> ts = cf.test();
    cf.connect();
    ts.assertFailure(CompositeException.class);
    TestHelper.assertError(ts, 0, TestException.class, "First");
    TestHelper.assertError(ts, 1, TestException.class, "Second");
    TestHelper.assertError(ts, 2, TestException.class, "Third");
}
Code example source: ReactiveX/RxJava
.doOnError(new Consumer<Throwable>() {
@Override
public void accept(Throwable e) {
Code example source: ReactiveX/RxJava
@Test
public void onErrorThrows() {
    TestSubscriber<Object> ts = TestSubscriber.create();
    Flowable.error(new TestException())
            .doOnError(new Consumer<Throwable>() {
                @Override
                public void accept(Throwable e) {
                    throw new TestException();
                }
            }).subscribe(ts);
    ts.assertNoValues();
    ts.assertNotComplete();
    ts.assertError(CompositeException.class);
    // the composite holds the original error and the one thrown by the callback
    CompositeException ex = (CompositeException)ts.errors().get(0);
    List<Throwable> exceptions = ex.getExceptions();
    assertEquals(2, exceptions.size());
    Assert.assertTrue(exceptions.get(0) instanceof TestException);
    Assert.assertTrue(exceptions.get(1) instanceof TestException);
}
Code example source: ReactiveX/RxJava
@Test
public void testUsingDisposesEagerlyBeforeError() {
    final List<String> events = new ArrayList<String>();
    Callable<Resource> resourceFactory = createResourceFactory(events);
    final Consumer<Throwable> onError = createOnErrorAction(events);
    final Action unsub = createUnsubAction(events);
    Function<Resource, Flowable<String>> observableFactory = new Function<Resource, Flowable<String>>() {
        @Override
        public Flowable<String> apply(Resource resource) {
            return Flowable.fromArray(resource.getTextFromWeb().split(" "))
                    .concatWith(Flowable.<String>error(new RuntimeException()));
        }
    };
    Subscriber<String> subscriber = TestHelper.mockSubscriber();
    Flowable<String> flowable = Flowable.using(resourceFactory, observableFactory,
            new DisposeAction(), true)
            .doOnCancel(unsub)
            .doOnError(onError);
    flowable.safeSubscribe(subscriber);
    // eager mode: the resource is disposed before the error reaches doOnError
    assertEquals(Arrays.asList("disposed", "error"), events);
}
Code example source: ReactiveX/RxJava
@Test
public void testUsingDoesNotDisposesEagerlyBeforeError() {
    final List<String> events = new ArrayList<String>();
    final Callable<Resource> resourceFactory = createResourceFactory(events);
    final Consumer<Throwable> onError = createOnErrorAction(events);
    final Action unsub = createUnsubAction(events);
    Function<Resource, Flowable<String>> observableFactory = new Function<Resource, Flowable<String>>() {
        @Override
        public Flowable<String> apply(Resource resource) {
            return Flowable.fromArray(resource.getTextFromWeb().split(" "))
                    .concatWith(Flowable.<String>error(new RuntimeException()));
        }
    };
    Subscriber<String> subscriber = TestHelper.mockSubscriber();
    Flowable<String> flowable = Flowable.using(resourceFactory, observableFactory,
            new DisposeAction(), false)
            .doOnCancel(unsub)
            .doOnError(onError);
    flowable.safeSubscribe(subscriber);
    // non-eager mode: doOnError sees the error before the resource is disposed
    assertEquals(Arrays.asList("error", "disposed"), events);
}
Code example source: ReactiveX/RxJava
@Test
public void onErrorOnErrorCrashConditional() {
    TestSubscriber<Object> ts = Flowable.error(new TestException("Outer"))
            .doOnError(new Consumer<Throwable>() {
                @Override
                public void accept(Throwable e) throws Exception {
                    throw new TestException("Inner");
                }
            })
            .filter(Functions.alwaysTrue())
            .test()
            .assertFailure(CompositeException.class);
    List<Throwable> errors = TestHelper.compositeList(ts.errors().get(0));
    TestHelper.assertError(errors, 0, TestException.class, "Outer");
    TestHelper.assertError(errors, 1, TestException.class, "Inner");
}
Code example source: ReactiveX/RxJava
@Test
public void doOnNextDoOnErrorFused() {
    ConnectableFlowable<Integer> cf = Flowable.just(1)
            .doOnNext(new Consumer<Integer>() {
                @Override
                public void accept(Integer v) throws Exception {
                    throw new TestException("First");
                }
            })
            .doOnError(new Consumer<Throwable>() {
                @Override
                public void accept(Throwable e) throws Exception {
                    throw new TestException("Second");
                }
            })
            .publish();
    TestSubscriber<Integer> ts = cf.test();
    cf.connect();
    ts.assertFailure(CompositeException.class);
    TestHelper.assertError(ts, 0, TestException.class, "First");
    TestHelper.assertError(ts, 1, TestException.class, "Second");
}
Code example source: ReactiveX/RxJava
.doOnError(new Consumer<Throwable>() {
@Override
public void accept(Throwable t1) {
Thread.sleep(100);
interval
.doOnError(new Consumer<Throwable>() {
@Override
public void accept(Throwable t1) {
Code example source: ReactiveX/RxJava
@Test
public void doOnNextDoOnErrorFusedConditional() {
    ConnectableFlowable<Integer> cf = Flowable.just(1)
            .doOnNext(new Consumer<Integer>() {
                @Override
                public void accept(Integer v) throws Exception {
                    throw new TestException("First");
                }
            })
            .doOnError(new Consumer<Throwable>() {
                @Override
                public void accept(Throwable e) throws Exception {
                    throw new TestException("Second");
                }
            })
            .filter(Functions.alwaysTrue())
            .publish();
    TestSubscriber<Integer> ts = cf.test();
    cf.connect();
    ts.assertFailure(CompositeException.class);
    TestHelper.assertError(ts, 0, TestException.class, "First");
    TestHelper.assertError(ts, 1, TestException.class, "Second");
}