io.reactivex.Observable.fromCallable()方法的使用及代码示例

x33g5p2x  于2022-01-25 转载在 其他  
字(7.6k)|赞(0)|评价(0)|浏览(252)

本文整理了Java中io.reactivex.Observable.fromCallable()方法的一些代码示例,展示了Observable.fromCallable()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Observable.fromCallable()方法的具体详情如下:
包路径:io.reactivex.Observable
类名称:Observable
方法名:fromCallable

Observable.fromCallable介绍

[英]Returns an Observable that, when an observer subscribes to it, invokes a function you specify and then emits the value returned from that function.

This allows you to defer the execution of the function you specify until an observer subscribes to the ObservableSource. That is to say, it makes the function "lazy." Scheduler: fromCallable does not operate by default on a particular Scheduler.
[中]返回一个可观察对象,当观察者订阅它时,它将调用您指定的函数,然后发出从该函数返回的值。
这允许您推迟执行指定的函数,直到观察者订阅ObservableSource。也就是说,它使函数“懒惰”调度程序:默认情况下,fromCallable不会在特定调度程序上运行。

代码示例

代码示例来源:origin: ReactiveX/RxJava

@Override
  public ObservableSource<? extends Object> apply(Integer v)
      throws Exception {
    return Observable.fromCallable(new Callable<Object>() {
      @Override
      public Object call() throws Exception {
        return ++calls[0];
      }
    });
  }
})

代码示例来源:origin: ReactiveX/RxJava

@Override
  public ObservableSource<Integer> apply(Integer v) throws Exception {
    return Observable.fromCallable(new Callable<Integer>() {
      @Override
      public Integer call() throws Exception {
        throw new TestException();
      }
    });
  }
})

代码示例来源:origin: ReactiveX/RxJava

@Override
  public ObservableSource<? extends Object> apply(Integer v)
      throws Exception {
    return Observable.fromCallable(new Callable<Object>() {
      @Override
      public Object call() throws Exception {
        return null;
      }
    });
  }
})

代码示例来源:origin: ReactiveX/RxJava

@Override
  public ObservableSource<? extends Object> apply(Integer v)
      throws Exception {
    return Observable.fromCallable(new Callable<Object>() {
      @Override
      public Object call() throws Exception {
        return ++calls[0];
      }
    });
  }
})

代码示例来源:origin: ReactiveX/RxJava

@Override
  public ObservableSource<? extends Object> apply(Integer v)
      throws Exception {
    return Observable.fromCallable(new Callable<Object>() {
      @Override
      public Object call() throws Exception {
        return null;
      }
    });
  }
})

代码示例来源:origin: ReactiveX/RxJava

@Override
  public ObservableSource<Integer> apply(Integer v) throws Exception {
    return Observable.fromCallable(new Callable<Integer>() {
      @Override
      public Integer call() throws Exception {
        throw new TestException();
      }
    });
  }
})

代码示例来源:origin: ReactiveX/RxJava

@SuppressWarnings("unchecked")
@Test
public void shouldNotInvokeFuncUntilSubscription() throws Exception {
  Callable<Object> func = mock(Callable.class);
  when(func.call()).thenReturn(new Object());
  Observable<Object> fromCallableObservable = Observable.fromCallable(func);
  verifyZeroInteractions(func);
  fromCallableObservable.subscribe();
  verify(func).call();
}

代码示例来源:origin: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void fromCallableNull() {
  Observable.fromCallable(null);
}

代码示例来源:origin: ReactiveX/RxJava

@SuppressWarnings("unchecked")
@Test
public void shouldCallOnError() throws Exception {
  Callable<Object> func = mock(Callable.class);
  Throwable throwable = new IllegalStateException("Test exception");
  when(func.call()).thenThrow(throwable);
  Observable<Object> fromCallableObservable = Observable.fromCallable(func);
  Observer<Object> observer = TestHelper.mockObserver();
  fromCallableObservable.subscribe(observer);
  verify(observer, never()).onNext(any());
  verify(observer, never()).onComplete();
  verify(observer).onError(throwable);
}

代码示例来源:origin: ReactiveX/RxJava

