Usage and code examples for the io.reactivex.Observable.doOnError() method

Reposted by x33g5p2x on 2022-01-25, category: Other

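This article collects code examples for the Java method io.reactivex.Observable.doOnError() and shows how Observable.doOnError() is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, extracted from selected projects, so they should serve as a useful reference. Details of Observable.doOnError():
Package: io.reactivex
Class name: Observable
Method name: doOnError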

About Observable.doOnError

Modifies the source ObservableSource so that it invokes an action if it calls onError.

In case the onError action throws, the downstream will receive a composite exception containing the original exception and the exception thrown by onError.

Scheduler: doOnError does not operate by default on a particular Scheduler.
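The contract described above (run the action when an error arrives, then pass the error on, and combine both failures if the action itself throws) can be modeled without any library. The sketch below is a dependency-free illustration, not RxJava's implementation: `DoOnErrorModel`, `ErrorSink`, `CombinedException`, and `peekError` are hypothetical names invented here, and `CombinedException` only stands in for the composite exception mentioned in the description.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

public class DoOnErrorModel {

    /** Hypothetical stand-in for a downstream observer; only the error channel is modeled. */
    interface ErrorSink {
        void onError(Throwable t);
    }

    /** Hypothetical stand-in for a composite exception: keeps both failures, in order. */
    static final class CombinedException extends RuntimeException {
        final List<Throwable> causes;

        CombinedException(Throwable original, Throwable thrownByAction) {
            super("2 exceptions occurred");
            this.causes = Arrays.asList(original, thrownByAction);
        }
    }

    /** Wraps a sink so that the action sees each error before it is forwarded. */
    static ErrorSink peekError(Consumer<Throwable> action, ErrorSink downstream) {
        return original -> {
            try {
                action.accept(original);
            } catch (Throwable thrownByAction) {
                // The action failed: downstream receives both errors together.
                downstream.onError(new CombinedException(original, thrownByAction));
                return;
            }
            // The action succeeded: the original error is forwarded unchanged.
            downstream.onError(original);
        };
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        ErrorSink sink = peekError(
                e -> log.add("side effect: " + e.getMessage()),
                e -> log.add("downstream: " + e.getMessage()));
        sink.onError(new IllegalStateException("boom"));
        System.out.println(log); // prints [side effect: boom, downstream: boom]
    }
}
```

The model shows that the action only observes the error: it does not swallow or replace it, so a downstream error handler is still needed.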

Code examples

Code example source: ReactiveX/RxJava

    // just1 is defined elsewhere in the test class; a null consumer is rejected
    @Test(expected = NullPointerException.class)
    public void doOnErrorNull() {
        just1.doOnError(null);
    }

Code example source: ReactiveX/RxJava

    @Test
    public void testMapWithError() {
        Observable<String> w = Observable.just("one", "fail", "two", "three", "fail");
        Observable<String> m = w.map(new Function<String, String>() {
            @Override
            public String apply(String s) {
                if ("fail".equals(s)) {
                    throw new RuntimeException("Forced Failure");
                }
                return s;
            }
        }).doOnError(new Consumer<Throwable>() {
            @Override
            public void accept(Throwable t1) {
                t1.printStackTrace();
            }
        });
        m.subscribe(stringObserver);
        // The first "fail" terminates the stream, so only "one" is delivered
        verify(stringObserver, times(1)).onNext("one");
        verify(stringObserver, never()).onNext("two");
        verify(stringObserver, never()).onNext("three");
        verify(stringObserver, never()).onComplete();
        verify(stringObserver, times(1)).onError(any(Throwable.class));
    }

Code example source: ReactiveX/RxJava

    @Test
    public void testOnErrorCalledOnScheduler() throws Exception {
        final CountDownLatch latch = new CountDownLatch(1);
        final AtomicReference<Thread> thread = new AtomicReference<Thread>();
        Observable.<String>error(new Exception())
            .delay(0, TimeUnit.MILLISECONDS, Schedulers.newThread())
            .doOnError(new Consumer<Throwable>() {
                @Override
                public void accept(Throwable throwable) throws Exception {
                    // Record the thread that delivers onError
                    thread.set(Thread.currentThread());
                    latch.countDown();
                }
            })
            .onErrorResumeNext(Observable.<String>empty())
            .subscribe();
        latch.await();
        // The action must have run on a thread other than the test thread
        assertNotEquals(Thread.currentThread(), thread.get());
    }

Code example source: ReactiveX/RxJava

    @Test
    public void testDoOnError() {
        final AtomicReference<Throwable> r = new AtomicReference<Throwable>();
        Throwable t = null;
        try {
            Observable.<String> error(new RuntimeException("an error"))
                .doOnError(new Consumer<Throwable>() {
                    @Override
                    public void accept(Throwable v) {
                        r.set(v);
                    }
                }).blockingSingle();
            fail("expected exception, not a return value");
        } catch (Throwable e) {
            t = e;
        }
        assertNotNull(t);
        // The exception seen by doOnError is the one thrown to the caller
        assertEquals(t, r.get());
    }

Code example source: ReactiveX/RxJava

    .doOnDispose(sourceUnsubscribed)
    .doOnComplete(sourceCompleted)
    .doOnError(sourceError)
    .subscribeOn(mockScheduler).replay();

Code example source: ReactiveX/RxJava

    .doOnError(new Consumer<Throwable>() {
        @Override
        public void accept(Throwable e) {

Code example source: ReactiveX/RxJava

    @Test
    public void onErrorThrows() {
        TestObserver<Object> to = TestObserver.create();
        Observable.error(new TestException())
            .doOnError(new Consumer<Throwable>() {
                @Override
                public void accept(Throwable e) {
                    throw new TestException();
                }
            }).subscribe(to);
        to.assertNoValues();
        to.assertNotComplete();
        // The original error and the one thrown by the action arrive together
        to.assertError(CompositeException.class);
        CompositeException ex = (CompositeException)to.errors().get(0);
        List<Throwable> exceptions = ex.getExceptions();
        assertEquals(2, exceptions.size());
        Assert.assertTrue(exceptions.get(0) instanceof TestException);
        Assert.assertTrue(exceptions.get(1) instanceof TestException);
    }

Code example source: ReactiveX/RxJava

    .doOnError(new Consumer<Throwable>() {
        @Override
        public void accept(Throwable t1) {
    Thread.sleep(100);
    interval
        .doOnError(new Consumer<Throwable>() {
            @Override
            public void accept(Throwable t1) {

Code example source: ReactiveX/RxJava

    @Test
    public void testUsingDoesNotDisposesEagerlyBeforeError() {
        final List<String> events = new ArrayList<String>();
        final Callable<Resource> resourceFactory = createResourceFactory(events);
        final Consumer<Throwable> onError = createOnErrorAction(events);
        final Action unsub = createUnsubAction(events);
        Function<Resource, Observable<String>> observableFactory = new Function<Resource, Observable<String>>() {
            @Override
            public Observable<String> apply(Resource resource) {
                return Observable.fromArray(resource.getTextFromWeb().split(" "))
                    .concatWith(Observable.<String>error(new RuntimeException()));
            }
        };
        Observer<String> observer = TestHelper.mockObserver();
        Observable<String> o = Observable.using(resourceFactory, observableFactory,
                new DisposeAction(), false)
            .doOnDispose(unsub)
            .doOnError(onError);
        o.safeSubscribe(observer);
        // eager == false: the error is observed before the resource is disposed
        assertEquals(Arrays.asList("error", /* "unsub",*/ "disposed"), events);
    }

Code example source: ReactiveX/RxJava

    @Test
    public void testUsingDisposesEagerlyBeforeError() {
        final List<String> events = new ArrayList<String>();
        Callable<Resource> resourceFactory = createResourceFactory(events);
        final Consumer<Throwable> onError = createOnErrorAction(events);
        final Action unsub = createUnsubAction(events);
        Function<Resource, Observable<String>> observableFactory = new Function<Resource, Observable<String>>() {
            @Override
            public Observable<String> apply(Resource resource) {
                return Observable.fromArray(resource.getTextFromWeb().split(" "))
                    .concatWith(Observable.<String>error(new RuntimeException()));
            }
        };
        Observer<String> observer = TestHelper.mockObserver();
        Observable<String> o = Observable.using(resourceFactory, observableFactory,
                new DisposeAction(), true)
            .doOnDispose(unsub)
            .doOnError(onError);
        o.safeSubscribe(observer);
        // eager == true: the resource is disposed before the error is observed
        assertEquals(Arrays.asList("disposed", "error" /*, "unsub"*/), events);
    }

Code example source: ReactiveX/RxJava

    @Test
    public void onErrorOnErrorCrashConditional() {
        TestObserver<Object> to = Observable.error(new TestException("Outer"))
            .doOnError(new Consumer<Throwable>() {
                @Override
                public void accept(Throwable e) throws Exception {
                    throw new TestException("Inner");
                }
            })
            .filter(Functions.alwaysTrue())
            .test()
            .assertFailure(CompositeException.class);
        // Index 0 is the original error, index 1 the one thrown by the action
        List<Throwable> errors = TestHelper.compositeList(to.errors().get(0));
        TestHelper.assertError(errors, 0, TestException.class, "Outer");
        TestHelper.assertError(errors, 1, TestException.class, "Inner");
    }

Code example source: apollographql/apollo-android

    @Test public void httpException() throws Exception {
        server.enqueue(new MockResponse().setResponseCode(401).setBody("Unauthorized request!"));
        final AtomicReference<Throwable> errorRef = new AtomicReference<>();
        final AtomicReference<String> errorResponse = new AtomicReference<>();
        Rx2Apollo
            .from(apolloClient.query(emptyQuery))
            .doOnError(new Consumer<Throwable>() {
                @Override public void accept(Throwable throwable) throws Exception {
                    // Capture the error and its raw HTTP body for the assertions below
                    errorRef.set(throwable);
                    errorResponse.set(((ApolloHttpException) throwable).rawResponse().body().string());
                }
            })
            .test()
            .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
            .assertError(ApolloHttpException.class);
        ApolloHttpException e = (ApolloHttpException) errorRef.get();
        assertThat(e.code()).isEqualTo(401);
        assertThat(e.message()).isEqualTo("Client Error");
        assertThat(errorResponse.get()).isEqualTo("Unauthorized request!");
        assertThat(e.getMessage()).isEqualTo("HTTP 401 Client Error");
    }

Code example source: Polidea/RxAndroidBle

        .doOnError(throwable -> showNotification("Could not parse input: " + throwable))
        .retryWhen(errorNotificationHandler -> errorNotificationHandler),
    sharedNotifyButtonClicks.compose(onSubscribeSetText(notifyButton, R.string.setup_notification)),

Code example source: Piasy/RxAndroidAudio

    /**
     * play audio from local file. should be scheduled in IO thread.
     */
    public Observable<Boolean> play(@NonNull final PlayConfig config) {
        if (!config.isArgumentValid()) {
            return Observable.error(new IllegalArgumentException(""));
        }
        return Observable.<Boolean>create(emitter -> {
            MediaPlayer player = create(config);
            setMediaPlayerListener(player, emitter);
            player.setVolume(config.mLeftVolume, config.mRightVolume);
            player.setAudioStreamType(config.mStreamType);
            player.setLooping(config.mLooping);
            if (config.needPrepare()) {
                player.prepare();
            }
            player.start();
            mPlayer = player;
            emitter.onNext(true);
        }).doOnError(e -> stopPlay()); // stop playback if anything above fails
    }

Code example source: Piasy/RxAndroidAudio

    /**
     * prepare audio from local file. should be scheduled in IO thread.
     */
    public Observable<Boolean> prepare(@NonNull final PlayConfig config) {
        if (!config.isArgumentValid() || !config.isLocalSource()) {
            return Observable.error(new IllegalArgumentException(""));
        }
        return Observable.<Boolean>create(emitter -> {
            MediaPlayer player = create(config);
            setMediaPlayerListener(player, emitter);
            player.setVolume(config.mLeftVolume, config.mRightVolume);
            player.setAudioStreamType(config.mStreamType);
            player.setLooping(config.mLooping);
            if (config.needPrepare()) {
                player.prepare();
            }
            mPlayer = player;
            emitter.onNext(true);
        }).doOnError(e -> stopPlay()); // stop playback if anything above fails
    }

Code example source: chat-sdk/chat-sdk-android

    /**
     * Convenience method to save the message to the database then pass it to the token network adapter
     * send method so it can be sent via the network
     */
    public Observable<MessageSendProgress> implSendMessage(final Message message) {
        return Observable.create((ObservableOnSubscribe<MessageSendProgress>) e -> {
            message.update();
            message.getThread().update();
            if (ChatSDK.encryption() != null) {
                ChatSDK.encryption().encrypt(message);
            }
            e.onNext(new MessageSendProgress(message));
            e.onComplete();
        }).concatWith(sendMessage(message))
            .subscribeOn(Schedulers.single()).doOnComplete(() -> {
                message.setMessageStatus(MessageSendStatus.Sent);
                message.update();
            }).doOnError(throwable -> {
                // Persist the failed status when sending errors out
                message.setMessageStatus(MessageSendStatus.Failed);
                message.update();
            });
    }

Code example source: io.reactivex/rxjavafx

    /**
     * Performs a given action on a Throwable on the FX thread in the event of an onError
     * @param onError
     * @param <T>
     */
    public static <T> ObservableTransformer<T,T> doOnErrorFx(Consumer<Throwable> onError) {
        return obs -> obs.doOnError(e -> runOnFx(e,onError));
    }

Code example source: io.reactivex.rxjava2/rxjavafx

    /**
     * Performs a given action on a Throwable on the FX thread in the event of an onError
     * @param onError
     * @param <T>
     */
    public static <T> ObservableTransformer<T,T> doOnErrorFx(Consumer<Throwable> onError) {
        return obs -> obs.doOnError(e -> runOnFx(e,onError));
    }

Code example source: spotify/mobius

        @Override
        public Observable<E> apply(Observable<F> effects) {
            return effects
                .ofType(effectClass)
                .compose(effectHandler)
                .doOnError(onErrorFunction.apply(effectHandler));
        }
    });

Code example source: akarnokd/akarnokd-misc

    @Test
    public void test() {
        Observable.just(1)
            .flatMap(v -> single(v)
                .toObservable()
                .doOnError(w -> System.out.println("Error2 " + w))
            )
            .subscribe(v -> System.out.println(v), e -> System.out.println("Error " + e));
    }
