io.reactivex.Flowable.fromCallable()方法的使用及代码示例

x33g5p2x  于2022-01-19 转载在 其他  
字(6.4k)|赞(0)|评价(0)|浏览(314)

本文整理了Java中io.reactivex.Flowable.fromCallable()方法的一些代码示例,展示了Flowable.fromCallable()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Flowable.fromCallable()方法的具体详情如下:
包路径:io.reactivex.Flowable
类名称:Flowable
方法名:fromCallable

Flowable.fromCallable介绍

[英]Returns a Flowable that, when a Subscriber subscribes to it, invokes a function you specify and then emits the value returned from that function.

This allows you to defer the execution of the function you specify until a Subscriber subscribes to the Publisher. That is to say, it makes the function "lazy." Backpressure: The operator honors backpressure from downstream. Scheduler: fromCallable does not operate by default on a particular Scheduler.
[中]返回一个可流动函数,当订阅服务器订阅该函数时,该函数调用您指定的函数,然后发出从该函数返回的值。
这允许您推迟执行指定的函数,直到订阅服务器订阅发布服务器。也就是说,它使函数“懒惰”背压:操作员接受来自下游的背压。调度程序:默认情况下,fromCallable不会在特定调度程序上运行。

代码示例

代码示例来源:origin: ReactiveX/RxJava

  1. @Override
  2. public Publisher<Long> createPublisher(final long elements) {
  3. return
  4. Flowable.fromCallable(new Callable<Long>() {
  5. @Override
  6. public Long call() throws Exception {
  7. return 1L;
  8. }
  9. }
  10. )
  11. ;
  12. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Override
  2. public Publisher<? extends Object> apply(Integer v)
  3. throws Exception {
  4. return Flowable.fromCallable(new Callable<Object>() {
  5. @Override
  6. public Object call() throws Exception {
  7. return ++calls[0];
  8. }
  9. });
  10. }
  11. })

代码示例来源:origin: ReactiveX/RxJava

  1. @Override
  2. public Publisher<? extends Object> apply(Integer v)
  3. throws Exception {
  4. return Flowable.fromCallable(new Callable<Object>() {
  5. @Override
  6. public Object call() throws Exception {
  7. return ++calls[0];
  8. }
  9. });
  10. }
  11. })

代码示例来源:origin: ReactiveX/RxJava

  1. @Override
  2. public Publisher<? extends Object> apply(Integer v)
  3. throws Exception {
  4. return Flowable.fromCallable(new Callable<Object>() {
  5. @Override
  6. public Object call() throws Exception {
  7. return null;
  8. }
  9. });
  10. }
  11. })

代码示例来源:origin: ReactiveX/RxJava

  1. @Override
  2. public Publisher<? extends Object> apply(Integer v)
  3. throws Exception {
  4. return Flowable.fromCallable(new Callable<Object>() {
  5. @Override
  6. public Object call() throws Exception {
  7. return null;
  8. }
  9. });
  10. }
  11. })

代码示例来源:origin: ReactiveX/RxJava

  1. @Override
  2. public Flowable<Integer> apply(Integer v) throws Exception {
  3. return Flowable.fromCallable(new Callable<Integer>() {
  4. @Override
  5. public Integer call() throws Exception {
  6. throw new TestException();
  7. }
  8. });
  9. }
  10. })

代码示例来源:origin: ReactiveX/RxJava

  1. @Test(expected = NullPointerException.class)
  2. public void fromCallableNull() {
  3. Flowable.fromCallable(null);
  4. }

代码示例来源:origin: ReactiveX/RxJava

  1. @SuppressWarnings("unchecked")
  2. @Test
  3. public void shouldNotInvokeFuncUntilSubscription() throws Exception {
  4. Callable<Object> func = mock(Callable.class);
  5. when(func.call()).thenReturn(new Object());
  6. Flowable<Object> fromCallableFlowable = Flowable.fromCallable(func);
  7. verifyZeroInteractions(func);
  8. fromCallableFlowable.subscribe();
  9. verify(func).call();
  10. }

