Usage and code examples for the io.reactivex.Observable.merge() method

x33g5p2x, reposted on 2022-01-25 under Other

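This article collects code examples for the Java method io.reactivex.Observable.merge() and shows how Observable.merge() is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, extracted from selected projects, and are intended as a practical reference. Details of the Observable.merge() method:
Package path: io.reactivex.Observable
Class name: Observable
Method name: merge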

About Observable.merge

Flattens an ObservableSource that emits ObservableSources into a single ObservableSource that emits the items emitted by those ObservableSources, without any transformation.
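As a rough illustration of the flattening described above, the following sketch merges a nested Observable and collects the result. It assumes RxJava 2.x (`io.reactivex`) is on the classpath; the class name `MergeFlattenSketch` and the method `flatten` are hypothetical names chosen for this example.

```java
import io.reactivex.Observable;
import java.util.List;

class MergeFlattenSketch {
    // Hypothetical helper: flattens an Observable of Observables into one list.
    static List<Integer> flatten() {
        Observable<Observable<Integer>> nested = Observable.just(
                Observable.just(1, 2),
                Observable.just(3, 4));
        // Both inner sources are synchronous, so each one emits all of its
        // items as soon as merge subscribes to it, in subscription order.
        return Observable.merge(nested).toList().blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(flatten()); // prints [1, 2, 3, 4]
    }
}
```

The ordering here is deterministic only because the inner sources emit synchronously; with sources running on different Schedulers, items may interleave.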

You can combine the items emitted by multiple ObservableSources so that they appear as a single ObservableSource, by using the merge method.

Scheduler: merge does not operate by default on a particular Scheduler.

Error handling: If any of the source ObservableSources signal a Throwable via onError, the resulting Observable terminates with that Throwable and all other source ObservableSources are cancelled. If more than one ObservableSource signals an error, the resulting Observable may terminate with the first one's error or, depending on the concurrency of the sources, may terminate with a CompositeException containing two or more of the various error signals. Throwables that didn't make it into the composite will be sent (individually) to the global error handler via the RxJavaPlugins.onError(Throwable) method as UndeliverableException errors. Similarly, Throwables signaled by source(s) after the returned Observable has been cancelled or terminated with a (composite) error will be sent to the same global error handler.

Use mergeDelayError(ObservableSource) to merge sources and terminate only when all source ObservableSources have completed or failed with an error.
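The difference between merge and mergeDelayError described above can be sketched as follows. This is a minimal example assuming RxJava 2.x on the classpath; `MergeErrorSketch`, `failing`, and `record` are hypothetical names, and the sources are deliberately synchronous so the outcome does not depend on timing.

```java
import io.reactivex.Observable;
import java.util.ArrayList;
import java.util.List;

class MergeErrorSketch {
    // Hypothetical source that fails immediately.
    static Observable<String> failing() {
        return Observable.error(new IllegalStateException("boom"));
    }

    // Records every item and the terminal error (if any) as strings.
    static List<String> record(Observable<String> merged) {
        List<String> seen = new ArrayList<>();
        merged.subscribe(seen::add, e -> seen.add("error: " + e.getMessage()));
        return seen;
    }

    public static void main(String[] args) {
        // merge: the first error terminates the stream; the second source is never consumed.
        System.out.println(record(
                Observable.merge(failing(), Observable.just("a", "b"))));
        // prints [error: boom]

        // mergeDelayError: the error is held back until all sources have terminated.
        System.out.println(record(
                Observable.mergeDelayError(failing(), Observable.just("a", "b"))));
        // prints [a, b, error: boom]
    }
}
```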

Code examples

Code example source: ReactiveX/RxJava

private Observable<Integer> mergeNAsyncStreamsOfN(final int outerSize, final int innerSize) {
  Observable<Observable<Integer>> os = Observable.range(1, outerSize)
  .map(new Function<Integer, Observable<Integer>>() {
    @Override
    public Observable<Integer> apply(Integer i) {
      return Observable.range(1, innerSize).subscribeOn(Schedulers.computation());
    }
  });
  return Observable.merge(os);
}

Code example source: ReactiveX/RxJava

private Observable<Integer> mergeNSyncStreamsOfN(final int outerSize, final int innerSize) {
  Observable<Observable<Integer>> os = Observable.range(1, outerSize)
  .map(new Function<Integer, Observable<Integer>>() {
    @Override
    public Observable<Integer> apply(Integer i) {
      return Observable.range(1, innerSize);
    }
  });
  return Observable.merge(os);
}

Code example source: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void mergeIterableNull() {
  Observable.merge((Iterable<Observable<Object>>)null, 128, 128);
}

Code example source: ReactiveX/RxJava

/**
 * This won't compile if super/extends isn't done correctly on generics.
 */
@Test
public void testCovarianceOfMerge() {
  Observable<HorrorMovie> horrors = Observable.just(new HorrorMovie());
  Observable<Observable<HorrorMovie>> metaHorrors = Observable.just(horrors);
  Observable.<Media> merge(metaHorrors);
}

Code example source: ReactiveX/RxJava

@SuppressWarnings("unchecked")
@Test(expected = NullPointerException.class)
public void mergeIterableOneIsNull() {
  Observable.merge(Arrays.asList(just1, null), 128, 128).blockingLast();
}

Code example source: ReactiveX/RxJava

@Test
public void mergeScalar() {
  Observable.merge(Observable.just(Observable.just(1)))
  .test()
  .assertResult(1);
}

Code example source: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void mergeIterableIteratorNull() {
  Observable.merge(new Iterable<Observable<Object>>() {
    @Override
    public Iterator<Observable<Object>> iterator() {
      return null;
    }
  }, 128, 128).blockingLast();
}

Code example source: ReactiveX/RxJava

@Test
public void testMergeList() {
  final Observable<String> o1 = Observable.unsafeCreate(new TestSynchronousObservable());
  final Observable<String> o2 = Observable.unsafeCreate(new TestSynchronousObservable());
  List<Observable<String>> listOfObservables = new ArrayList<Observable<String>>();
  listOfObservables.add(o1);
  listOfObservables.add(o2);
  Observable<String> m = Observable.merge(listOfObservables);
  m.subscribe(stringObserver);
  verify(stringObserver, never()).onError(any(Throwable.class));
  verify(stringObserver, times(1)).onComplete();
  verify(stringObserver, times(2)).onNext("hello");
}

Code example source: ReactiveX/RxJava

@Test(timeout = 5000)
public void testTake() throws Exception {
  List<Observable<Integer>> sourceList = new ArrayList<Observable<Integer>>(3);

  sourceList.add(Observable.range(0, 100000).subscribeOn(Schedulers.io()));
  sourceList.add(Observable.range(0, 100000).subscribeOn(Schedulers.io()));
  sourceList.add(Observable.range(0, 100000).subscribeOn(Schedulers.io()));

  TestObserver<Integer> to = new TestObserver<Integer>();

  // At most 2 sources are subscribed at once; take(5) ends the stream after five items.
  Observable.merge(sourceList, 2).take(5).subscribe(to);

  to.awaitTerminalEvent();
  to.assertNoErrors();
  to.assertValueCount(5);
}
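The testTake snippet above passes 2 as the second argument to merge, which limits how many sources are subscribed at the same time. The following sketch, assuming RxJava 2.x on the classpath, tries to make that limit visible; `MergeMaxConcurrencySketch`, `mergeWithLimit`, and the delay values are hypothetical choices for illustration.

```java
import io.reactivex.Observable;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;

class MergeMaxConcurrencySketch {
    // Hypothetical helper: merges a slow and a fast source with the given limit.
    static List<String> mergeWithLimit(int maxConcurrency) {
        List<Observable<String>> sources = Arrays.asList(
                Observable.just("slow").delay(200, TimeUnit.MILLISECONDS),
                Observable.just("fast").delay(10, TimeUnit.MILLISECONDS));
        // merge subscribes to at most maxConcurrency sources at once; the
        // next source is subscribed only after an active one completes.
        return Observable.merge(sources, maxConcurrency).toList().blockingGet();
    }

    public static void main(String[] args) {
        // With a limit of 1 the sources run one after another, in list order.
        System.out.println(mergeWithLimit(1)); // prints [slow, fast]
        // With a limit of 2 both run concurrently, so the faster source
        // would normally emit first (this ordering depends on timing).
        System.out.println(mergeWithLimit(2));
    }
}
```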

Code example source: ReactiveX/RxJava

@Test
public void testMergeArray() {
  final Observable<String> o1 = Observable.unsafeCreate(new TestSynchronousObservable());
  final Observable<String> o2 = Observable.unsafeCreate(new TestSynchronousObservable());
  Observable<String> m = Observable.merge(o1, o2);
  m.subscribe(stringObserver);
  verify(stringObserver, never()).onError(any(Throwable.class));
  verify(stringObserver, times(2)).onNext("hello");
  verify(stringObserver, times(1)).onComplete();
}

Code example source: ReactiveX/RxJava

@Test
public void mergeScalar2() {
  Observable.merge(Observable.just(Observable.just(1)).hide())
  .test()
  .assertResult(1);
}

Code example source: ReactiveX/RxJava

@Test
@Ignore("Null values not permitted")
public void mergeWithNullValues() {
  System.out.println("mergeWithNullValues");
  TestObserver<String> to = new TestObserver<String>();
  Observable.merge(Observable.just(null, "one"), Observable.just("two", null)).subscribe(to);
  to.assertTerminated();
  to.assertNoErrors();
  to.assertValues(null, "one", "two", null);
}

Code example source: ReactiveX/RxJava

@Test
@Ignore("Null values are not permitted")
public void mergingNullObservable() {
  TestObserver<String> to = new TestObserver<String>();
  Observable.merge(Observable.just("one"), null).subscribe(to);
  to.assertNoErrors();
  to.assertValue("one");
}

Code example source: ReactiveX/RxJava

@Test
public void testMergeArrayWithThreading() {
  final TestASynchronousObservable o1 = new TestASynchronousObservable();
  final TestASynchronousObservable o2 = new TestASynchronousObservable();
  Observable<String> m = Observable.merge(Observable.unsafeCreate(o1), Observable.unsafeCreate(o2));
  TestObserver<String> to = new TestObserver<String>(stringObserver);
  m.subscribe(to);
  to.awaitTerminalEvent();
  to.assertNoErrors();
  verify(stringObserver, never()).onError(any(Throwable.class));
  verify(stringObserver, times(2)).onNext("hello");
  verify(stringObserver, times(1)).onComplete();
}

Code example source: ReactiveX/RxJava

@Test
public void testMergeCovariance() {
  Observable<Media> o1 = Observable.<Media> just(new HorrorMovie(), new Movie());
  Observable<Media> o2 = Observable.just(new Media(), new HorrorMovie());
  Observable<Observable<Media>> os = Observable.just(o1, o2);
  List<Media> values = Observable.merge(os).toList().blockingGet();
  assertEquals(4, values.size());
}

Code example source: ReactiveX/RxJava

@Test
public void testMergeCovariance3() {
  Observable<Movie> o1 = Observable.just(new HorrorMovie(), new Movie());
  Observable<Media> o2 = Observable.just(new Media(), new HorrorMovie());
  List<Media> values = Observable.merge(o1, o2).toList().blockingGet();
  assertTrue(values.get(0) instanceof HorrorMovie);
  assertTrue(values.get(1) instanceof Movie);
  assertTrue(values.get(2) != null);
  assertTrue(values.get(3) instanceof HorrorMovie);
}

Code example source: ReactiveX/RxJava

@Test
public void mergeScalarEmpty() {
  Observable.merge(Observable.just(Observable.empty()).hide())
  .test()
  .assertResult();
}

Code example source: ReactiveX/RxJava

@Test
public void mergeScalarError() {
  Observable.merge(Observable.just(Observable.fromCallable(new Callable<Object>() {
    @Override
    public Object call() throws Exception {
      throw new TestException();
    }
  })).hide())
  .test()
  .assertFailure(TestException.class);
}

Code example source: ReactiveX/RxJava

@Test
public void testMergeCovariance2() {
  Observable<Media> o1 = Observable.just(new HorrorMovie(), new Movie(), new Media());
  Observable<Media> o2 = Observable.just(new Media(), new HorrorMovie());
  Observable<Observable<Media>> os = Observable.just(o1, o2);
  List<Media> values = Observable.merge(os).toList().blockingGet();
  assertEquals(5, values.size());
}

Code example source: ReactiveX/RxJava

@Test
public void debounceWithTimeBackpressure() throws InterruptedException {
  TestScheduler scheduler = new TestScheduler();
  TestObserver<Integer> observer = new TestObserver<Integer>();
  Observable.merge(
      Observable.just(1),
      Observable.just(2).delay(10, TimeUnit.MILLISECONDS, scheduler)
  ).debounce(20, TimeUnit.MILLISECONDS, scheduler).take(1).subscribe(observer);
  scheduler.advanceTimeBy(30, TimeUnit.MILLISECONDS);
  observer.assertValue(2);
  observer.assertTerminated();
  observer.assertNoErrors();
}
