io.reactivex.Observable.concat()方法的使用及代码示例

x33g5p2x  于2022-01-25 转载在 其他  
字(7.2k)|赞(0)|评价(0)|浏览(220)

本文整理了Java中io.reactivex.Observable.concat()方法的一些代码示例,展示了Observable.concat()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Observable.concat()方法的具体详情如下:
包路径:io.reactivex.Observable
类名称:Observable
方法名:concat

Observable.concat介绍

[英]Returns an Observable that emits the items emitted by each of the ObservableSources emitted by the source ObservableSource, one after the other, without interleaving them.

Scheduler: concat does not operate by default on a particular Scheduler.
[中]返回一个Observable,该Observable发出源ObservableSource发出的每个ObservableSource发出的项,一个接一个地发出,而不交错它们。
调度器:默认情况下,concat不会在特定的调度器上运行。

代码示例

代码示例来源:origin: ReactiveX/RxJava

  1. @Test(expected = NullPointerException.class)
  2. public void concatObservableNull() {
  3. Observable.concat((Observable<Observable<Object>>)null);
  4. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test(expected = NullPointerException.class)
  2. public void concatIterableNull() {
  3. Observable.concat((Iterable<Observable<Object>>)null);
  4. }

代码示例来源:origin: ReactiveX/RxJava

  1. @SuppressWarnings("unchecked")
  2. @Test(expected = NullPointerException.class)
  3. public void concatIterableOneIsNull() {
  4. Observable.concat(Arrays.asList(just1, null)).blockingLast();
  5. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void concat4() {
  3. Observable.concat(Observable.just(1), Observable.just(2),
  4. Observable.just(3), Observable.just(4))
  5. .test()
  6. .assertResult(1, 2, 3, 4);
  7. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void concat3() {
  3. Observable.concat(Observable.just(1), Observable.just(2), Observable.just(3))
  4. .test()
  5. .assertResult(1, 2, 3);
  6. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test(expected = NullPointerException.class)
  2. public void concatIterableIteratorNull() {
  3. Observable.concat(new Iterable<Observable<Object>>() {
  4. @Override
  5. public Iterator<Observable<Object>> iterator() {
  6. return null;
  7. }
  8. }).blockingLast();
  9. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void testSkipError() {
  3. Exception e = new Exception();
  4. Observable<String> ok = Observable.just("one");
  5. Observable<String> error = Observable.error(e);
  6. Observable<String> skip = Observable.concat(ok, error).skip(100);
  7. Observer<String> observer = TestHelper.mockObserver();
  8. skip.subscribe(observer);
  9. verify(observer, never()).onNext(any(String.class));
  10. verify(observer, times(1)).onError(e);
  11. verify(observer, never()).onComplete();
  12. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void testDelayedErrorDeliveryWhenSafeSubscriberUnsubscribes() {
  3. TestScheduler testScheduler = new TestScheduler();
  4. Observable<Integer> source = Observable.concat(Observable.<Integer> error(new TestException()), Observable.just(1));
  5. Observer<Integer> o = TestHelper.mockObserver();
  6. InOrder inOrder = inOrder(o);
  7. source.observeOn(testScheduler).subscribe(o);
  8. inOrder.verify(o, never()).onError(any(TestException.class));
  9. testScheduler.advanceTimeBy(1, TimeUnit.SECONDS);
  10. inOrder.verify(o).onError(any(TestException.class));
  11. inOrder.verify(o, never()).onNext(anyInt());
  12. inOrder.verify(o, never()).onComplete();
  13. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void workerNotDisposedPrematurelySyncInNormalOut() {
  3. DisposeTrackingScheduler s = new DisposeTrackingScheduler();
  4. Observable.concat(
  5. Observable.just(1).observeOn(s),
  6. Observable.just(2)
  7. )
  8. .test()
  9. .assertResult(1, 2);
  10. assertEquals(1, s.disposedCount.get());
  11. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void testConcat() {
  3. Observer<String> observer = TestHelper.mockObserver();
  4. final String[] o = { "1", "3", "5", "7" };
  5. final String[] e = { "2", "4", "6" };
  6. final Observable<String> odds = Observable.fromArray(o);
  7. final Observable<String> even = Observable.fromArray(e);
  8. Observable<String> concat = Observable.concat(odds, even);
  9. concat.subscribe(observer);
  10. verify(observer, times(7)).onNext(anyString());
  11. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void testAggregateAsIntSumSourceThrows() {
  3. Single<Integer> result = Observable.concat(Observable.just(1, 2, 3, 4, 5),
  4. Observable.<Integer> error(new TestException()))
  5. .reduce(0, sum).map(new Function<Integer, Integer>() {
  6. @Override
  7. public Integer apply(Integer v) {
  8. return v;
  9. }
  10. });
  11. result.subscribe(singleObserver);
  12. verify(singleObserver, never()).onSuccess(any());
  13. verify(singleObserver, times(1)).onError(any(TestException.class));
  14. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void testConcatSimple() {
  3. Observable<String> o1 = Observable.just("one", "two");
  4. Observable<String> o2 = Observable.just("three", "four");
  5. List<String> values = Observable.concat(o1, o2).toList().blockingGet();
  6. assertEquals("one", values.get(0));
  7. assertEquals("two", values.get(1));
  8. assertEquals("three", values.get(2));
  9. assertEquals("four", values.get(3));
  10. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void testWithError3() {
  3. Single<Boolean> o = Observable.sequenceEqual(
  4. Observable.concat(Observable.just("one"),
  5. Observable.<String> error(new TestException())),
  6. Observable.concat(Observable.just("one"),
  7. Observable.<String> error(new TestException())));
  8. verifyError(o);
  9. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void workerNotDisposedPrematurelyNormalInNormalOut() {
  3. DisposeTrackingScheduler s = new DisposeTrackingScheduler();
  4. Observable.concat(
  5. Observable.just(1).hide().observeOn(s),
  6. Observable.just(2)
  7. )
  8. .test()
  9. .assertResult(1, 2);
  10. assertEquals(1, s.disposedCount.get());
  11. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void testWithError2() {
  3. Single<Boolean> o = Observable.sequenceEqual(
  4. Observable.just("one", "two", "three"),
  5. Observable.concat(Observable.just("one"),
  6. Observable.<String> error(new TestException())));
  7. verifyError(o);
  8. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void testWithError3Observable() {
  3. Observable<Boolean> o = Observable.sequenceEqual(
  4. Observable.concat(Observable.just("one"),
  5. Observable.<String> error(new TestException())),
  6. Observable.concat(Observable.just("one"),
  7. Observable.<String> error(new TestException()))).toObservable();
  8. verifyError(o);
  9. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void testWithError1() {
  3. Single<Boolean> o = Observable.sequenceEqual(
  4. Observable.concat(Observable.just("one"),
  5. Observable.<String> error(new TestException())),
  6. Observable.just("one", "two", "three"));
  7. verifyError(o);
  8. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void testWithError1Observable() {
  3. Observable<Boolean> o = Observable.sequenceEqual(
  4. Observable.concat(Observable.just("one"),
  5. Observable.<String> error(new TestException())),
  6. Observable.just("one", "two", "three")).toObservable();
  7. verifyError(o);
  8. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void testWithError2Observable() {
  3. Observable<Boolean> o = Observable.sequenceEqual(
  4. Observable.just("one", "two", "three"),
  5. Observable.concat(Observable.just("one"),
  6. Observable.<String> error(new TestException()))).toObservable();
  7. verifyError(o);
  8. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void testError2() {
  3. Observable<Integer> source = Observable.concat(Observable.just(0),
  4. Observable.<Integer> error(new TestException("Forced failure")));
  5. Observable<Integer> m = source.groupBy(identity, dbl).flatMap(FLATTEN_INTEGER);
  6. TestObserver<Object> to = new TestObserver<Object>();
  7. m.subscribe(to);
  8. to.awaitTerminalEvent();
  9. assertEquals(1, to.errorCount());
  10. to.assertValueCount(1);
  11. }

相关文章

Observable类方法