Usage and code examples for the io.reactivex.Flowable.fromCallable() method

x33g5p2x, reposted on 2022-01-19 under Other

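This article collects code examples for the io.reactivex.Flowable.fromCallable() method in Java and shows how Flowable.fromCallable() is used in practice. The examples were extracted from selected projects on platforms such as GitHub, Stack Overflow, and Maven, so they serve as useful references. Details of the Flowable.fromCallable() method:
Package path: io.reactivex.Flowable
Class name: Flowable
Method name: fromCallable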

Introduction to Flowable.fromCallable

Returns a Flowable that, when a Subscriber subscribes to it, invokes a function you specify and then emits the value returned from that function.

This allows you to defer the execution of the function you specify until a Subscriber subscribes to the Publisher. That is to say, it makes the function "lazy."

Backpressure: The operator honors backpressure from downstream.
Scheduler: fromCallable does not operate by default on a particular Scheduler.
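To make the "lazy" behavior described above concrete, here is a minimal sketch that models it with the JDK's java.util.concurrent.Flow API only. It is a simplified, single-threaded stand-in and not RxJava's implementation: the names LazyCallableSketch, DeferredValue, and Collector are hypothetical, and invalid demand is ignored rather than reported. It assumes JDK 9 or later.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, single-threaded stand-in for illustration; this is not RxJava's code.
final class LazyCallableSketch {

    /** Wraps a Callable in a publisher; the Callable runs once per subscriber, at subscribe time. */
    static <T> Flow.Publisher<T> fromCallable(Callable<? extends T> callable) {
        Objects.requireNonNull(callable, "callable is null"); // a null Callable is rejected eagerly
        return subscriber -> {
            DeferredValue<T> state = new DeferredValue<>(subscriber);
            subscriber.onSubscribe(state);
            if (state.cancelled) {
                return; // cancelled up front: skip the call entirely
            }
            T value;
            try {
                // A null result is treated as an error, like any exception from call().
                value = Objects.requireNonNull(callable.call(), "callable returned null");
            } catch (Throwable ex) {
                subscriber.onError(ex);
                return;
            }
            state.complete(value);
        };
    }

    /** Holds the computed value until the subscriber has requested it (backpressure). */
    static final class DeferredValue<T> implements Flow.Subscription {
        final Flow.Subscriber<? super T> downstream;
        T value;
        boolean requested;
        boolean cancelled;
        boolean emitted;

        DeferredValue(Flow.Subscriber<? super T> downstream) {
            this.downstream = downstream;
        }

        @Override
        public void request(long n) {
            if (n <= 0) {
                return; // sketch only: invalid demand is ignored instead of signalling an error
            }
            requested = true;
            drain();
        }

        @Override
        public void cancel() {
            cancelled = true;
        }

        void complete(T v) {
            value = v;
            drain();
        }

        private void drain() {
            if (requested && value != null && !emitted && !cancelled) {
                emitted = true;
                downstream.onNext(value);
                downstream.onComplete();
            }
        }
    }

    /** Records the signals it receives so they can be inspected afterwards. */
    static final class Collector<T> implements Flow.Subscriber<T> {
        final List<String> events = new ArrayList<>();
        @Override public void onSubscribe(Flow.Subscription s) { s.request(1); }
        @Override public void onNext(T item) { events.add("next:" + item); }
        @Override public void onError(Throwable t) { events.add("error:" + t.getClass().getSimpleName()); }
        @Override public void onComplete() { events.add("complete"); }
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        Callable<Integer> counter = calls::incrementAndGet;
        Flow.Publisher<Integer> source = fromCallable(counter);
        System.out.println("calls before subscribe: " + calls.get()); // 0, nothing has run yet

        Collector<Integer> first = new Collector<>();
        source.subscribe(first);
        Collector<Integer> second = new Collector<>();
        source.subscribe(second);
        System.out.println(first.events + " " + second.events); // [next:1, complete] [next:2, complete]
    }
}
```

Each subscription triggers a fresh call, which is why the two collectors see different values. The same sketch routes an exception or a null result from the Callable to onError, which mirrors what the RxJava tests below assert for the real operator.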

Code examples

Code example source: ReactiveX/RxJava

@Override
public Publisher<Long> createPublisher(final long elements) {
  // Emits a single value, computed when a Subscriber subscribes.
  return Flowable.fromCallable(new Callable<Long>() {
    @Override
    public Long call() throws Exception {
      return 1L;
    }
  });
}

Code example source: ReactiveX/RxJava

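// Method of an anonymous function taken from a larger test; the enclosing call is not shown.
// calls is a counter array declared in that enclosing test.
@Override
public Publisher<? extends Object> apply(Integer v) throws Exception {
  return Flowable.fromCallable(new Callable<Object>() {
    @Override
    public Object call() throws Exception {
      return ++calls[0];
    }
  });
}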

Code example source: ReactiveX/RxJava

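// Method of an anonymous function taken from a larger test; the enclosing call is not shown.
@Override
public Publisher<? extends Object> apply(Integer v) throws Exception {
  return Flowable.fromCallable(new Callable<Object>() {
    @Override
    public Object call() throws Exception {
      // A null result is not a valid item; the Flowable fails with NullPointerException.
      return null;
    }
  });
}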

Code example source: ReactiveX/RxJava

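// Method of an anonymous function taken from a larger test; the enclosing call is not shown.
@Override
public Flowable<Integer> apply(Integer v) throws Exception {
  return Flowable.fromCallable(new Callable<Integer>() {
    @Override
    public Integer call() throws Exception {
      // The exception is delivered to the Subscriber through onError.
      throw new TestException();
    }
  });
}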

Code example source: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void fromCallableNull() {
  Flowable.fromCallable(null);
}

Code example source: ReactiveX/RxJava

@SuppressWarnings("unchecked")
@Test
public void shouldNotInvokeFuncUntilSubscription() throws Exception {
  Callable<Object> func = mock(Callable.class);
  when(func.call()).thenReturn(new Object());
  Flowable<Object> fromCallableFlowable = Flowable.fromCallable(func);
  verifyZeroInteractions(func);
  fromCallableFlowable.subscribe();
  verify(func).call();
}

Code example source: ReactiveX/RxJava

@SuppressWarnings("unchecked")
@Test
public void shouldCallOnError() throws Exception {
  Callable<Object> func = mock(Callable.class);
  Throwable throwable = new IllegalStateException("Test exception");
  when(func.call()).thenThrow(throwable);
  Flowable<Object> fromCallableFlowable = Flowable.fromCallable(func);
  Subscriber<Object> subscriber = TestHelper.mockSubscriber();
  fromCallableFlowable.subscribe(subscriber);
  verify(subscriber, never()).onNext(any());
  verify(subscriber, never()).onComplete();
  verify(subscriber).onError(throwable);
}

Code example source: ReactiveX/RxJava

@SuppressWarnings("unchecked")
@Test
public void shouldCallOnNextAndOnCompleted() throws Exception {
  Callable<String> func = mock(Callable.class);
  when(func.call()).thenReturn("test_value");
  Flowable<String> fromCallableFlowable = Flowable.fromCallable(func);
  Subscriber<String> subscriber = TestHelper.mockSubscriber();
  fromCallableFlowable.subscribe(subscriber);
  verify(subscriber).onNext("test_value");
  verify(subscriber).onComplete();
  verify(subscriber, never()).onError(any(Throwable.class));
}

Code example source: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void fromCallableReturnsNull() {
  Flowable.fromCallable(new Callable<Object>() {
    @Override
    public Object call() throws Exception {
      return null;
    }
  }).blockingLast();
}

Code example source: ReactiveX/RxJava

@Test
public void cancelledUpFrontConnectAnyway() {
  final AtomicInteger call = new AtomicInteger();
  Flowable.fromCallable(new Callable<Object>() {
    @Override
    public Object call() throws Exception {
      return call.incrementAndGet();
    }
  })
  .cache()
  .test(1L, true)
  .assertNoValues();
  assertEquals(1, call.get());
}

Code example source: ReactiveX/RxJava

@Test
public void scalarXMap() {
  Flowable.fromCallable(Functions.justCallable(1))
  .flatMap(Functions.justFunction(Flowable.fromCallable(Functions.justCallable(2))))
  .test()
  .assertResult(2);
}

Code example source: ReactiveX/RxJava

@Test
public void callableThrows() {
  Flowable.fromCallable(new Callable<Object>() {
    @Override
    public Object call() throws Exception {
      throw new TestException();
    }
  })
  .flatMapIterable(Functions.justFunction(Arrays.asList(1, 2, 3)))
  .test()
  .assertFailure(TestException.class);
}

Code example source: ReactiveX/RxJava

@Test
public void callable() {
  Maybe.concat(Flowable.fromCallable(new Callable<Maybe<Integer>>() {
    @Override
    public Maybe<Integer> call() throws Exception {
      return Maybe.just(1);
    }
  }))
  .test()
  .assertResult(1);
}

Code example source: ReactiveX/RxJava

@Test
public void callable() {
  Single.concat(Flowable.fromCallable(new Callable<Single<Integer>>() {
    @Override
    public Single<Integer> call() throws Exception {
      return Single.just(1);
    }
  }))
  .test()
  .assertResult(1);
}

Code example source: ReactiveX/RxJava

@Test
public void mergeScalarError() {
  Flowable.merge(Flowable.just(Flowable.fromCallable(new Callable<Object>() {
    @Override
    public Object call() throws Exception {
      throw new TestException();
    }
  })).hide())
  .test()
  .assertFailure(TestException.class);
}

Code example source: ReactiveX/RxJava

@Test
public void callableCrash() {
  Flowable.just(1).hide()
  .concatMap(Functions.justFunction(Flowable.fromCallable(new Callable<Object>() {
    @Override
    public Object call() throws Exception {
      throw new TestException();
    }
  })))
  .test()
  .assertFailure(TestException.class);
}

Code example source: ReactiveX/RxJava

@Test
public void scalarXMap() {
  Flowable.fromCallable(Functions.justCallable(1))
  .switchMap(Functions.justFunction(Flowable.just(1)))
  .test()
  .assertResult(1);
}

Code example source: ReactiveX/RxJava

@Test
public void callableCrashDelayError() {
  Flowable.just(1).hide()
  .concatMapDelayError(Functions.justFunction(Flowable.fromCallable(new Callable<Object>() {
    @Override
    public Object call() throws Exception {
      throw new TestException();
    }
  })))
  .test()
  .assertFailure(TestException.class);
}
