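This article collects code examples for the Java method io.reactivex.Flowable.fromFuture() and shows how Flowable.fromFuture() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven. They were extracted from selected projects and are intended as practical reference material. Details of Flowable.fromFuture() are as follows: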
Package path: io.reactivex.Flowable
Class name: Flowable
Method name: fromFuture
Description: Converts a Future into a Publisher.
You can convert any object that supports the Future interface into a Publisher that emits the return value of the Future#get method of that object by passing the object into the from method.
Important note: This Publisher is blocking on the thread it gets subscribed on; you cannot cancel it.
Unlike 1.x, canceling the Flowable won't cancel the future. If necessary, one can use composition to achieve the cancellation effect: futurePublisher.doOnCancel(() -> future.cancel(true));
Backpressure: The operator honors backpressure from downstream.
Scheduler: fromFuture does not operate by default on a particular Scheduler.
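The cancellation note above is easier to follow with the underlying Future semantics in view. The following is a minimal JDK-only sketch, not RxJava code: the class FutureCancelSketch and its methods are hypothetical names introduced for illustration. It shows a thread blocked in Future#get, as the subscribing thread of fromFuture would be, and how cancelling the future itself releases that thread.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// JDK-only stand-in: FutureCancelSketch and its methods are hypothetical names,
// not part of RxJava.
final class FutureCancelSketch {

    /** Blocks on the future, as a subscribing thread would, and reports the outcome. */
    static String awaitOutcome(Future<String> future) {
        try {
            return "value:" + future.get();   // blocks until the future terminates
        } catch (CancellationException e) {
            return "cancelled";               // the future itself was cancelled
        } catch (ExecutionException e) {
            return "failed:" + e.getCause().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    /** Lets a worker wait on a pending future, then cancels the future to release it. */
    static String releaseByCancelling() {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            CompletableFuture<String> pending = new CompletableFuture<>();
            Future<String> waiter = worker.submit(() -> awaitOutcome(pending));
            // Plays the role of the doOnCancel(() -> future.cancel(true)) hook.
            pending.cancel(true);
            return waiter.get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            worker.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(awaitOutcome(CompletableFuture.completedFuture("hello"))); // value:hello
        System.out.println(releaseByCancelling());                                    // cancelled
    }
}
```

In RxJava the same effect would be wired up through the composition quoted in the description, futurePublisher.doOnCancel(() -> future.cancel(true)), since cancelling the Flowable alone leaves the future untouched.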
Code example source: ReactiveX/RxJava
@Override
public Publisher<Long> createPublisher(final long elements) {
    FutureTask<Long> ft = new FutureTask<Long>(new Callable<Long>() {
        @Override
        public Long call() throws Exception {
            return 1L;
        }
    });
    // Run the task up front so the future is already completed when subscribed.
    ft.run();
    return Flowable.fromFuture(ft);
}
Code example source: ReactiveX/RxJava
@Test(expected = NullPointerException.class)
public void fromFutureNull() {
    Flowable.fromFuture(null);
}
Code example source: ReactiveX/RxJava
@Test(expected = NullPointerException.class)
public void fromFutureTimedFutureNull() {
    Flowable.fromFuture(null, 1, TimeUnit.SECONDS);
}
Code example source: ReactiveX/RxJava
@Test(expected = NullPointerException.class)
public void fromFutureTimedSchedulerNull() {
    Flowable.fromFuture(new FutureTask<Object>(Functions.EMPTY_RUNNABLE, null), 1, TimeUnit.SECONDS, null);
}
Code example source: ReactiveX/RxJava
@Test(expected = NullPointerException.class)
public void fromFutureSchedulerNull() {
    Flowable.fromFuture(new FutureTask<Object>(Functions.EMPTY_RUNNABLE, null), null);
}
Code example source: ReactiveX/RxJava
@Test(expected = NullPointerException.class)
public void fromFutureTimedUnitNull() {
    Flowable.fromFuture(new FutureTask<Object>(Functions.EMPTY_RUNNABLE, null), 1, null);
}
Code example source: ReactiveX/RxJava
@Test(expected = NullPointerException.class)
public void fromFutureTimedReturnsNull() {
    FutureTask<Object> f = new FutureTask<Object>(Functions.EMPTY_RUNNABLE, null);
    f.run();
    // The completed future yields null, which surfaces as a NullPointerException.
    Flowable.fromFuture(f, 1, TimeUnit.SECONDS).blockingLast();
}
Code example source: ReactiveX/RxJava
/**
 * Converts a {@link Future}, operating on a specified {@link Scheduler}, into a {@code Single}.
 * <p>
 * <img width="640" height="315" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.from.Future.s.png" alt="">
 * <p>
 * You can convert any object that supports the {@link Future} interface into a {@code Single} that emits
 * the return value of the {@link Future#get} method of that object, by passing the object into the
 * {@code from} method.
 * <dl>
 * <dt><b>Scheduler:</b></dt>
 * <dd>You specify which {@link Scheduler} this operator will use.</dd>
 * </dl>
 *
 * @param future
 *            the source {@link Future}
 * @param scheduler
 *            the {@link Scheduler} to wait for the Future on. Use a Scheduler such as
 *            {@link Schedulers#io()} that can block and wait on the Future
 * @param <T>
 *            the type of object that the {@link Future} returns, and also the type of item to be emitted by
 *            the resulting {@code Single}
 * @return a {@code Single} that emits the item from the source {@link Future}
 * @see <a href="http://reactivex.io/documentation/operators/from.html">ReactiveX operators documentation: From</a>
 */
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public static <T> Single<T> fromFuture(Future<? extends T> future, Scheduler scheduler) {
    return toSingle(Flowable.<T>fromFuture(future, scheduler));
}
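The scheduler parameter documented above exists so that the blocking wait on the Future happens on a thread that is allowed to block. As a rough JDK-only sketch of that idea (not the RxJava implementation; OffloadedWaitSketch, awaitOn, and demo are hypothetical names), the wait can be handed to an executor while the caller stays free to complete the task:

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// JDK-only stand-in: OffloadedWaitSketch, awaitOn and demo are hypothetical names,
// not part of RxJava.
final class OffloadedWaitSketch {

    /** Runs the blocking Future#get on waitExecutor and forwards the outcome to a callback. */
    static <T> void awaitOn(Executor waitExecutor, Future<T> future,
                            Consumer<T> onValue, Consumer<Throwable> onError) {
        waitExecutor.execute(() -> {
            try {
                onValue.accept(future.get());   // blocks an executor thread, not the caller
            } catch (ExecutionException e) {
                onError.accept(e.getCause());   // unwrap the failure of the future
            } catch (CancellationException e) {
                onError.accept(e);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                onError.accept(e);
            }
        });
    }

    /** Starts waiting first, then completes the task from the calling thread. */
    static int demo() {
        ExecutorService waitPool = Executors.newSingleThreadExecutor();
        try {
            FutureTask<Integer> task = new FutureTask<>(() -> 1);
            CompletableFuture<Integer> received = new CompletableFuture<>();
            awaitOn(waitPool, task, received::complete, received::completeExceptionally);
            task.run();   // the caller was not blocked, so it can still run the task
            return received.get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            waitPool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // 1
    }
}
```

With RxJava, the two-argument overload shown above plays this role, for example with Schedulers.io() as the javadoc suggests.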
Code example source: ReactiveX/RxJava
/**
 * Converts a {@link Future} into a {@code Single}.
 * <p>
 * <img width="640" height="315" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.from.Future.png" alt="">
 * <p>
 * You can convert any object that supports the {@link Future} interface into a Single that emits the return
 * value of the {@link Future#get} method of that object, by passing the object into the {@code from}
 * method.
 * <p>
 * <em>Important note:</em> This Single is blocking; you cannot dispose it.
 * <dl>
 * <dt><b>Scheduler:</b></dt>
 * <dd>{@code fromFuture} does not operate by default on a particular {@link Scheduler}.</dd>
 * </dl>
 *
 * @param future
 *            the source {@link Future}
 * @param <T>
 *            the type of object that the {@link Future} returns, and also the type of item to be emitted by
 *            the resulting {@code Single}
 * @return a {@code Single} that emits the item from the source {@link Future}
 * @see <a href="http://reactivex.io/documentation/operators/from.html">ReactiveX operators documentation: From</a>
 */
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Single<T> fromFuture(Future<? extends T> future) {
    return toSingle(Flowable.<T>fromFuture(future));
}
Code example source: ReactiveX/RxJava
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Single<T> fromFuture(Future<? extends T> future, long timeout, TimeUnit unit) {
    return toSingle(Flowable.<T>fromFuture(future, timeout, unit));
}
Code example source: ReactiveX/RxJava
@SchedulerSupport(SchedulerSupport.CUSTOM)
public static <T> Single<T> fromFuture(Future<? extends T> future, long timeout, TimeUnit unit, Scheduler scheduler) {
    return toSingle(Flowable.<T>fromFuture(future, timeout, unit, scheduler));
}
Code example source: ReactiveX/RxJava
@Test
public void testSuccess() throws Exception {
    @SuppressWarnings("unchecked")
    Future<Object> future = mock(Future.class);
    Object value = new Object();
    when(future.get()).thenReturn(value);
    Subscriber<Object> subscriber = TestHelper.mockSubscriber();
    TestSubscriber<Object> ts = new TestSubscriber<Object>(subscriber);
    Flowable.fromFuture(future).subscribe(ts);
    ts.dispose();
    verify(subscriber, times(1)).onNext(value);
    verify(subscriber, times(1)).onComplete();
    verify(subscriber, never()).onError(any(Throwable.class));
    // Disposing the subscriber must not cancel the underlying future.
    verify(future, never()).cancel(anyBoolean());
}
Code example source: ReactiveX/RxJava
@Test
public void testFailure() throws Exception {
    @SuppressWarnings("unchecked")
    Future<Object> future = mock(Future.class);
    RuntimeException e = new RuntimeException();
    when(future.get()).thenThrow(e);
    Subscriber<Object> subscriber = TestHelper.mockSubscriber();
    TestSubscriber<Object> ts = new TestSubscriber<Object>(subscriber);
    Flowable.fromFuture(future).subscribe(ts);
    ts.dispose();
    verify(subscriber, never()).onNext(null);
    verify(subscriber, never()).onComplete();
    verify(subscriber, times(1)).onError(e);
    verify(future, never()).cancel(anyBoolean());
}
Code example source: ReactiveX/RxJava
@Test
public void testSuccessOperatesOnSuppliedScheduler() throws Exception {
    @SuppressWarnings("unchecked")
    Future<Object> future = mock(Future.class);
    Object value = new Object();
    when(future.get()).thenReturn(value);
    Subscriber<Object> subscriber = TestHelper.mockSubscriber();
    TestScheduler scheduler = new TestScheduler();
    TestSubscriber<Object> ts = new TestSubscriber<Object>(subscriber);
    Flowable.fromFuture(future, scheduler).subscribe(ts);
    // Nothing is emitted until the supplied scheduler runs its pending work.
    verify(subscriber, never()).onNext(value);
    scheduler.triggerActions();
    verify(subscriber, times(1)).onNext(value);
}
Code example source: ReactiveX/RxJava
@Test
public void testCancelledBeforeSubscribe() throws Exception {
    @SuppressWarnings("unchecked")
    Future<Object> future = mock(Future.class);
    CancellationException e = new CancellationException("unit test synthetic cancellation");
    when(future.get()).thenThrow(e);
    Subscriber<Object> subscriber = TestHelper.mockSubscriber();
    TestSubscriber<Object> ts = new TestSubscriber<Object>(subscriber);
    // Dispose before subscribing: no terminal event should reach the subscriber.
    ts.dispose();
    Flowable.fromFuture(future).subscribe(ts);
    ts.assertNoErrors();
    ts.assertNotComplete();
}
Code example source: ReactiveX/RxJava
@Test
public void fromFutureReturnsNull() {
    FutureTask<Object> f = new FutureTask<Object>(Functions.EMPTY_RUNNABLE, null);
    f.run();
    TestSubscriber<Object> ts = new TestSubscriber<Object>();
    Flowable.fromFuture(f).subscribe(ts);
    ts.assertNoValues();
    ts.assertNotComplete();
    ts.assertError(NullPointerException.class);
}
Code example source: ReactiveX/RxJava
@Test
public void withTimeoutNoTimeout() {
    FutureTask<Integer> task = new FutureTask<Integer>(new Runnable() {
        @Override
        public void run() {
        }
    }, 1);
    // The task has already run, so the timed wait returns immediately.
    task.run();
    TestSubscriber<Integer> ts = TestSubscriber.create();
    Flowable.fromFuture(task, 1, TimeUnit.SECONDS).subscribe(ts);
    ts.assertValue(1);
    ts.assertNoErrors();
    ts.assertComplete();
}
Code example source: ReactiveX/RxJava
@Test
public void withTimeoutTimeout() {
    FutureTask<Integer> task = new FutureTask<Integer>(new Runnable() {
        @Override
        public void run() {
        }
    }, 1);
    // The task is never run, so the 10 ms wait ends in a TimeoutException.
    TestSubscriber<Integer> ts = TestSubscriber.create();
    Flowable.fromFuture(task, 10, TimeUnit.MILLISECONDS).subscribe(ts);
    ts.assertNoValues();
    ts.assertError(TimeoutException.class);
    ts.assertNotComplete();
}
Code example source: ReactiveX/RxJava
@Test
public void withTimeoutNoTimeoutScheduler() {
    FutureTask<Integer> task = new FutureTask<Integer>(new Runnable() {
        @Override
        public void run() {
        }
    }, 1);
    TestSubscriber<Integer> ts = TestSubscriber.create();
    Flowable.fromFuture(task, Schedulers.computation()).subscribe(ts);
    // Complete the task after subscribing; the wait happens on the computation scheduler.
    task.run();
    ts.awaitTerminalEvent(5, TimeUnit.SECONDS);
    ts.assertValue(1);
    ts.assertNoErrors();
    ts.assertComplete();
}
Code example source: ReactiveX/RxJava
@Test
public void backpressure() {
    // Start with zero demand: nothing may be emitted yet.
    TestSubscriber<Integer> ts = new TestSubscriber<Integer>(0);
    FutureTask<Integer> f = new FutureTask<Integer>(new Runnable() {
        @Override
        public void run() {
        }
    }, 1);
    f.run();
    Flowable.fromFuture(f).subscribe(ts);
    ts.assertNoValues();
    // Requesting one item releases the value and completes the stream.
    ts.request(1);
    ts.assertValue(1);
    ts.assertNoErrors();
    ts.assertComplete();
}