代码示例来源:origin: ReactiveX/RxJava

  1. @SuppressWarnings("unchecked")
  2. @Test
  3. public void shouldCallOnError() throws Exception {
  4. Callable<Object> func = mock(Callable.class);
  5. Throwable throwable = new IllegalStateException("Test exception");
  6. when(func.call()).thenThrow(throwable);
  7. Flowable<Object> fromCallableFlowable = Flowable.fromCallable(func);
  8. Subscriber<Object> subscriber = TestHelper.mockSubscriber();
  9. fromCallableFlowable.subscribe(subscriber);
  10. verify(subscriber, never()).onNext(any());
  11. verify(subscriber, never()).onComplete();
  12. verify(subscriber).onError(throwable);
  13. }

代码示例来源:origin: ReactiveX/RxJava

  1. @SuppressWarnings("unchecked")
  2. @Test
  3. public void shouldCallOnNextAndOnCompleted() throws Exception {
  4. Callable<String> func = mock(Callable.class);
  5. when(func.call()).thenReturn("test_value");
  6. Flowable<String> fromCallableFlowable = Flowable.fromCallable(func);
  7. Subscriber<String> subscriber = TestHelper.mockSubscriber();
  8. fromCallableFlowable.subscribe(subscriber);
  9. verify(subscriber).onNext("test_value");
  10. verify(subscriber).onComplete();
  11. verify(subscriber, never()).onError(any(Throwable.class));
  12. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test(expected = NullPointerException.class)
  2. public void fromCallableReturnsNull() {
  3. Flowable.fromCallable(new Callable<Object>() {
  4. @Override
  5. public Object call() throws Exception {
  6. return null;
  7. }
  8. }).blockingLast();
  9. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void cancelledUpFrontConnectAnyway() {
  3. final AtomicInteger call = new AtomicInteger();
  4. Flowable.fromCallable(new Callable<Object>() {
  5. @Override
  6. public Object call() throws Exception {
  7. return call.incrementAndGet();
  8. }
  9. })
  10. .cache()
  11. .test(1L, true)
  12. .assertNoValues();
  13. assertEquals(1, call.get());
  14. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void scalarXMap() {
  3. Flowable.fromCallable(Functions.justCallable(1))
  4. .flatMap(Functions.justFunction(Flowable.fromCallable(Functions.justCallable(2))))
  5. .test()
  6. .assertResult(2);
  7. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void callableThrows() {
  3. Flowable.fromCallable(new Callable<Object>() {
  4. @Override
  5. public Object call() throws Exception {
  6. throw new TestException();
  7. }
  8. })
  9. .flatMapIterable(Functions.justFunction(Arrays.asList(1, 2, 3)))
  10. .test()
  11. .assertFailure(TestException.class);
  12. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void callable() {
  3. Maybe.concat(Flowable.fromCallable(new Callable<Maybe<Integer>>() {
  4. @Override
  5. public Maybe<Integer> call() throws Exception {
  6. return Maybe.just(1);
  7. }
  8. }))
  9. .test()
  10. .assertResult(1);
  11. }
  12. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void callable() {
  3. Single.concat(Flowable.fromCallable(new Callable<Single<Integer>>() {
  4. @Override
  5. public Single<Integer> call() throws Exception {
  6. return Single.just(1);
  7. }
  8. }))
  9. .test()
  10. .assertResult(1);
  11. }
  12. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void mergeScalarError() {
  3. Flowable.merge(Flowable.just(Flowable.fromCallable(new Callable<Object>() {
  4. @Override
  5. public Object call() throws Exception {
  6. throw new TestException();
  7. }
  8. })).hide())
  9. .test()
  10. .assertFailure(TestException.class);
  11. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void callableCrash() {
  3. Flowable.just(1).hide()
  4. .concatMap(Functions.justFunction(Flowable.fromCallable(new Callable<Object>() {
  5. @Override
  6. public Object call() throws Exception {
  7. throw new TestException();
  8. }
  9. })))
  10. .test()
  11. .assertFailure(TestException.class);
  12. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void scalarXMap() {
  3. Flowable.fromCallable(Functions.justCallable(1))
  4. .switchMap(Functions.justFunction(Flowable.just(1)))
  5. .test()
  6. .assertResult(1);
  7. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void callableCrashDelayError() {
  3. Flowable.just(1).hide()
  4. .concatMapDelayError(Functions.justFunction(Flowable.fromCallable(new Callable<Object>() {
  5. @Override
  6. public Object call() throws Exception {
  7. throw new TestException();
  8. }
  9. })))
  10. .test()
  11. .assertFailure(TestException.class);
  12. }

相关文章

Flowable类方法