io.reactivex.Flowable.toList()方法的使用及代码示例

x33g5p2x  于2022-01-19 转载在 其他  
字(8.5k)|赞(0)|评价(0)|浏览(269)

本文整理了Java中io.reactivex.Flowable.toList()方法的一些代码示例,展示了Flowable.toList()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Flowable.toList()方法的具体详情如下:
包路径:io.reactivex.Flowable
类名称:Flowable
方法名:toList

Flowable.toList介绍

[英]Returns a Single that emits a single item, a list composed of all the items emitted by the finite upstream source Publisher.

Normally, a Publisher that returns multiple items will do so by invoking its Subscriber's Subscriber#onNext method for each such item. You can change this behavior, instructing the Publisher to compose a list of all of these items and then to invoke the Subscriber's onNextfunction once, passing it the entire list, by calling the Publisher's toList method prior to calling its #subscribe method.

Note that this operator requires the upstream to signal onComplete for the accumulated list to be emitted. Sources that are infinite and never complete will never emit anything through this operator and an infinite source may lead to a fatal OutOfMemoryError. Backpressure: The operator honors backpressure from downstream and consumes the source Publisher in an unbounded manner (i.e., without applying backpressure to it). Scheduler: toList does not operate by default on a particular Scheduler.
[中]返回一个发出单个项的列表,该列表由有限上游源发布服务器发出的所有项组成。
通常,返回多个项目的发布者将通过为每个此类项目调用其订阅者的订阅者#onNext方法来执行此操作。您可以更改此行为,在调用订阅服务器的#subscribe方法之前,通过调用发布服务器的toList方法,指示发布服务器编写所有这些项的列表,然后调用订阅服务器的onNextfunction一次,并将整个列表传递给订阅服务器。
请注意,此运算符要求上游发出“完成”信号,以发出累积列表。无限且永远不完整的源永远不会通过此运算符发出任何信息,无限源可能导致致命的OutOfMemoryError。背压:操作员接受来自下游的背压,并以无限制的方式消耗源发布服务器(即,不向其施加背压)。Scheduler:toList默认情况下不会在特定的计划程序上运行。

代码示例

代码示例来源:origin: ReactiveX/RxJava

