[英]Flattens an Iterable of Publishers into one Publisher, without any transformation.
You can combine the items emitted by multiple Publishers so that they appear as a single Publisher, by using the merge method. Backpressure: The operator honors backpressure from downstream. The source Publishers are expected to honor backpressure; if violated, the operator may signal MissingBackpressureException. Scheduler: merge does not operate by default on a particular Scheduler. Error handling: If any of the source Publishers signal a Throwable via onError, the resulting Flowable terminates with that Throwable and all other source Publishers are canceled. If more than one Publisher signals an error, the resulting Flowable may terminate with the first one's error or, depending on the concurrency of the sources, may terminate with a CompositeException containing two or more of the various error signals. Throwables that didn't make it into the composite will be sent (individually) to the global error handler via RxJavaPlugins#onError(Throwable) method as UndeliverableException errors. Similarly, Throwables signaled by source(s) after the returned Flowable has been canceled or terminated with a (composite) error will be sent to the same global error handler. Use #mergeDelayError(Iterable) to merge sources and terminate only when all source Publishers have completed or failed with an error.
代码示例来源:origin: ReactiveX/RxJava
// Flattens the nested Flowable of Completables with Flowable.merge (second
// argument 10 -- presumably the max concurrency; confirm against the overload)
// and then merges the resulting Completables into a single Completable.
public Completable apply(Flowable<Flowable<Completable>> t) {
return Completable.merge(Flowable.merge(t, 10));
代码示例来源:origin: ReactiveX/RxJava
// Expects a NullPointerException when the list of sources handed to merge
// contains a null element next to a valid source (just1, declared elsewhere).
@Test(expected = NullPointerException.class)
public void mergeIterableOneIsNull() {
Flowable.merge(Arrays.asList(just1, null), 128, 128).blockingLast();
代码示例来源:origin: ReactiveX/RxJava
// Expects a NullPointerException when the source Iterable's iterator()
// method itself returns null.
@Test(expected = NullPointerException.class)
public void mergeIterableIteratorNull() {
Flowable.merge(new Iterable<Publisher<Object>>() {
public Iterator<Publisher<Object>> iterator() {
// Deliberately hand back no iterator at all.
return null;
}, 128, 128).blockingLast();
代码示例来源:origin: ReactiveX/RxJava
// Builds an outer Flowable of outerSize items, maps every item to an inner
// Flowable.range(1, innerSize), and returns the merge of all inner Flowables.
private Flowable<Integer> mergeNSyncStreamsOfN(final int outerSize, final int innerSize) {
Flowable<Flowable<Integer>> os = Flowable.range(1, outerSize)
.map(new Function<Integer, Flowable<Integer>>() {
public Flowable<Integer> apply(Integer i) {
// The outer index is ignored; every inner stream emits 1..innerSize.
return Flowable.range(1, innerSize);
return Flowable.merge(os);
代码示例来源:origin: ReactiveX/RxJava
// Checks that calling merge with -1 as the second argument is rejected with an
// IllegalArgumentException whose message names maxConcurrency and the bad value.
public void negativeMaxConcurrent() {
try {
Flowable.merge(Arrays.asList(Flowable.just(1), Flowable.just(2)), -1);
// Reaching this line means no exception was thrown.
fail("Expected IllegalArgumentException");
} catch (IllegalArgumentException e) {
assertEquals("maxConcurrency > 0 required but it was -1", e.getMessage());
代码示例来源:origin: ReactiveX/RxJava
// Checks that calling merge with 0 as the second argument is rejected with an
// IllegalArgumentException whose message names maxConcurrency and the bad value.
public void zeroMaxConcurrent() {
try {
Flowable.merge(Arrays.asList(Flowable.just(1), Flowable.just(2)), 0);
// Reaching this line means no exception was thrown.
fail("Expected IllegalArgumentException");
} catch (IllegalArgumentException e) {
assertEquals("maxConcurrency > 0 required but it was 0", e.getMessage());
代码示例来源:origin: ReactiveX/RxJava
// Splits the requested element count into two parts (elements / 2 and the
// remainder) and wraps each part with Flowable.fromIterable.
// NOTE(review): the enclosing call is missing from this excerpt -- presumably
// the two sources are passed to Flowable.merge; confirm against the full file.
public Publisher<Long> createPublisher(long elements) {
Flowable.fromIterable(iterate(elements / 2)),
Flowable.fromIterable(iterate(elements - elements / 2))
代码示例来源:origin: ReactiveX/RxJava
* This won't compile if super/extends isn't done correctly on generics.
// Compile-time check: a Flowable<Flowable<HorrorMovie>> is merged with the
// explicit type argument Media; no result is asserted.
public void testCovarianceOfMerge() {
Flowable<HorrorMovie> horrors = Flowable.just(new HorrorMovie());
Flowable<Flowable<HorrorMovie>> metaHorrors = Flowable.just(horrors);
Flowable.<Media> merge(metaHorrors);
代码示例来源:origin: ReactiveX/RxJava
// Merges a Flowable<Movie> with a Flowable<Media> into a List<Media> and checks
// the four collected values in order: HorrorMovie, Movie, non-null, HorrorMovie.
public void testMergeCovariance3() {
Flowable<Movie> f1 = Flowable.just(new HorrorMovie(), new Movie());
Flowable<Media> f2 = Flowable.just(new Media(), new HorrorMovie());
List<Media> values = Flowable.merge(f1, f2).toList().blockingGet();
assertTrue(values.get(0) instanceof HorrorMovie);
assertTrue(values.get(1) instanceof Movie);
// The third item is created as a plain Media, so only non-null is asserted.
assertTrue(values.get(2) != null);
assertTrue(values.get(3) instanceof HorrorMovie);
代码示例来源:origin: ReactiveX/RxJava
public void mergeScalar() {
代码示例来源:origin: ReactiveX/RxJava
public void mergeScalar2() {
代码示例来源:origin: ReactiveX/RxJava
// Merges an outer Flowable holding a single inner Flowable.fromCallable whose
// Callable always throws TestException.
// NOTE(review): the assertions are cut off in this excerpt.
public void mergeScalarError() {
Flowable.merge(Flowable.just(Flowable.fromCallable(new Callable<Object>() {
public Object call() throws Exception {
throw new TestException();
代码示例来源:origin: ReactiveX/RxJava
// Ignored test (see the annotation text): merges two sources that contain
// null items and asserts all four values, nulls included, in emission order.
@Ignore("Null values not permitted")
public void mergeWithNullValues() {
TestSubscriber<String> ts = new TestSubscriber<String>();
Flowable.merge(Flowable.just(null, "one"), Flowable.just("two", null)).subscribe(ts);
ts.assertValues(null, "one", "two", null);
代码示例来源:origin: ReactiveX/RxJava
// Ignored test (see the annotation text): passes null as the second source
// to merge and subscribes a TestSubscriber to the result.
@Ignore("Null values are not permitted")
public void mergingNullFlowable() {
TestSubscriber<String> ts = new TestSubscriber<String>();
Flowable.merge(Flowable.just("one"), null).subscribe(ts);
代码示例来源:origin: ReactiveX/RxJava
// Wraps two Flowable<Media> sources (two items each) in an outer Flowable,
// merges them, and checks that four values were collected.
public void testMergeCovariance() {
Flowable<Media> f1 = Flowable.<Media> just(new HorrorMovie(), new Movie());
Flowable<Media> f2 = Flowable.just(new Media(), new HorrorMovie());
Flowable<Flowable<Media>> os = Flowable.just(f1, f2);
List<Media> values = Flowable.merge(os).toList().blockingGet();
assertEquals(4, values.size());
代码示例来源:origin: ReactiveX/RxJava
// Merges two single-item sources and asserts that the subscriber saw 1 and 2.
// NOTE(review): the subscriber is constructed with 1L (presumably the initial
// request amount) and the subscribe/request steps are missing from this
// excerpt -- confirm against the full test.
public void shouldNotCompleteWhileThereAreStillScalarSynchronousEmissionsInTheQueue() {
Flowable<Integer> source = Flowable.merge(Flowable.just(1), Flowable.just(2));
TestSubscriber<Integer> subscriber = new TestSubscriber<Integer>(1L);
subscriber.assertValues(1, 2);
代码示例来源:origin: ReactiveX/RxJava
// Merges an outer just(...) holding a single inner just(1), passing 5 as the
// second argument to merge, and subscribes a TestSubscriber.
// NOTE(review): the assertions are cut off in this excerpt.
public void mergeConcurrentJustJust() {
TestSubscriber<Integer> ts = TestSubscriber.create();
Flowable.merge(Flowable.just(Flowable.just(1)), 5).subscribe(ts);
代码示例来源:origin: ReactiveX/RxJava
// Merges an outer just(...) holding a single inner range(1, 5), passing 5 as
// the second argument to merge, and asserts that values 1 through 5 arrive in order.
public void mergeConcurrentJustRange() {
TestSubscriber<Integer> ts = TestSubscriber.create();
Flowable.merge(Flowable.just(Flowable.range(1, 5)), 5).subscribe(ts);
ts.assertValues(1, 2, 3, 4, 5);
代码示例来源:origin: ReactiveX/RxJava
// Wraps a three-item and a two-item Flowable<Media> in an outer Flowable,
// merges them, and checks that five values were collected.
public void testMergeCovariance2() {
Flowable<Media> f1 = Flowable.just(new HorrorMovie(), new Movie(), new Media());
Flowable<Media> f2 = Flowable.just(new Media(), new HorrorMovie());
Flowable<Flowable<Media>> os = Flowable.just(f1, f2);
List<Media> values = Flowable.merge(os).toList().blockingGet();
assertEquals(5, values.size());
代码示例来源:origin: ReactiveX/RxJava
@Test(timeout = 1000)
public void testRaceConditions() {
Scheduler comp = Schedulers.computation();
Scheduler limited = comp.when(new Function<Flowable<Flowable<Completable>>, Completable>() {
public Completable apply(Flowable<Flowable<Completable>> t) {
return Completable.merge(Flowable.merge(t, 10));