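This article collects code examples for the Java method io.reactivex.Observable.toList() and shows how Observable.toList() is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should serve as a useful reference. Details of the Observable.toList() method are as follows: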
Fully qualified class: io.reactivex.Observable
Class name: Observable
Method name: toList
Description: Returns a Single that emits a single item, a list composed of all the items emitted by the finite source ObservableSource.
Normally, an ObservableSource that emits multiple items does so by invoking its Observer's Observer#onNext method once for each item. Calling toList before subscribing changes this behavior: the items are collected into a list and the subscriber is notified once with the entire list. Because toList returns a Single, that notification arrives through SingleObserver#onSuccess rather than onNext.
Note that this operator requires the upstream to signal onComplete for the accumulated list to be emitted. A source that never completes will never emit anything through this operator, and an infinite source may lead to a fatal OutOfMemoryError.
Scheduler: toList does not operate by default on a particular Scheduler.
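The contract described above (buffer every item, release the list only on completion, forward an error instead of a list) can be illustrated with a small JDK-only model. This is a simplified sketch, not RxJava's implementation: the class ToListSketch and its callback parameters are hypothetical names introduced only for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/**
 * Simplified, JDK-only model of the toList() contract.
 * NOT RxJava code: ToListSketch and its members are hypothetical.
 */
final class ToListSketch<T> {
    private List<T> buffer = new ArrayList<>();
    private final Consumer<List<T>> successCallback;
    private final Consumer<Throwable> errorCallback;
    private boolean done;

    ToListSketch(Consumer<List<T>> successCallback, Consumer<Throwable> errorCallback) {
        this.successCallback = successCallback;
        this.errorCallback = errorCallback;
    }

    /** Each upstream item is only buffered; nothing reaches the callbacks yet. */
    void onNext(T item) {
        if (!done) {
            buffer.add(item);
        }
    }

    /** Completion is the only signal that releases the accumulated list. */
    void onComplete() {
        if (!done) {
            done = true;
            List<T> result = buffer;
            buffer = null;
            successCallback.accept(result);
        }
    }

    /** An upstream error discards the buffer and is forwarded instead of a list. */
    void onError(Throwable error) {
        if (!done) {
            done = true;
            buffer = null;
            errorCallback.accept(error);
        }
    }

    public static void main(String[] args) {
        ToListSketch<String> collector = new ToListSketch<>(
                list -> System.out.println("success: " + list),
                err -> System.out.println("error: " + err));
        collector.onNext("one");
        collector.onNext("two");
        collector.onNext("three"); // nothing has been printed so far
        collector.onComplete();    // prints "success: [one, two, three]"
    }
}
```

In this model a source that never calls onComplete keeps growing the buffer without ever invoking the success callback, which mirrors the warning about infinite sources and OutOfMemoryError.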
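Code example source: ReactiveX/RxJava
    // Fragment: the enclosing anonymous class and the call that receives it are cut off in the source.
    @Override
    public Single<List<Object>> apply(Observable<Object> f)
            throws Exception {
        return f.toList(); // toList() already yields a Single
    }
    });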
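Code example source: ReactiveX/RxJava
    // Fragment: the enclosing anonymous class and the call that receives it are cut off in the source.
    @Override
    public Observable<List<Object>> apply(Observable<Object> f)
            throws Exception {
        return f.toList().toObservable(); // convert the Single back to an Observable
    }
    });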
Code example source: ReactiveX/RxJava
    @Test
    public void testList() {
        Observable<String> w = Observable.fromIterable(Arrays.asList("one", "two", "three"));
        Single<List<String>> single = w.toList();
        SingleObserver<List<String>> observer = TestHelper.mockSingleObserver();
        single.subscribe(observer);
        // The whole list arrives in a single onSuccess call, with no error.
        verify(observer, times(1)).onSuccess(Arrays.asList("one", "two", "three"));
        verify(observer, Mockito.never()).onError(any(Throwable.class));
    }
Code example source: ReactiveX/RxJava
    @Test
    public void testListViaObservable() {
        Observable<String> w = Observable.fromIterable(Arrays.asList("one", "two", "three"));
        // Despite the method name, this snippet as captured subscribes to the Single directly.
        Single<List<String>> single = w.toList();
        SingleObserver<List<String>> observer = TestHelper.mockSingleObserver();
        single.subscribe(observer);
        verify(observer, times(1)).onSuccess(Arrays.asList("one", "two", "three"));
        verify(observer, Mockito.never()).onError(any(Throwable.class));
    }
Code example source: ReactiveX/RxJava
    @Test
    public void testToFutureList() throws InterruptedException, ExecutionException {
        Observable<String> obs = Observable.just("one", "two", "three");
        // toFuture() exposes the Single's result as a java.util.concurrent.Future.
        Future<List<String>> f = obs.toList().toFuture();
        assertEquals("one", f.get().get(0));
        assertEquals("two", f.get().get(1));
        assertEquals("three", f.get().get(2));
    }
Code example source: ReactiveX/RxJava
    @Test
    @Ignore("Null values are not allowed")
    public void testListWithNullValue() {
        // Disabled in the source project: per the @Ignore message, null items are not allowed.
        Observable<String> w = Observable.fromIterable(Arrays.asList("one", null, "three"));
        Single<List<String>> single = w.toList();
        SingleObserver<List<String>> observer = TestHelper.mockSingleObserver();
        single.subscribe(observer);
        verify(observer, times(1)).onSuccess(Arrays.asList("one", null, "three"));
        verify(observer, Mockito.never()).onError(any(Throwable.class));
    }
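Code example source: ReactiveX/RxJava
    @Test(expected = NullPointerException.class)
    public void toListSupplierReturnsNull() {
        // just1 is a field of the enclosing test class (not shown in this snippet).
        // A collection supplier that returns null must fail with NullPointerException.
        just1.toList(new Callable<Collection<Integer>>() {
            @Override
            public Collection<Integer> call() {
                return null;
            }
        }).blockingGet();
    }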
Code example source: ReactiveX/RxJava
    @Test
    @Ignore("Null values are not allowed")
    public void testListWithNullValueObservable() {
        // Disabled in the source project: per the @Ignore message, null items are not allowed.
        Observable<String> w = Observable.fromIterable(Arrays.asList("one", null, "three"));
        Observable<List<String>> observable = w.toList().toObservable();
        Observer<List<String>> observer = TestHelper.mockObserver();
        observable.subscribe(observer);
        verify(observer, times(1)).onNext(Arrays.asList("one", null, "three"));
        verify(observer, Mockito.never()).onError(any(Throwable.class));
        verify(observer, times(1)).onComplete();
    }
Code example source: ReactiveX/RxJava
    @Test
    public void startWithIterable() {
        List<String> li = new ArrayList<String>();
        li.add("alpha");
        li.add("beta");
        // startWith(li) emits the items of li ahead of the source's own items.
        List<String> values = Observable.just("one", "two").startWith(li).toList().blockingGet();
        assertEquals("alpha", values.get(0));
        assertEquals("beta", values.get(1));
        assertEquals("one", values.get(2));
        assertEquals("two", values.get(3));
    }
Code example source: ReactiveX/RxJava
    @Test
    public void testMultipleSubscribes() throws InterruptedException, ExecutionException {
        // TestAsyncErrorObservable is a helper defined elsewhere in the test class (not shown).
        final TestAsyncErrorObservable o = new TestAsyncErrorObservable("one", "two", null, "three");
        Observable<Notification<String>> m = Observable.unsafeCreate(o).materialize();
        // Each toList() call subscribes anew; both subscriptions are expected to collect 3 notifications.
        assertEquals(3, m.toList().toFuture().get().size());
        assertEquals(3, m.toList().toFuture().get().size());
    }
Code example source: ReactiveX/RxJava
    @Test
    public void testListWithBlockingFirst() {
        Observable<String> o = Observable.fromIterable(Arrays.asList("one", "two", "three"));
        // blockingGet() waits for the Single to succeed and returns the list.
        List<String> actual = o.toList().blockingGet();
        Assert.assertEquals(Arrays.asList("one", "two", "three"), actual);
    }
Code example source: ReactiveX/RxJava
    @Test
    public void testConcatSimple() {
        Observable<String> o1 = Observable.just("one", "two");
        Observable<String> o2 = Observable.just("three", "four");
        // concat emits all of o1 and then all of o2, so the list keeps that order.
        List<String> values = Observable.concat(o1, o2).toList().blockingGet();
        assertEquals("one", values.get(0));
        assertEquals("two", values.get(1));
        assertEquals("three", values.get(2));
        assertEquals("four", values.get(3));
    }
Code example source: ReactiveX/RxJava
    @Test
    public void testListWithBlockingFirstObservable() {
        Observable<String> o = Observable.fromIterable(Arrays.asList("one", "two", "three"));
        // After toObservable(), the list is the first (and only) item of the Observable.
        List<String> actual = o.toList().toObservable().blockingFirst();
        Assert.assertEquals(Arrays.asList("one", "two", "three"), actual);
    }
Code example source: ReactiveX/RxJava
    @Test(timeout = 2000)
    public void testRepeatTake() {
        Observable<Integer> xs = Observable.just(1, 2);
        // repeat() alone never completes; take(4) makes the sequence finite so toList() can emit.
        Object[] ys = xs.repeat().subscribeOn(Schedulers.newThread()).take(4).toList().blockingGet().toArray();
        assertArrayEquals(new Object[] { 1, 2, 1, 2 }, ys);
    }
Code example source: ReactiveX/RxJava
    @Test
    public void testMergeCovariance() {
        // Media, Movie and HorrorMovie are test types defined elsewhere in the project (not shown).
        Observable<Media> o1 = Observable.<Media> just(new HorrorMovie(), new Movie());
        Observable<Media> o2 = Observable.just(new Media(), new HorrorMovie());
        Observable<Observable<Media>> os = Observable.just(o1, o2);
        List<Media> values = Observable.merge(os).toList().blockingGet();
        assertEquals(4, values.size());
    }
Code example source: ReactiveX/RxJava
    @Test
    public void testMergeCovariance3() {
        // Media, Movie and HorrorMovie are test types defined elsewhere in the project (not shown).
        Observable<Movie> o1 = Observable.just(new HorrorMovie(), new Movie());
        Observable<Media> o2 = Observable.just(new Media(), new HorrorMovie());
        List<Media> values = Observable.merge(o1, o2).toList().blockingGet();
        assertTrue(values.get(0) instanceof HorrorMovie);
        assertTrue(values.get(1) instanceof Movie);
        assertTrue(values.get(2) != null);
        assertTrue(values.get(3) instanceof HorrorMovie);
    }
Code example source: ReactiveX/RxJava
    @Test
    public void testMergeCovariance2() {
        // Media, Movie and HorrorMovie are test types defined elsewhere in the project (not shown).
        Observable<Media> o1 = Observable.just(new HorrorMovie(), new Movie(), new Media());
        Observable<Media> o2 = Observable.just(new Media(), new HorrorMovie());
        Observable<Observable<Media>> os = Observable.just(o1, o2);
        List<Media> values = Observable.merge(os).toList().blockingGet();
        assertEquals(5, values.size());
    }
Code example source: ReactiveX/RxJava
    @SuppressWarnings("unchecked")
    @Test
    public void capacityHintObservable() {
        // The argument 4 is only a capacity hint; all 10 items are still expected in the list.
        Observable.range(1, 10)
            .toList(4)
            .toObservable()
            .test()
            .assertResult(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
    }
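The test above passes 4 to toList yet expects all ten items in the result. A plausible reading, assuming the hint is used only as the initial capacity of an ArrayList-like backing list, is sketched below with JDK classes alone. CapacityHintSketch and collectRange are hypothetical names for this illustration and are not part of RxJava.

```java
import java.util.ArrayList;
import java.util.List;

/** JDK-only illustration; CapacityHintSketch is a hypothetical name, not RxJava code. */
final class CapacityHintSketch {

    /** Collects 'count' consecutive integers starting at 'start' into a list pre-sized with 'capacityHint'. */
    static List<Integer> collectRange(int start, int count, int capacityHint) {
        // Assumption: the hint only sets the initial capacity, it is not a size limit.
        List<Integer> list = new ArrayList<>(capacityHint);
        for (int i = 0; i < count; i++) {
            list.add(start + i); // the list grows past the hint as needed
        }
        return list;
    }

    public static void main(String[] args) {
        System.out.println(collectRange(1, 10, 4)); // prints [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
    }
}
```

Under that assumption a well-chosen hint merely avoids some internal array resizing; an inaccurate hint does not change the emitted list.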
Code example source: ReactiveX/RxJava
    @SuppressWarnings("unchecked")
    @Test
    public void errorSingle() {
        // An upstream error is forwarded to the Single; no list is emitted.
        Observable.error(new TestException())
            .toList()
            .test()
            .assertFailure(TestException.class);
    }
Code example source: ReactiveX/RxJava
    @SuppressWarnings("unchecked")
    @Test
    public void error() {
        // The same error path, observed after converting the Single back to an Observable.
        Observable.error(new TestException())
            .toList()
            .toObservable()
            .test()
            .assertFailure(TestException.class);
    }