@Override
  public Flowable<List<Object>> apply(Flowable<Object> f)
      throws Exception {
    return f.toList().toFlowable();
  }
});

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testList() {
  Flowable<String> w = Flowable.fromIterable(Arrays.asList("one", "two", "three"));
  Single<List<String>> single = w.toList();
  SingleObserver<List<String>> observer = TestHelper.mockSingleObserver();
  single.subscribe(observer);
  verify(observer, times(1)).onSuccess(Arrays.asList("one", "two", "three"));
  verify(observer, Mockito.never()).onError(any(Throwable.class));
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testListViaFlowable() {
  Flowable<String> w = Flowable.fromIterable(Arrays.asList("one", "two", "three"));
  Single<List<String>> single = w.toList();
  SingleObserver<List<String>> observer = TestHelper.mockSingleObserver();
  single.subscribe(observer);
  verify(observer, times(1)).onSuccess(Arrays.asList("one", "two", "three"));
  verify(observer, Mockito.never()).onError(any(Throwable.class));
}

代码示例来源:origin: ReactiveX/RxJava

@Test
@Ignore("Null values are not allowed")
public void testListWithNullValue() {
  Flowable<String> w = Flowable.fromIterable(Arrays.asList("one", null, "three"));
  Single<List<String>> single = w.toList();
  SingleObserver<List<String>> observer = TestHelper.mockSingleObserver();
  single.subscribe(observer);
  verify(observer, times(1)).onSuccess(Arrays.asList("one", null, "three"));
  verify(observer, Mockito.never()).onError(any(Throwable.class));
}

代码示例来源:origin: ReactiveX/RxJava

@Override
public Publisher<List<Integer>> createPublisher(final long elements) {
  return
      Flowable.range(1, 1000).toList().toFlowable()
    ;
}

代码示例来源:origin: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void toListSupplierReturnsNullSingle() {
  just1.toList(new Callable<Collection<Integer>>() {
    @Override
    public Collection<Integer> call() {
      return null;
    }
  }).blockingGet();
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testToFutureList() throws InterruptedException, ExecutionException {
  Flowable<String> obs = Flowable.just("one", "two", "three");
  Future<List<String>> f = obs.toList().toFuture();
  assertEquals("one", f.get().get(0));
  assertEquals("two", f.get().get(1));
  assertEquals("three", f.get().get(2));
}

代码示例来源:origin: ReactiveX/RxJava

@Test
@Ignore("Null values are not allowed")
public void testListWithNullValueFlowable() {
  Flowable<String> w = Flowable.fromIterable(Arrays.asList("one", null, "three"));
  Flowable<List<String>> flowable = w.toList().toFlowable();
  Subscriber<List<String>> subscriber = TestHelper.mockSubscriber();
  flowable.subscribe(subscriber);
  verify(subscriber, times(1)).onNext(Arrays.asList("one", null, "three"));
  verify(subscriber, Mockito.never()).onError(any(Throwable.class));
  verify(subscriber, times(1)).onComplete();
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void startWithIterable() {
  List<String> li = new ArrayList<String>();
  li.add("alpha");
  li.add("beta");
  List<String> values = Flowable.just("one", "two").startWith(li).toList().blockingGet();
  assertEquals("alpha", values.get(0));
  assertEquals("beta", values.get(1));
  assertEquals("one", values.get(2));
  assertEquals("two", values.get(3));
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testListWithBlockingFirst() {
  Flowable<String> f = Flowable.fromIterable(Arrays.asList("one", "two", "three"));
  List<String> actual = f.toList().blockingGet();
  Assert.assertEquals(Arrays.asList("one", "two", "three"), actual);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testListWithBlockingFirstFlowable() {
  Flowable<String> f = Flowable.fromIterable(Arrays.asList("one", "two", "three"));
  List<String> actual = f.toList().toFlowable().blockingFirst();
  Assert.assertEquals(Arrays.asList("one", "two", "three"), actual);
}

代码示例来源:origin: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void toListSupplierReturnsNull() {
  just1.toList(new Callable<Collection<Integer>>() {
    @Override
    public Collection<Integer> call() {
      return null;
    }
  }).toFlowable().blockingSubscribe();
}

代码示例来源:origin: ReactiveX/RxJava

@SuppressWarnings("unchecked")
@Test
public void capacityHint() {
  Flowable.range(1, 10)
  .toList(4)
  .test()
  .assertResult(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void startWith1() {
  List<String> values = Flowable.just("one", "two")
      .startWithArray("zero").toList().blockingGet();
  assertEquals("zero", values.get(0));
  assertEquals("two", values.get(2));
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testConcatSimple() {
  Flowable<String> f1 = Flowable.just("one", "two");
  Flowable<String> f2 = Flowable.just("three", "four");
  List<String> values = Flowable.concat(f1, f2).toList().blockingGet();
  assertEquals("one", values.get(0));
  assertEquals("two", values.get(1));
  assertEquals("three", values.get(2));
  assertEquals("four", values.get(3));
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testMergeCovariance3() {
  Flowable<Movie> f1 = Flowable.just(new HorrorMovie(), new Movie());
  Flowable<Media> f2 = Flowable.just(new Media(), new HorrorMovie());
  List<Media> values = Flowable.merge(f1, f2).toList().blockingGet();
  assertTrue(values.get(0) instanceof HorrorMovie);
  assertTrue(values.get(1) instanceof Movie);
  assertTrue(values.get(2) != null);
  assertTrue(values.get(3) instanceof HorrorMovie);
}

代码示例来源:origin: ReactiveX/RxJava

@Test(timeout = 2000)
public void testRepeatTake() {
  Flowable<Integer> xs = Flowable.just(1, 2);
  Object[] ys = xs.repeat().subscribeOn(Schedulers.newThread()).take(4).toList().blockingGet().toArray();
  assertArrayEquals(new Object[] { 1, 2, 1, 2 }, ys);
}

代码示例来源:origin: ReactiveX/RxJava

@SuppressWarnings("unchecked")
@Test
public void error() {
  Flowable.error(new TestException())
  .toList()
  .toFlowable()
  .test()
  .assertFailure(TestException.class);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testMergeCovariance() {
  Flowable<Media> f1 = Flowable.<Media> just(new HorrorMovie(), new Movie());
  Flowable<Media> f2 = Flowable.just(new Media(), new HorrorMovie());
  Flowable<Flowable<Media>> os = Flowable.just(f1, f2);
  List<Media> values = Flowable.merge(os).toList().blockingGet();
  assertEquals(4, values.size());
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testMergeCovariance2() {
  Flowable<Media> f1 = Flowable.just(new HorrorMovie(), new Movie(), new Media());
  Flowable<Media> f2 = Flowable.just(new Media(), new HorrorMovie());
  Flowable<Flowable<Media>> os = Flowable.just(f1, f2);
  List<Media> values = Flowable.merge(os).toList().blockingGet();
  assertEquals(5, values.size());
}

相关文章

Flowable类方法