@SuppressWarnings("unchecked")
@Test
public void shouldCallOnNextAndOnCompleted() throws Exception {
  Callable<String> func = mock(Callable.class);
  when(func.call()).thenReturn("test_value");
  Observable<String> fromCallableObservable = Observable.fromCallable(func);
  Observer<Object> observer = TestHelper.mockObserver();
  fromCallableObservable.subscribe(observer);
  verify(observer).onNext("test_value");
  verify(observer).onComplete();
  verify(observer, never()).onError(any(Throwable.class));
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void shouldAllowToThrowCheckedException() {
  final Exception checkedException = new Exception("test exception");
  Observable<Object> fromCallableObservable = Observable.fromCallable(new Callable<Object>() {
    @Override
    public Object call() throws Exception {
      throw checkedException;
    }
  });
  Observer<Object> observer = TestHelper.mockObserver();
  fromCallableObservable.subscribe(observer);
  verify(observer).onSubscribe(any(Disposable.class));
  verify(observer).onError(checkedException);
  verifyNoMoreInteractions(observer);
}

代码示例来源:origin: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void fromCallableReturnsNull() {
  Observable.fromCallable(new Callable<Object>() {
    @Override
    public Object call() throws Exception {
      return null;
    }
  }).blockingLast();
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void publishNoLeak() throws Exception {
  System.gc();
  Thread.sleep(100);
  long start = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getUsed();
  source = Observable.fromCallable(new Callable<Object>() {
    @Override
    public Object call() throws Exception {
      throw new ExceptionData(new byte[100 * 1000 * 1000]);
    }
  })
  .publish()
  .refCount();
  source.subscribe(Functions.emptyConsumer(), Functions.emptyConsumer());
  System.gc();
  Thread.sleep(100);
  long after = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getUsed();
  source = null;
  assertTrue(String.format("%,3d -> %,3d%n", start, after), start + 20 * 1000 * 1000 > after);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void disposedOnCall() {
  final TestObserver<Integer> to = new TestObserver<Integer>();
  Observable.fromCallable(new Callable<Integer>() {
    @Override
    public Integer call() throws Exception {
      to.cancel();
      return 1;
    }
  })
      .subscribe(to);
  to.assertEmpty();
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void mergeScalarError() {
  Observable.merge(Observable.just(Observable.fromCallable(new Callable<Object>() {
    @Override
    public Object call() throws Exception {
      throw new TestException();
    }
  })).hide())
  .test()
  .assertFailure(TestException.class);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
  public void take() {
    Observable.fromCallable(new Callable<Object>() {
      @Override
      public Object call() throws Exception {
        return 1;
      }
    })
        .take(1)
        .test()
        .assertResult(1);
  }
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void disposedOnArrival() {
  final int[] count = { 0 };
  Observable.fromCallable(new Callable<Object>() {
    @Override
    public Object call() throws Exception {
      count[0]++;
      return 1;
    }
  })
      .test(true)
      .assertEmpty();
  assertEquals(0, count[0]);
}

代码示例来源:origin: ReactiveX/RxJava

@Test(timeout = 5000)
public void startWithObservableNormal() {
  final AtomicBoolean run = new AtomicBoolean();
  Observable<Object> o = normal.completable
      .startWith(Observable.fromCallable(new Callable<Object>() {
        @Override
        public Object call() throws Exception {
          run.set(normal.get() == 0);
          return 1;
        }
      }));
  TestObserver<Object> to = new TestObserver<Object>();
  o.subscribe(to);
  Assert.assertTrue("Did not start with other", run.get());
  normal.assertSubscriptions(1);
  to.assertValue(1);
  to.assertComplete();
  to.assertNoErrors();
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void disposedOnCallThrows() {
  List<Throwable> errors = TestHelper.trackPluginErrors();
  try {
    final TestObserver<Integer> to = new TestObserver<Integer>();
    Observable.fromCallable(new Callable<Integer>() {
      @Override
      public Integer call() throws Exception {
        to.cancel();
        throw new TestException();
      }
    })
        .subscribe(to);
    to.assertEmpty();
    TestHelper.assertUndeliverable(errors, 0, TestException.class);
  } finally {
    RxJavaPlugins.reset();
  }
}

代码示例来源:origin: ReactiveX/RxJava

@Test
  public void cancelledUpFront() {
    final AtomicInteger call = new AtomicInteger();
    Observable<Object> f = Observable.fromCallable(new Callable<Object>() {
      @Override
      public Object call() throws Exception {
        return call.incrementAndGet();
      }
    }).concatWith(Observable.never())
    .cache();

    f.test().assertValuesOnly(1);

    f.test(true)
    .assertEmpty();

    assertEquals(1, call.get());
  }
}

相关文章

Observable类方法