io.reactivex.Flowable.safeSubscribe()方法的使用及代码示例

x33g5p2x  于2022-01-19 转载在 其他  
字(10.7k)|赞(0)|评价(0)|浏览(305)

本文整理了Java中io.reactivex.Flowable.safeSubscribe()方法的一些代码示例,展示了Flowable.safeSubscribe()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Flowable.safeSubscribe()方法的具体详情如下:
包路径:io.reactivex.Flowable
类名称:Flowable
方法名:safeSubscribe

Flowable.safeSubscribe介绍

[英]Subscribes to the current Flowable and wraps the given Subscriber into a SafeSubscriber (if not already a SafeSubscriber) that deals with exceptions thrown by a misbehaving Subscriber (that doesn't follow the Reactive-Streams specification). Backpressure: This operator leaves the reactive world and the backpressure behavior depends on the Subscriber's behavior. Scheduler: safeSubscribe does not operate by default on a particular Scheduler.
[中]订阅当前的Flowable并将给定订阅服务器包装到SafeSubscriber(如果还不是SafeSubscriber)中,该订阅服务器处理行为不端的订阅服务器(不遵循反应流规范)引发的异常。背压:该操作符离开反应世界,背压行为取决于订户的行为。调度程序:默认情况下,safeSubscribe不会在特定调度程序上运行。

代码示例

代码示例来源:origin: ReactiveX/RxJava

  1. @Test(expected = NullPointerException.class)
  2. public void safeSubscribeNull() {
  3. just1.safeSubscribe(null);
  4. }

代码示例来源:origin: ReactiveX/RxJava

  1. f.safeSubscribe(subscriber);

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void testPeriodicObserverThrows() {
  3. Flowable<Long> source = Flowable.interval(100, 100, TimeUnit.MILLISECONDS, scheduler);
  4. InOrder inOrder = inOrder(subscriber);
  5. source.safeSubscribe(new DefaultSubscriber<Long>() {
  6. @Override
  7. public void onNext(Long t) {
  8. if (t > 0) {
  9. throw new TestException();
  10. }
  11. subscriber.onNext(t);
  12. }
  13. @Override
  14. public void onError(Throwable e) {
  15. subscriber.onError(e);
  16. }
  17. @Override
  18. public void onComplete() {
  19. subscriber.onComplete();
  20. }
  21. });
  22. scheduler.advanceTimeBy(1, TimeUnit.SECONDS);
  23. inOrder.verify(subscriber).onNext(0L);
  24. inOrder.verify(subscriber).onError(any(TestException.class));
  25. inOrder.verifyNoMoreInteractions();
  26. verify(subscriber, never()).onComplete();
  27. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void testOnceObserverThrows() {
  3. Flowable<Long> source = Flowable.timer(100, TimeUnit.MILLISECONDS, scheduler);
  4. source.safeSubscribe(new DefaultSubscriber<Long>() {
  5. @Override
  6. public void onNext(Long t) {
  7. throw new TestException();
  8. }
  9. @Override
  10. public void onError(Throwable e) {
  11. subscriber.onError(e);
  12. }
  13. @Override
  14. public void onComplete() {
  15. subscriber.onComplete();
  16. }
  17. });
  18. scheduler.advanceTimeBy(1, TimeUnit.SECONDS);
  19. verify(subscriber).onError(any(TestException.class));
  20. verify(subscriber, never()).onNext(anyLong());
  21. verify(subscriber, never()).onComplete();
  22. }

代码示例来源:origin: ReactiveX/RxJava

  1. .safeSubscribe(subscriber);

代码示例来源:origin: ReactiveX/RxJava

  1. .safeSubscribe(new DefaultSubscriber<String>() {

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void testTakeWithErrorInObserver() {
  3. final AtomicInteger count = new AtomicInteger();
  4. final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
  5. Flowable.just("1", "2", "three", "4").take(3)
  6. .safeSubscribe(new DefaultSubscriber<String>() {
  7. @Override
  8. public void onComplete() {
  9. System.out.println("completed");
  10. }
  11. @Override
  12. public void onError(Throwable e) {
  13. error.set(e);
  14. System.out.println("error");
  15. e.printStackTrace();
  16. }
  17. @Override
  18. public void onNext(String v) {
  19. int num = Integer.parseInt(v);
  20. System.out.println(num);
  21. // doSomething(num);
  22. count.incrementAndGet();
  23. }
  24. });
  25. assertEquals(2, count.get());
  26. assertNotNull(error.get());
  27. if (!(error.get() instanceof NumberFormatException)) {
  28. fail("It should be a NumberFormatException");
  29. }
  30. }

代码示例来源:origin: ReactiveX/RxJava

  1. .safeSubscribe(new DefaultSubscriber<String>() {
  2. @Override
  3. public void onComplete() {

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void safeSubscriberAlreadySafe() {
  3. TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
  4. Flowable.just(1).safeSubscribe(new SafeSubscriber<Integer>(ts));
  5. ts.assertResult(1);
  6. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void testUsingDisposesEagerlyBeforeError() {
  3. final List<String> events = new ArrayList<String>();
  4. Callable<Resource> resourceFactory = createResourceFactory(events);
  5. final Consumer<Throwable> onError = createOnErrorAction(events);
  6. final Action unsub = createUnsubAction(events);
  7. Function<Resource, Flowable<String>> observableFactory = new Function<Resource, Flowable<String>>() {
  8. @Override
  9. public Flowable<String> apply(Resource resource) {
  10. return Flowable.fromArray(resource.getTextFromWeb().split(" "))
  11. .concatWith(Flowable.<String>error(new RuntimeException()));
  12. }
  13. };
  14. Subscriber<String> subscriber = TestHelper.mockSubscriber();
  15. Flowable<String> flowable = Flowable.using(resourceFactory, observableFactory,
  16. new DisposeAction(), true)
  17. .doOnCancel(unsub)
  18. .doOnError(onError);
  19. flowable.safeSubscribe(subscriber);
  20. assertEquals(Arrays.asList("disposed", "error"), events);
  21. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void takeFinalValueThrows() {
  3. Flowable<Integer> source = Flowable.just(1).take(1);
  4. TestSubscriber<Integer> ts = new TestSubscriber<Integer>() {
  5. @Override
  6. public void onNext(Integer t) {
  7. throw new TestException();
  8. }
  9. };
  10. source.safeSubscribe(ts);
  11. ts.assertNoValues();
  12. ts.assertError(TestException.class);
  13. ts.assertNotComplete();
  14. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void testUsingDisposesEagerlyBeforeCompletion() {
  3. final List<String> events = new ArrayList<String>();
  4. Callable<Resource> resourceFactory = createResourceFactory(events);
  5. final Action completion = createOnCompletedAction(events);
  6. final Action unsub = createUnsubAction(events);
  7. Function<Resource, Flowable<String>> observableFactory = new Function<Resource, Flowable<String>>() {
  8. @Override
  9. public Flowable<String> apply(Resource resource) {
  10. return Flowable.fromArray(resource.getTextFromWeb().split(" "));
  11. }
  12. };
  13. Subscriber<String> subscriber = TestHelper.mockSubscriber();
  14. Flowable<String> flowable = Flowable.using(resourceFactory, observableFactory,
  15. new DisposeAction(), true)
  16. .doOnCancel(unsub)
  17. .doOnComplete(completion);
  18. flowable.safeSubscribe(subscriber);
  19. assertEquals(Arrays.asList("disposed", "completed"), events);
  20. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void testUsingDoesNotDisposesEagerlyBeforeCompletion() {
  3. final List<String> events = new ArrayList<String>();
  4. Callable<Resource> resourceFactory = createResourceFactory(events);
  5. final Action completion = createOnCompletedAction(events);
  6. final Action unsub = createUnsubAction(events);
  7. Function<Resource, Flowable<String>> observableFactory = new Function<Resource, Flowable<String>>() {
  8. @Override
  9. public Flowable<String> apply(Resource resource) {
  10. return Flowable.fromArray(resource.getTextFromWeb().split(" "));
  11. }
  12. };
  13. Subscriber<String> subscriber = TestHelper.mockSubscriber();
  14. Flowable<String> flowable = Flowable.using(resourceFactory, observableFactory,
  15. new DisposeAction(), false)
  16. .doOnCancel(unsub)
  17. .doOnComplete(completion);
  18. flowable.safeSubscribe(subscriber);
  19. assertEquals(Arrays.asList("completed", "disposed"), events);
  20. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void testUsingDoesNotDisposesEagerlyBeforeError() {
  3. final List<String> events = new ArrayList<String>();
  4. final Callable<Resource> resourceFactory = createResourceFactory(events);
  5. final Consumer<Throwable> onError = createOnErrorAction(events);
  6. final Action unsub = createUnsubAction(events);
  7. Function<Resource, Flowable<String>> observableFactory = new Function<Resource, Flowable<String>>() {
  8. @Override
  9. public Flowable<String> apply(Resource resource) {
  10. return Flowable.fromArray(resource.getTextFromWeb().split(" "))
  11. .concatWith(Flowable.<String>error(new RuntimeException()));
  12. }
  13. };
  14. Subscriber<String> subscriber = TestHelper.mockSubscriber();
  15. Flowable<String> flowable = Flowable.using(resourceFactory, observableFactory,
  16. new DisposeAction(), false)
  17. .doOnCancel(unsub)
  18. .doOnError(onError);
  19. flowable.safeSubscribe(subscriber);
  20. assertEquals(Arrays.asList("error", "disposed"), events);
  21. }

代码示例来源:origin: io.smallrye.reactive/smallrye-reactive-streams-operators

  1. @Override
  2. public CompletionStage<Void> apply(Flowable<I> source) {
  3. WrappedSubscriber<I> wrapped = new WrappedSubscriber<>(subscriber);
  4. source.safeSubscribe(wrapped);
  5. return wrapped.future();
  6. }
  7. }

代码示例来源:origin: io.smallrye.reactive/smallrye-reactive-streams-operators

  1. @Override
  2. public <I, O> ProcessingStage<I, O> create(Engine engine, Stage.ProcessorStage stage) {
  3. Processor<I, O> processor = Casts.cast(Objects.requireNonNull(
  4. Objects.requireNonNull(stage).getRsProcessor())
  5. );
  6. return source -> Flowable.defer(() -> {
  7. Flowable<O> flowable = Flowable.fromPublisher(processor);
  8. source.safeSubscribe(processor);
  9. return flowable;
  10. });
  11. }
  12. }

代码示例来源:origin: iagocanalejas/retrocache

  1. @Test
  2. public void bodyThrowingInOnNextDeliveredToError() {
  3. server.enqueue(new MockResponse());
  4. RecordingSubscriber<String> subscriber = subscriberRule.create();
  5. final RuntimeException e = new RuntimeException();
  6. service.body().safeSubscribe(new ForwardingSubscriber<String>(subscriber) {
  7. @Override
  8. public void onNext(String value) {
  9. throw e;
  10. }
  11. });
  12. subscriber.assertError(e);
  13. }

代码示例来源:origin: iagocanalejas/retrocache

  1. @Test
  2. public void responseThrowingInOnNextDeliveredToError() {
  3. server.enqueue(new MockResponse());
  4. RecordingSubscriber<Response<String>> subscriber = subscriberRule.create();
  5. final RuntimeException e = new RuntimeException();
  6. service.response().safeSubscribe(new ForwardingSubscriber<Response<String>>(subscriber) {
  7. @Override
  8. public void onNext(Response<String> value) {
  9. throw e;
  10. }
  11. });
  12. subscriber.assertError(e);
  13. }

代码示例来源:origin: iagocanalejas/retrocache

  1. @Test
  2. public void resultThrowingInOnNextDeliveredToError() {
  3. server.enqueue(new MockResponse());
  4. RecordingSubscriber<Result<String>> subscriber = subscriberRule.create();
  5. final RuntimeException e = new RuntimeException();
  6. service.result().safeSubscribe(new ForwardingSubscriber<Result<String>>(subscriber) {
  7. @Override
  8. public void onNext(Result<String> value) {
  9. throw e;
  10. }
  11. });
  12. subscriber.assertError(e);
  13. }

代码示例来源:origin: iagocanalejas/retrocache

  1. @Test
  2. public void resultThrowingInOnErrorDeliveredToPlugin() {
  3. server.enqueue(new MockResponse());
  4. final AtomicReference<Throwable> throwableRef = new AtomicReference<>();
  5. RxJavaPlugins.setErrorHandler(new Consumer<Throwable>() {
  6. @Override
  7. public void accept(Throwable throwable) throws Exception {
  8. if (!throwableRef.compareAndSet(null, throwable)) {
  9. throw Exceptions.propagate(throwable);
  10. }
  11. }
  12. });
  13. RecordingSubscriber<Result<String>> subscriber = subscriberRule.create();
  14. final RuntimeException first = new RuntimeException();
  15. final RuntimeException second = new RuntimeException();
  16. service.result().safeSubscribe(new ForwardingSubscriber<Result<String>>(subscriber) {
  17. @Override
  18. public void onNext(Result<String> value) {
  19. // The only way to trigger onError for a result is if onNext throws.
  20. throw first;
  21. }
  22. @Override
  23. public void onError(Throwable throwable) {
  24. throw second;
  25. }
  26. });
  27. //noinspection ThrowableResultOfMethodCallIgnored
  28. CompositeException composite = (CompositeException) throwableRef.get();
  29. assertThat(composite.getExceptions()).containsExactly(first, second);
  30. }

相关文章

Flowable类方法