io.reactivex.Flowable.fromFuture(): usage and code examples

x33g5p2x, reposted on 2022-01-19 under Other

This article collects code examples for the Java method io.reactivex.Flowable.fromFuture() and shows how it is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, and were extracted from selected projects. Details of Flowable.fromFuture():
Fully qualified class: io.reactivex.Flowable
Class name: Flowable
Method name: fromFuture

About Flowable.fromFuture

Converts a Future into a Publisher.

You can convert any object that supports the Future interface into a Publisher that emits the return value of that object's Future#get method by passing the object to the fromFuture method.

Important note: This Publisher blocks the thread it is subscribed on; you cannot cancel it.

Unlike 1.x, canceling the Flowable won't cancel the future. If necessary, one can use composition to achieve the cancellation effect: futurePublisher.doOnCancel(() -> future.cancel(true)).

Backpressure: The operator honors backpressure from downstream.

Scheduler: fromFuture does not operate by default on a particular Scheduler.
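The blocking and cancellation notes above follow from how java.util.concurrent.Future itself behaves. The sketch below uses only JDK classes (no RxJava), and the names FutureGetSemantics and describeGet are made up for illustration. It shows the three outcomes a caller of Future#get can observe: a value, a failure wrapped in ExecutionException, and a CancellationException once future.cancel(true) has been called, which is the call the doOnCancel composition would make.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;

// Illustrative only: class and method names are hypothetical, not part of RxJava.
public class FutureGetSemantics {

    /** Blocks on Future#get and reports which of the possible outcomes occurred. */
    public static String describeGet(Future<String> future) {
        try {
            return "value:" + future.get();
        } catch (CancellationException e) {
            return "cancelled";
        } catch (ExecutionException e) {
            return "failed:" + e.getCause().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
            return "interrupted";
        }
    }

    /** A future whose task has already run successfully. */
    public static String completedCase() {
        FutureTask<String> task = new FutureTask<String>(() -> "hello");
        task.run();
        return describeGet(task);
    }

    /** A future whose task threw an exception while running. */
    public static String failedCase() {
        Callable<String> failing = () -> { throw new IllegalStateException("boom"); };
        FutureTask<String> task = new FutureTask<String>(failing);
        task.run();
        return describeGet(task);
    }

    /** A future that was cancelled before its task ever ran. */
    public static String cancelledCase() {
        FutureTask<String> task = new FutureTask<String>(() -> "never delivered");
        task.cancel(true);
        return describeGet(task);
    }

    public static void main(String[] args) {
        System.out.println(completedCase());
        System.out.println(failedCase());
        System.out.println(cancelledCase());
    }
}
```

A publisher built on top of such a future can only forward these outcomes; it cannot make the underlying get() call return any sooner.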

Code examples

Code example source: ReactiveX/RxJava

@Override
public Publisher<Long> createPublisher(final long elements) {
  FutureTask<Long> ft = new FutureTask<Long>(new Callable<Long>() {
    @Override
    public Long call() throws Exception {
      return 1L;
    }
  });
  ft.run();
  return Flowable.fromFuture(ft);
}

Code example source: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void fromFutureNull() {
  Flowable.fromFuture(null);
}

Code example source: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void fromFutureTimedFutureNull() {
  Flowable.fromFuture(null, 1, TimeUnit.SECONDS);
}

Code example source: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void fromFutureTimedSchedulerNull() {
  Flowable.fromFuture(new FutureTask<Object>(Functions.EMPTY_RUNNABLE, null), 1, TimeUnit.SECONDS, null);
}

Code example source: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void fromFutureSchedulerNull() {
  Flowable.fromFuture(new FutureTask<Object>(Functions.EMPTY_RUNNABLE, null), null);
}

Code example source: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void fromFutureTimedUnitNull() {
  Flowable.fromFuture(new FutureTask<Object>(Functions.EMPTY_RUNNABLE, null), 1, null);
}

Code example source: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void fromFutureTimedReturnsNull() {
  FutureTask<Object> f = new FutureTask<Object>(Functions.EMPTY_RUNNABLE, null);
  f.run();
  Flowable.fromFuture(f, 1, TimeUnit.SECONDS).blockingLast();
}

Code example source: ReactiveX/RxJava

/**
 * Converts a {@link Future}, operating on a specified {@link Scheduler}, into a {@code Single}.
 * <p>
 * <img width="640" height="315" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.from.Future.s.png" alt="">
 * <p>
 * You can convert any object that supports the {@link Future} interface into a {@code Single} that emits
 * the return value of the {@link Future#get} method of that object, by passing the object into the
 * {@code from} method.
 * <dl>
 * <dt><b>Scheduler:</b></dt>
 * <dd>You specify which {@link Scheduler} this operator will use.</dd>
 * </dl>
 *
 * @param future
 *            the source {@link Future}
 * @param scheduler
 *            the {@link Scheduler} to wait for the Future on. Use a Scheduler such as
 *            {@link Schedulers#io()} that can block and wait on the Future
 * @param <T>
 *            the type of object that the {@link Future} returns, and also the type of item to be emitted by
 *            the resulting {@code Single}
 * @return a {@code Single} that emits the item from the source {@link Future}
 * @see <a href="http://reactivex.io/documentation/operators/from.html">ReactiveX operators documentation: From</a>
 */
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public static <T> Single<T> fromFuture(Future<? extends T> future, Scheduler scheduler) {
  return toSingle(Flowable.<T>fromFuture(future, scheduler));
}

Code example source: ReactiveX/RxJava

/**
 * Converts a {@link Future} into a {@code Single}.
 * <p>
 * <img width="640" height="315" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.from.Future.png" alt="">
 * <p>
 * You can convert any object that supports the {@link Future} interface into a Single that emits the return
 * value of the {@link Future#get} method of that object, by passing the object into the {@code from}
 * method.
 * <p>
 * <em>Important note:</em> This Single is blocking; you cannot dispose it.
 * <dl>
 * <dt><b>Scheduler:</b></dt>
 * <dd>{@code fromFuture} does not operate by default on a particular {@link Scheduler}.</dd>
 * </dl>
 *
 * @param future
 *            the source {@link Future}
 * @param <T>
 *            the type of object that the {@link Future} returns, and also the type of item to be emitted by
 *            the resulting {@code Single}
 * @return a {@code Single} that emits the item from the source {@link Future}
 * @see <a href="http://reactivex.io/documentation/operators/from.html">ReactiveX operators documentation: From</a>
 */
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Single<T> fromFuture(Future<? extends T> future) {
  return toSingle(Flowable.<T>fromFuture(future));
}

Code example source: ReactiveX/RxJava

@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Single<T> fromFuture(Future<? extends T> future, long timeout, TimeUnit unit) {
  return toSingle(Flowable.<T>fromFuture(future, timeout, unit));
}

Code example source: ReactiveX/RxJava

@SchedulerSupport(SchedulerSupport.CUSTOM)
public static <T> Single<T> fromFuture(Future<? extends T> future, long timeout, TimeUnit unit, Scheduler scheduler) {
  return toSingle(Flowable.<T>fromFuture(future, timeout, unit, scheduler));
}

Code example source: ReactiveX/RxJava

@Test
public void testSuccess() throws Exception {
  @SuppressWarnings("unchecked")
  Future<Object> future = mock(Future.class);
  Object value = new Object();
  when(future.get()).thenReturn(value);
  Subscriber<Object> subscriber = TestHelper.mockSubscriber();
  TestSubscriber<Object> ts = new TestSubscriber<Object>(subscriber);
  Flowable.fromFuture(future).subscribe(ts);
  ts.dispose();
  verify(subscriber, times(1)).onNext(value);
  verify(subscriber, times(1)).onComplete();
  verify(subscriber, never()).onError(any(Throwable.class));
  verify(future, never()).cancel(anyBoolean());
}

Code example source: ReactiveX/RxJava

@Test
public void testFailure() throws Exception {
  @SuppressWarnings("unchecked")
  Future<Object> future = mock(Future.class);
  RuntimeException e = new RuntimeException();
  when(future.get()).thenThrow(e);
  Subscriber<Object> subscriber = TestHelper.mockSubscriber();
  TestSubscriber<Object> ts = new TestSubscriber<Object>(subscriber);
  Flowable.fromFuture(future).subscribe(ts);
  ts.dispose();
  verify(subscriber, never()).onNext(null);
  verify(subscriber, never()).onComplete();
  verify(subscriber, times(1)).onError(e);
  verify(future, never()).cancel(anyBoolean());
}

Code example source: ReactiveX/RxJava

@Test
public void testSuccessOperatesOnSuppliedScheduler() throws Exception {
  @SuppressWarnings("unchecked")
  Future<Object> future = mock(Future.class);
  Object value = new Object();
  when(future.get()).thenReturn(value);
  Subscriber<Object> subscriber = TestHelper.mockSubscriber();
  TestScheduler scheduler = new TestScheduler();
  TestSubscriber<Object> ts = new TestSubscriber<Object>(subscriber);
  Flowable.fromFuture(future, scheduler).subscribe(ts);
  verify(subscriber, never()).onNext(value);
  scheduler.triggerActions();
  verify(subscriber, times(1)).onNext(value);
}
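The test above checks that nothing is emitted until the supplied scheduler runs its queued work. The same idea can be sketched with JDK classes alone, assuming a hand-rolled executor in place of TestScheduler. ManualExecutor, DeferredGetSketch, and demo are hypothetical names, and this is not how RxJava schedulers are implemented.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.Executor;
import java.util.concurrent.FutureTask;

// Illustrative only: a JDK-only analogy for waiting on a Future via a scheduler.
public class DeferredGetSketch {

    /** Executor that only queues tasks; they run when runAll() is called. */
    static final class ManualExecutor implements Executor {
        private final Queue<Runnable> queue = new ArrayDeque<>();

        @Override
        public void execute(Runnable command) {
            queue.add(command);
        }

        void runAll() {
            Runnable next;
            while ((next = queue.poll()) != null) {
                next.run();
            }
        }
    }

    /** Schedules the blocking get() on the executor and records when the value arrives. */
    public static List<String> demo() {
        FutureTask<String> future = new FutureTask<String>(() -> "value");
        future.run(); // the result is ready, but nobody has asked for it yet

        ManualExecutor executor = new ManualExecutor();
        List<String> events = new ArrayList<>();
        executor.execute(() -> {
            try {
                events.add("next:" + future.get());
            } catch (Exception e) {
                events.add("error:" + e);
            }
        });

        events.add("scheduled"); // recorded before the queued work has run
        executor.runAll();       // comparable to triggering the scheduler's actions
        return events;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The value only appears after runAll(), which mirrors the two onNext verifications on either side of scheduler.triggerActions() in the test.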

Code example source: ReactiveX/RxJava

@Test
public void testCancelledBeforeSubscribe() throws Exception {
  @SuppressWarnings("unchecked")
  Future<Object> future = mock(Future.class);
  CancellationException e = new CancellationException("unit test synthetic cancellation");
  when(future.get()).thenThrow(e);
  Subscriber<Object> subscriber = TestHelper.mockSubscriber();
  TestSubscriber<Object> ts = new TestSubscriber<Object>(subscriber);
  ts.dispose();
  Flowable.fromFuture(future).subscribe(ts);
  ts.assertNoErrors();
  ts.assertNotComplete();
}

Code example source: ReactiveX/RxJava

@Test
public void fromFutureReturnsNull() {
  FutureTask<Object> f = new FutureTask<Object>(Functions.EMPTY_RUNNABLE, null);
  f.run();
  TestSubscriber<Object> ts = new TestSubscriber<Object>();
  Flowable.fromFuture(f).subscribe(ts);
  ts.assertNoValues();
  ts.assertNotComplete();
  ts.assertError(NullPointerException.class);
}
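The test above relies on a JDK detail: a FutureTask built from a Runnable and a null result completes normally but yields null from get(). Reactive Streams does not allow null items, which is why the test expects a NullPointerException instead of a value. A small JDK-only sketch of that detail follows; the class name NullResultFuture and its method are hypothetical.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

// Illustrative only: shows what Future#get yields for a no-op task with a null result.
public class NullResultFuture {

    /** Runs a no-op task whose preset result is null and returns what get() yields. */
    public static Object resultOfNoOpTask() {
        FutureTask<Object> task = new FutureTask<Object>(() -> { }, null);
        task.run();
        try {
            return task.get(); // completes normally, but the result is null
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("get() returned: " + resultOfNoOpTask());
    }
}
```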

Code example source: ReactiveX/RxJava

@Test
public void withTimeoutNoTimeout() {
  FutureTask<Integer> task = new FutureTask<Integer>(new Runnable() {
    @Override
    public void run() {
    }
  }, 1);
  task.run();
  TestSubscriber<Integer> ts = TestSubscriber.create();
  Flowable.fromFuture(task, 1, TimeUnit.SECONDS).subscribe(ts);
  ts.assertValue(1);
  ts.assertNoErrors();
  ts.assertComplete();
}

Code example source: ReactiveX/RxJava

@Test
public void withTimeoutTimeout() {
  FutureTask<Integer> task = new FutureTask<Integer>(new Runnable() {
    @Override
    public void run() {
    }
  }, 1);
  TestSubscriber<Integer> ts = TestSubscriber.create();
  Flowable.fromFuture(task, 10, TimeUnit.MILLISECONDS).subscribe(ts);
  ts.assertNoValues();
  ts.assertError(TimeoutException.class);
  ts.assertNotComplete();
}
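The two timeout tests differ only in whether task.run() is called before subscribing. The JDK-only sketch below isolates that difference using Future#get(long, TimeUnit) directly; TimedGetDemo and its methods are hypothetical names, and the 10 ms wait simply mirrors the value used in the test above.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Illustrative only: timed Future#get on a task that has or has not been run.
public class TimedGetDemo {

    /** Waits up to 10 ms for the task's result and reports the outcome. */
    public static String timedGet(FutureTask<Integer> task) {
        try {
            return "value:" + task.get(10, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return "timeout";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
            return "interrupted";
        } catch (ExecutionException e) {
            return "failed";
        }
    }

    /** The task is never run, so the timed get gives up. */
    public static String neverRun() {
        FutureTask<Integer> task = new FutureTask<Integer>(() -> { }, 1);
        return timedGet(task);
    }

    /** The task has already run, so the timed get returns immediately. */
    public static String alreadyRun() {
        FutureTask<Integer> task = new FutureTask<Integer>(() -> { }, 1);
        task.run();
        return timedGet(task);
    }

    public static void main(String[] args) {
        System.out.println(neverRun());
        System.out.println(alreadyRun());
    }
}
```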

Code example source: ReactiveX/RxJava

@Test
public void withTimeoutNoTimeoutScheduler() {
  FutureTask<Integer> task = new FutureTask<Integer>(new Runnable() {
    @Override
    public void run() {
    }
  }, 1);
  TestSubscriber<Integer> ts = TestSubscriber.create();
  Flowable.fromFuture(task, Schedulers.computation()).subscribe(ts);
  task.run();
  ts.awaitTerminalEvent(5, TimeUnit.SECONDS);
  ts.assertValue(1);
  ts.assertNoErrors();
  ts.assertComplete();
}

Code example source: ReactiveX/RxJava

@Test
public void backpressure() {
  TestSubscriber<Integer> ts = new TestSubscriber<Integer>(0);
  FutureTask<Integer> f = new FutureTask<Integer>(new Runnable() {
    @Override
    public void run() {
    }
  }, 1);
  f.run();
  Flowable.fromFuture(f).subscribe(ts);
  ts.assertNoValues();
  ts.request(1);
  ts.assertValue(1);
  ts.assertNoErrors();
  ts.assertComplete();
}
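The backpressure test above subscribes with zero initial demand and sees the value only after request(1). To make that behavior concrete, here is a simplified sketch built on the JDK's java.util.concurrent.Flow interfaces (Java 9 or later). It is not RxJava's implementation: FutureFlowSketch, ScalarSubscription, and FuturePublisher are hypothetical names, and the handling of invalid requests is deliberately minimal.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;

// Simplified sketch, not RxJava's implementation; all names are hypothetical.
public class FutureFlowSketch {

    /** Holds at most one value and hands it downstream only once demand exists. */
    static final class ScalarSubscription<T> implements Flow.Subscription {
        private final Flow.Subscriber<? super T> downstream;
        private T value;            // non-null once the future has produced a result
        private boolean requested;  // true once the subscriber has asked for an item
        private boolean terminated; // true after emission or cancellation

        ScalarSubscription(Flow.Subscriber<? super T> downstream) { this.downstream = downstream; }

        /** Called by the publisher when the future's result is available. */
        void complete(T v) {
            synchronized (this) {
                if (terminated) return;
                value = v;
                if (!requested) return; // no demand yet: keep the value for later
                terminated = true;
            }
            emit(v);
        }

        @Override
        public void request(long n) {
            T v;
            synchronized (this) {
                // Simplification: non-positive requests are ignored, not signalled as errors.
                if (terminated || n <= 0) return;
                requested = true;
                if (value == null) return; // result not ready: complete() will emit
                terminated = true;
                v = value;
            }
            emit(v);
        }

        @Override
        public synchronized void cancel() { terminated = true; }

        synchronized boolean isTerminated() { return terminated; }

        private void emit(T v) {
            downstream.onNext(v);
            downstream.onComplete();
        }
    }

    /** Publisher that blocks on the future at subscription time. */
    static final class FuturePublisher<T> implements Flow.Publisher<T> {
        private final Future<? extends T> future;

        FuturePublisher(Future<? extends T> future) { this.future = future; }

        @Override
        public void subscribe(Flow.Subscriber<? super T> subscriber) {
            ScalarSubscription<T> subscription = new ScalarSubscription<>(subscriber);
            subscriber.onSubscribe(subscription);
            T result;
            try {
                result = future.get(); // blocks the subscribing thread
            } catch (Exception e) {
                if (!subscription.isTerminated()) subscriber.onError(e);
                return;
            }
            if (result == null) {
                subscriber.onError(new NullPointerException("the future returned null"));
            } else {
                subscription.complete(result);
            }
        }
    }

    /** Subscribes without demand, then requests one item; returns the observed events. */
    public static List<String> demo() {
        FutureTask<Integer> task = new FutureTask<Integer>(() -> { }, 1);
        task.run();
        List<String> events = new ArrayList<>();
        Flow.Subscription[] held = new Flow.Subscription[1];
        new FuturePublisher<Integer>(task).subscribe(new Flow.Subscriber<Integer>() {
            @Override public void onSubscribe(Flow.Subscription s) { held[0] = s; events.add("subscribed"); }
            @Override public void onNext(Integer item) { events.add("next:" + item); }
            @Override public void onError(Throwable t) { events.add("error:" + t); }
            @Override public void onComplete() { events.add("complete"); }
        });
        events.add("requesting");
        held[0].request(1);
        return events;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In this sketch the value is held back until request(1) arrives, so the recorded order is subscription, request, item, completion, matching the assertions in the test.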
