io.reactivex.Flowable.safeSubscribe()方法的使用及代码示例

x33g5p2x  于2022-01-19 转载在 其他  
字(10.7k)|赞(0)|评价(0)|浏览(281)

本文整理了Java中io.reactivex.Flowable.safeSubscribe()方法的一些代码示例,展示了Flowable.safeSubscribe()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Flowable.safeSubscribe()方法的具体详情如下:
包路径:io.reactivex.Flowable
类名称:Flowable
方法名:safeSubscribe

Flowable.safeSubscribe介绍

[英]Subscribes to the current Flowable and wraps the given Subscriber into a SafeSubscriber (if not already a SafeSubscriber) that deals with exceptions thrown by a misbehaving Subscriber (that doesn't follow the Reactive-Streams specification). Backpressure: This operator leaves the reactive world and the backpressure behavior depends on the Subscriber's behavior. Scheduler: safeSubscribe does not operate by default on a particular Scheduler.
[中]订阅当前的Flowable并将给定订阅服务器包装到SafeSubscriber(如果还不是SafeSubscriber)中,该订阅服务器处理行为不端的订阅服务器(不遵循反应流规范)引发的异常。背压:该操作符离开反应世界,背压行为取决于订户的行为。调度程序:默认情况下,safeSubscribe不会在特定调度程序上运行。

代码示例

代码示例来源:origin: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void safeSubscribeNull() {
  just1.safeSubscribe(null);
}

代码示例来源:origin: ReactiveX/RxJava

f.safeSubscribe(subscriber);

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testPeriodicObserverThrows() {
  Flowable<Long> source = Flowable.interval(100, 100, TimeUnit.MILLISECONDS, scheduler);
  InOrder inOrder = inOrder(subscriber);
  source.safeSubscribe(new DefaultSubscriber<Long>() {
    @Override
    public void onNext(Long t) {
      if (t > 0) {
        throw new TestException();
      }
      subscriber.onNext(t);
    }
    @Override
    public void onError(Throwable e) {
      subscriber.onError(e);
    }
    @Override
    public void onComplete() {
      subscriber.onComplete();
    }
  });
  scheduler.advanceTimeBy(1, TimeUnit.SECONDS);
  inOrder.verify(subscriber).onNext(0L);
  inOrder.verify(subscriber).onError(any(TestException.class));
  inOrder.verifyNoMoreInteractions();
  verify(subscriber, never()).onComplete();
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testOnceObserverThrows() {
  Flowable<Long> source = Flowable.timer(100, TimeUnit.MILLISECONDS, scheduler);
  source.safeSubscribe(new DefaultSubscriber<Long>() {
    @Override
    public void onNext(Long t) {
      throw new TestException();
    }
    @Override
    public void onError(Throwable e) {
      subscriber.onError(e);
    }
    @Override
    public void onComplete() {
      subscriber.onComplete();
    }
  });
  scheduler.advanceTimeBy(1, TimeUnit.SECONDS);
  verify(subscriber).onError(any(TestException.class));
  verify(subscriber, never()).onNext(anyLong());
  verify(subscriber, never()).onComplete();
}

代码示例来源:origin: ReactiveX/RxJava

.safeSubscribe(subscriber);

代码示例来源:origin: ReactiveX/RxJava

.safeSubscribe(new DefaultSubscriber<String>() {

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testTakeWithErrorInObserver() {
  final AtomicInteger count = new AtomicInteger();
  final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
  Flowable.just("1", "2", "three", "4").take(3)
  .safeSubscribe(new DefaultSubscriber<String>() {
    @Override
    public void onComplete() {
      System.out.println("completed");
    }
    @Override
    public void onError(Throwable e) {
      error.set(e);
      System.out.println("error");
      e.printStackTrace();
    }
    @Override
    public void onNext(String v) {
      int num = Integer.parseInt(v);
      System.out.println(num);
      // doSomething(num);
      count.incrementAndGet();
    }
  });
  assertEquals(2, count.get());
  assertNotNull(error.get());
  if (!(error.get() instanceof NumberFormatException)) {
    fail("It should be a NumberFormatException");
  }
}

代码示例来源:origin: ReactiveX/RxJava

.safeSubscribe(new DefaultSubscriber<String>() {
  @Override
  public void onComplete() {

代码示例来源:origin: ReactiveX/RxJava

@Test
public void safeSubscriberAlreadySafe() {
  TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
  Flowable.just(1).safeSubscribe(new SafeSubscriber<Integer>(ts));
  ts.assertResult(1);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testUsingDisposesEagerlyBeforeError() {
  final List<String> events = new ArrayList<String>();
  Callable<Resource> resourceFactory = createResourceFactory(events);
  final Consumer<Throwable> onError = createOnErrorAction(events);
  final Action unsub = createUnsubAction(events);
  Function<Resource, Flowable<String>> observableFactory = new Function<Resource, Flowable<String>>() {
    @Override
    public Flowable<String> apply(Resource resource) {
      return Flowable.fromArray(resource.getTextFromWeb().split(" "))
          .concatWith(Flowable.<String>error(new RuntimeException()));
    }
  };
  Subscriber<String> subscriber = TestHelper.mockSubscriber();
  Flowable<String> flowable = Flowable.using(resourceFactory, observableFactory,
      new DisposeAction(), true)
  .doOnCancel(unsub)
  .doOnError(onError);
  flowable.safeSubscribe(subscriber);
  assertEquals(Arrays.asList("disposed", "error"), events);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void takeFinalValueThrows() {
  Flowable<Integer> source = Flowable.just(1).take(1);
  TestSubscriber<Integer> ts = new TestSubscriber<Integer>() {
    @Override
    public void onNext(Integer t) {
      throw new TestException();
    }
  };
  source.safeSubscribe(ts);
  ts.assertNoValues();
  ts.assertError(TestException.class);
  ts.assertNotComplete();
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testUsingDisposesEagerlyBeforeCompletion() {
  final List<String> events = new ArrayList<String>();
  Callable<Resource> resourceFactory = createResourceFactory(events);
  final Action completion = createOnCompletedAction(events);
  final Action unsub = createUnsubAction(events);
  Function<Resource, Flowable<String>> observableFactory = new Function<Resource, Flowable<String>>() {
    @Override
    public Flowable<String> apply(Resource resource) {
      return Flowable.fromArray(resource.getTextFromWeb().split(" "));
    }
  };
  Subscriber<String> subscriber = TestHelper.mockSubscriber();
  Flowable<String> flowable = Flowable.using(resourceFactory, observableFactory,
      new DisposeAction(), true)
  .doOnCancel(unsub)
  .doOnComplete(completion);
  flowable.safeSubscribe(subscriber);
  assertEquals(Arrays.asList("disposed", "completed"), events);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testUsingDoesNotDisposesEagerlyBeforeCompletion() {
  final List<String> events = new ArrayList<String>();
  Callable<Resource> resourceFactory = createResourceFactory(events);
  final Action completion = createOnCompletedAction(events);
  final Action unsub = createUnsubAction(events);
  Function<Resource, Flowable<String>> observableFactory = new Function<Resource, Flowable<String>>() {
    @Override
    public Flowable<String> apply(Resource resource) {
      return Flowable.fromArray(resource.getTextFromWeb().split(" "));
    }
  };
  Subscriber<String> subscriber = TestHelper.mockSubscriber();
  Flowable<String> flowable = Flowable.using(resourceFactory, observableFactory,
      new DisposeAction(), false)
  .doOnCancel(unsub)
  .doOnComplete(completion);
  flowable.safeSubscribe(subscriber);
  assertEquals(Arrays.asList("completed", "disposed"), events);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testUsingDoesNotDisposesEagerlyBeforeError() {
  final List<String> events = new ArrayList<String>();
  final Callable<Resource> resourceFactory = createResourceFactory(events);
  final Consumer<Throwable> onError = createOnErrorAction(events);
  final Action unsub = createUnsubAction(events);
  Function<Resource, Flowable<String>> observableFactory = new Function<Resource, Flowable<String>>() {
    @Override
    public Flowable<String> apply(Resource resource) {
      return Flowable.fromArray(resource.getTextFromWeb().split(" "))
          .concatWith(Flowable.<String>error(new RuntimeException()));
    }
  };
  Subscriber<String> subscriber = TestHelper.mockSubscriber();
  Flowable<String> flowable = Flowable.using(resourceFactory, observableFactory,
      new DisposeAction(), false)
  .doOnCancel(unsub)
  .doOnError(onError);
  flowable.safeSubscribe(subscriber);
  assertEquals(Arrays.asList("error", "disposed"), events);
}

代码示例来源:origin: io.smallrye.reactive/smallrye-reactive-streams-operators

@Override
  public CompletionStage<Void> apply(Flowable<I> source) {
    WrappedSubscriber<I> wrapped = new WrappedSubscriber<>(subscriber);
    source.safeSubscribe(wrapped);
    return wrapped.future();
  }
}

代码示例来源:origin: io.smallrye.reactive/smallrye-reactive-streams-operators

@Override
  public <I, O> ProcessingStage<I, O> create(Engine engine, Stage.ProcessorStage stage) {
    Processor<I, O> processor = Casts.cast(Objects.requireNonNull(
        Objects.requireNonNull(stage).getRsProcessor())
    );

    return source -> Flowable.defer(() -> {
      Flowable<O> flowable = Flowable.fromPublisher(processor);
      source.safeSubscribe(processor);
      return flowable;
    });
  }
}

代码示例来源:origin: iagocanalejas/retrocache

@Test
public void bodyThrowingInOnNextDeliveredToError() {
  server.enqueue(new MockResponse());
  RecordingSubscriber<String> subscriber = subscriberRule.create();
  final RuntimeException e = new RuntimeException();
  service.body().safeSubscribe(new ForwardingSubscriber<String>(subscriber) {
    @Override
    public void onNext(String value) {
      throw e;
    }
  });
  subscriber.assertError(e);
}

代码示例来源:origin: iagocanalejas/retrocache

@Test
public void responseThrowingInOnNextDeliveredToError() {
  server.enqueue(new MockResponse());
  RecordingSubscriber<Response<String>> subscriber = subscriberRule.create();
  final RuntimeException e = new RuntimeException();
  service.response().safeSubscribe(new ForwardingSubscriber<Response<String>>(subscriber) {
    @Override
    public void onNext(Response<String> value) {
      throw e;
    }
  });
  subscriber.assertError(e);
}

代码示例来源:origin: iagocanalejas/retrocache

@Test
public void resultThrowingInOnNextDeliveredToError() {
  server.enqueue(new MockResponse());
  RecordingSubscriber<Result<String>> subscriber = subscriberRule.create();
  final RuntimeException e = new RuntimeException();
  service.result().safeSubscribe(new ForwardingSubscriber<Result<String>>(subscriber) {
    @Override
    public void onNext(Result<String> value) {
      throw e;
    }
  });
  subscriber.assertError(e);
}

代码示例来源:origin: iagocanalejas/retrocache

@Test
public void resultThrowingInOnErrorDeliveredToPlugin() {
  server.enqueue(new MockResponse());
  final AtomicReference<Throwable> throwableRef = new AtomicReference<>();
  RxJavaPlugins.setErrorHandler(new Consumer<Throwable>() {
    @Override
    public void accept(Throwable throwable) throws Exception {
      if (!throwableRef.compareAndSet(null, throwable)) {
        throw Exceptions.propagate(throwable);
      }
    }
  });
  RecordingSubscriber<Result<String>> subscriber = subscriberRule.create();
  final RuntimeException first = new RuntimeException();
  final RuntimeException second = new RuntimeException();
  service.result().safeSubscribe(new ForwardingSubscriber<Result<String>>(subscriber) {
    @Override
    public void onNext(Result<String> value) {
      // The only way to trigger onError for a result is if onNext throws.
      throw first;
    }
    @Override
    public void onError(Throwable throwable) {
      throw second;
    }
  });
  //noinspection ThrowableResultOfMethodCallIgnored
  CompositeException composite = (CompositeException) throwableRef.get();
  assertThat(composite.getExceptions()).containsExactly(first, second);
}

相关文章

Flowable类方法