io.reactivex.Observable.fromFuture()方法的使用及代码示例

x33g5p2x  于2022-01-25 转载在 其他  
字(6.7k)|赞(0)|评价(0)|浏览(209)

本文整理了Java中io.reactivex.Observable.fromFuture()方法的一些代码示例,展示了Observable.fromFuture()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Observable.fromFuture()方法的具体详情如下:
包路径:io.reactivex.Observable
类名称:Observable
方法名:fromFuture

Observable.fromFuture介绍

[英]Converts a Future into an ObservableSource.

You can convert any object that supports the Future interface into an ObservableSource that emits the return value of the Future#get method of that object, by passing the object into the frommethod.

Important note: This ObservableSource is blocking; you cannot dispose it.

Unlike 1.x, disposing the Observable won't cancel the future. If necessary, one can use composition to achieve the cancellation effect: futureObservableSource.doOnDispose(() -> future.cancel(true));. Scheduler: fromFuture does not operate by default on a particular Scheduler.
[中]将未来转化为可观察的资源。
您可以将任何支持Future接口的对象转换为ObservableSource,通过将该对象传递到frommethod,该对象将发出该对象的Future#get方法的返回值。
*重要提示:*此可观察资源处于阻塞状态;你不能处理它。
不像1。x、 处理可观察的事物不会抵消未来。如有必要,人们可以使用合成来实现消除效果:futureObservableSource。doOnDispose(()->未来。取消(对);。调度器:默认情况下,fromFuture不会在特定的调度器上运行。

代码示例

代码示例来源:origin: ReactiveX/RxJava

  1. @Test(expected = NullPointerException.class)
  2. public void fromFutureSchedulerNull() {
  3. FutureTask<Object> f = new FutureTask<Object>(Functions.EMPTY_RUNNABLE, null);
  4. Observable.fromFuture(f, null);
  5. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test(expected = NullPointerException.class)
  2. public void fromFutureTimedUnitNull() {
  3. Observable.fromFuture(new FutureTask<Object>(Functions.EMPTY_RUNNABLE, null), 1, null);
  4. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test(expected = NullPointerException.class)
  2. public void fromFutureTimedFutureNull() {
  3. Observable.fromFuture(null, 1, TimeUnit.SECONDS);
  4. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test(expected = NullPointerException.class)
  2. public void fromFutureNull() {
  3. Observable.fromFuture(null);
  4. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test(expected = NullPointerException.class)
  2. public void fromFutureTimedSchedulerNull() {
  3. Observable.fromFuture(new FutureTask<Object>(Functions.EMPTY_RUNNABLE, null), 1, TimeUnit.SECONDS, null);
  4. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test(expected = NullPointerException.class)
  2. public void fromFutureTimedReturnsNull() {
  3. FutureTask<Object> f = new FutureTask<Object>(Functions.EMPTY_RUNNABLE, null);
  4. f.run();
  5. Observable.fromFuture(f, 1, TimeUnit.SECONDS).blockingLast();
  6. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void testSuccess() throws Exception {
  3. @SuppressWarnings("unchecked")
  4. Future<Object> future = mock(Future.class);
  5. Object value = new Object();
  6. when(future.get()).thenReturn(value);
  7. Observer<Object> o = TestHelper.mockObserver();
  8. TestObserver<Object> to = new TestObserver<Object>(o);
  9. Observable.fromFuture(future).subscribe(to);
  10. to.dispose();
  11. verify(o, times(1)).onNext(value);
  12. verify(o, times(1)).onComplete();
  13. verify(o, never()).onError(any(Throwable.class));
  14. verify(future, never()).cancel(true);
  15. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void testFailure() throws Exception {
  3. @SuppressWarnings("unchecked")
  4. Future<Object> future = mock(Future.class);
  5. RuntimeException e = new RuntimeException();
  6. when(future.get()).thenThrow(e);
  7. Observer<Object> o = TestHelper.mockObserver();
  8. TestObserver<Object> to = new TestObserver<Object>(o);
  9. Observable.fromFuture(future).subscribe(to);
  10. to.dispose();
  11. verify(o, never()).onNext(null);
  12. verify(o, never()).onComplete();
  13. verify(o, times(1)).onError(e);
  14. verify(future, never()).cancel(true);
  15. }

代码示例来源:origin: ReactiveX/RxJava

  1. public static <T> Observable<T> fromFuture(Future<? extends T> future, Scheduler scheduler) {
  2. ObjectHelper.requireNonNull(scheduler, "scheduler is null");
  3. Observable<T> o = fromFuture(future);
  4. return o.subscribeOn(scheduler);

代码示例来源:origin: ReactiveX/RxJava

  1. public static <T> Observable<T> fromFuture(Future<? extends T> future, long timeout, TimeUnit unit, Scheduler scheduler) {
  2. ObjectHelper.requireNonNull(scheduler, "scheduler is null");
  3. Observable<T> o = fromFuture(future, timeout, unit);
  4. return o.subscribeOn(scheduler);

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void testSuccessOperatesOnSuppliedScheduler() throws Exception {
  3. @SuppressWarnings("unchecked")
  4. Future<Object> future = mock(Future.class);
  5. Object value = new Object();
  6. when(future.get()).thenReturn(value);
  7. Observer<Object> o = TestHelper.mockObserver();
  8. TestScheduler scheduler = new TestScheduler();
  9. TestObserver<Object> to = new TestObserver<Object>(o);
  10. Observable.fromFuture(future, scheduler).subscribe(to);
  11. verify(o, never()).onNext(value);
  12. scheduler.triggerActions();
  13. verify(o, times(1)).onNext(value);
  14. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void testCancelledBeforeSubscribe() throws Exception {
  3. @SuppressWarnings("unchecked")
  4. Future<Object> future = mock(Future.class);
  5. CancellationException e = new CancellationException("unit test synthetic cancellation");
  6. when(future.get()).thenThrow(e);
  7. Observer<Object> o = TestHelper.mockObserver();
  8. TestObserver<Object> to = new TestObserver<Object>(o);
  9. to.dispose();
  10. Observable.fromFuture(future).subscribe(to);
  11. to.assertNoErrors();
  12. to.assertNotComplete();
  13. }

代码示例来源:origin: redisson/redisson

  1. public static <T> Observable<T> fromFuture(Future<? extends T> future, Scheduler scheduler) {
  2. ObjectHelper.requireNonNull(scheduler, "scheduler is null");
  3. Observable<T> o = fromFuture(future);
  4. return o.subscribeOn(scheduler);

代码示例来源:origin: redisson/redisson

  1. public static <T> Observable<T> fromFuture(Future<? extends T> future, long timeout, TimeUnit unit, Scheduler scheduler) {
  2. ObjectHelper.requireNonNull(scheduler, "scheduler is null");
  3. Observable<T> o = fromFuture(future, timeout, unit);
  4. return o.subscribeOn(scheduler);

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void fromFutureReturnsNull() {
  3. FutureTask<Object> f = new FutureTask<Object>(Functions.EMPTY_RUNNABLE, null);
  4. f.run();
  5. TestObserver<Object> to = new TestObserver<Object>();
  6. Observable.fromFuture(f).subscribe(to);
  7. to.assertNoValues();
  8. to.assertNotComplete();
  9. to.assertError(NullPointerException.class);
  10. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void fromFutureTimeout() throws Exception {
  3. Observable.fromFuture(Observable.never()
  4. .toFuture(), 100, TimeUnit.MILLISECONDS, Schedulers.io())
  5. .test()
  6. .awaitDone(5, TimeUnit.SECONDS)
  7. .assertFailure(TimeoutException.class);
  8. }

代码示例来源:origin: ReactiveX/RxJava

  1. Observable<Object> futureObservable = Observable.fromFuture(future);

代码示例来源:origin: cn.leancloud/storage-core

  1. FutureTask<List<AVObject>> futureTask = new FutureTask<List<AVObject>>(callable);
  2. executor.submit(futureTask);
  3. return Observable.fromFuture(futureTask);

代码示例来源:origin: cn.leancloud/storage-core

  1. FutureTask<String> futureTask = new FutureTask<>(callable);
  2. executor.submit(futureTask);
  3. Observable result = Observable.fromFuture(futureTask);
  4. if (isAsync) {
  5. result = result.subscribeOn(Schedulers.io());

相关文章

Observable类方法