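This article collects code examples for the Java method io.reactivex.Flowable.merge() and shows how Flowable.merge() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they are useful as reference material. Details of the Flowable.merge() method are as follows:
Package path: io.reactivex.Flowable
Class name: Flowable
Method name: merge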
Description: Flattens an Iterable of Publishers into one Publisher, without any transformation. You can combine the items emitted by multiple Publishers so that they appear as a single Publisher by using the merge method.
Backpressure: The operator honors backpressure from downstream. The source Publishers are expected to honor backpressure; if they violate it, the operator may signal MissingBackpressureException.
Scheduler: merge does not operate by default on a particular Scheduler.
Error handling: If any of the source Publishers signals a Throwable via onError, the resulting Flowable terminates with that Throwable and all other source Publishers are canceled. If more than one Publisher signals an error, the resulting Flowable may terminate with the first one's error or, depending on the concurrency of the sources, with a CompositeException containing two or more of the error signals. Throwables that did not make it into the composite are sent individually to the global error handler via RxJavaPlugins.onError(Throwable) as UndeliverableException errors. Similarly, Throwables signaled by sources after the returned Flowable has been canceled or has terminated with a (composite) error are sent to the same global error handler. Use mergeDelayError(Iterable) to merge sources and terminate only when all source Publishers have completed or failed with an error.
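The error-handling contract described above can be seen in a small sketch. The class and method names here (MergeErrorSketch, sources, eagerValues, delayedValues) are made up for illustration, and the sketch assumes RxJava 2.x on the classpath and fully synchronous sources, so that the order of events is deterministic.

```java
import io.reactivex.Flowable;
import java.util.Arrays;
import java.util.List;

public class MergeErrorSketch {

    // Hypothetical helper: three synchronous sources, the middle one fails.
    static List<Flowable<Integer>> sources() {
        return Arrays.asList(
                Flowable.just(1),
                Flowable.<Integer>error(new IllegalStateException("boom")),
                Flowable.just(2));
    }

    // merge(Iterable): the first error terminates the merged sequence,
    // so the third source is never subscribed.
    static List<Integer> eagerValues() {
        return Flowable.merge(sources()).test().values();
    }

    // mergeDelayError(Iterable): the error is held back until every source
    // has completed or failed, so the third source still emits.
    static List<Integer> delayedValues() {
        return Flowable.mergeDelayError(sources()).test().values();
    }

    public static void main(String[] args) {
        System.out.println(eagerValues());
        System.out.println(delayedValues());
    }
}
```

Under these assumptions, eagerValues() should contain only the item emitted before the failure, while delayedValues() should also contain the item from the source after it. With asynchronous sources the set of items seen before the error can differ from run to run.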
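Code example source: ReactiveX/RxJava
// Excerpt: body of the anonymous Function passed to Scheduler.when(...)
// in testRaceConditions (shown in full in the last example).
@Override
public Completable apply(Flowable<Flowable<Completable>> t) {
    // Merge at most 10 inner Flowables at a time, then merge the resulting Completables.
    return Completable.merge(Flowable.merge(t, 10));
}
});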
Code example source: ReactiveX/RxJava
@SuppressWarnings("unchecked")
@Test(expected = NullPointerException.class)
public void mergeIterableOneIsNull() {
    // A null element in the source Iterable is expected to fail with NullPointerException.
    Flowable.merge(Arrays.asList(just1, null), 128, 128).blockingLast();
}
Code example source: ReactiveX/RxJava
@Test(expected = NullPointerException.class)
public void mergeIterableIteratorNull() {
    // An Iterable whose iterator() returns null is expected to fail with NullPointerException.
    Flowable.merge(new Iterable<Publisher<Object>>() {
        @Override
        public Iterator<Publisher<Object>> iterator() {
            return null;
        }
    }, 128, 128).blockingLast();
}
Code example source: ReactiveX/RxJava
private Flowable<Integer> mergeNSyncStreamsOfN(final int outerSize, final int innerSize) {
    // Build outerSize synchronous inner streams, each emitting 1..innerSize.
    Flowable<Flowable<Integer>> os = Flowable.range(1, outerSize)
            .map(new Function<Integer, Flowable<Integer>>() {
                @Override
                public Flowable<Integer> apply(Integer i) {
                    return Flowable.range(1, innerSize);
                }
            });
    return Flowable.merge(os);
}
Code example source: ReactiveX/RxJava
@SuppressWarnings("unchecked")
@Test
public void negativeMaxConcurrent() {
    try {
        // maxConcurrency must be positive; -1 is rejected eagerly.
        Flowable.merge(Arrays.asList(Flowable.just(1), Flowable.just(2)), -1);
        fail("Expected IllegalArgumentException");
    } catch (IllegalArgumentException e) {
        assertEquals("maxConcurrency > 0 required but it was -1", e.getMessage());
    }
}
Code example source: ReactiveX/RxJava
@SuppressWarnings("unchecked")
@Test
public void zeroMaxConcurrent() {
    try {
        // maxConcurrency must be positive; 0 is rejected eagerly.
        Flowable.merge(Arrays.asList(Flowable.just(1), Flowable.just(2)), 0);
        fail("Expected IllegalArgumentException");
    } catch (IllegalArgumentException e) {
        assertEquals("maxConcurrency > 0 required but it was 0", e.getMessage());
    }
}
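The two tests above only cover invalid maxConcurrency values. As a complement, here is a minimal sketch of what a valid value does, assuming RxJava 2.x: with maxConcurrency set to 1, only one source is subscribed at a time, so the merged output follows the order of the sources even when they run on another Scheduler. The class name MergeMaxConcurrencySketch and the method oneAtATime are hypothetical.

```java
import io.reactivex.Flowable;
import io.reactivex.schedulers.Schedulers;
import java.util.Arrays;
import java.util.List;

public class MergeMaxConcurrencySketch {

    // Merges two asynchronous sources while allowing only one active
    // subscription at a time; the second source is subscribed only
    // after the first one has completed.
    static List<Integer> oneAtATime() {
        Flowable<Integer> a = Flowable.range(1, 3).subscribeOn(Schedulers.computation());
        Flowable<Integer> b = Flowable.range(10, 3).subscribeOn(Schedulers.computation());
        return Flowable.merge(Arrays.asList(a, b), 1).toList().blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(oneAtATime());
    }
}
```

With a larger maxConcurrency the two sources may be subscribed together, and the relative order of their items is then no longer guaranteed.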
Code example source: ReactiveX/RxJava
@SuppressWarnings("unchecked")
@Override
public Publisher<Long> createPublisher(long elements) {
    // Split the requested element count across two sources and merge them.
    return Flowable.merge(Arrays.asList(
            Flowable.fromIterable(iterate(elements / 2)),
            Flowable.fromIterable(iterate(elements - elements / 2))
    ));
}
Code example source: ReactiveX/RxJava
/**
 * This won't compile if super/extends isn't done correctly on generics.
 */
@Test
public void testCovarianceOfMerge() {
    Flowable<HorrorMovie> horrors = Flowable.just(new HorrorMovie());
    Flowable<Flowable<HorrorMovie>> metaHorrors = Flowable.just(horrors);
    Flowable.<Media> merge(metaHorrors);
}
Code example source: ReactiveX/RxJava
@Test
public void testMergeCovariance3() {
    Flowable<Movie> f1 = Flowable.just(new HorrorMovie(), new Movie());
    Flowable<Media> f2 = Flowable.just(new Media(), new HorrorMovie());
    // Both sources are synchronous, so f1's items arrive before f2's.
    List<Media> values = Flowable.merge(f1, f2).toList().blockingGet();
    assertTrue(values.get(0) instanceof HorrorMovie);
    assertTrue(values.get(1) instanceof Movie);
    assertTrue(values.get(2) != null);
    assertTrue(values.get(3) instanceof HorrorMovie);
}
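The fixed ordering asserted above holds because both sources emit synchronously. When sources emit over time, merge forwards items as they arrive. The following sketch, assuming RxJava 2.x, uses a TestScheduler to make that interleaving deterministic; the class name MergeInterleaveSketch, the method interleaved, and the chosen periods are illustrative only.

```java
import io.reactivex.Flowable;
import io.reactivex.schedulers.TestScheduler;
import io.reactivex.subscribers.TestSubscriber;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class MergeInterleaveSketch {

    static List<String> interleaved() {
        // Virtual-time scheduler: nothing fires until time is advanced manually.
        TestScheduler scheduler = new TestScheduler();

        // Source a ticks at 10 ms and 20 ms; source b ticks at 15 ms and 30 ms.
        Flowable<String> a = Flowable.interval(10, TimeUnit.MILLISECONDS, scheduler)
                .take(2)
                .map(i -> "a" + i);
        Flowable<String> b = Flowable.interval(15, TimeUnit.MILLISECONDS, scheduler)
                .take(2)
                .map(i -> "b" + i);

        TestSubscriber<String> ts = Flowable.merge(a, b).test();

        // Advance virtual time far enough for both sources to finish.
        scheduler.advanceTimeBy(30, TimeUnit.MILLISECONDS);
        return ts.values();
    }

    public static void main(String[] args) {
        System.out.println(interleaved());
    }
}
```

Here the items should come out in arrival order (a0, b0, a1, b1) rather than grouped by source. If source order matters, Flowable.concat is the operator that preserves it.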
Code example source: ReactiveX/RxJava
@Test
public void mergeScalar() {
    Flowable.merge(Flowable.just(Flowable.just(1)))
            .test()
            .assertResult(1);
}
Code example source: ReactiveX/RxJava
@Test
public void mergeScalar2() {
    // hide() conceals the scalar identity of the outer source.
    Flowable.merge(Flowable.just(Flowable.just(1)).hide())
            .test()
            .assertResult(1);
}
Code example source: ReactiveX/RxJava
@Test
public void mergeScalarError() {
    // An inner source whose Callable throws is expected to fail the merged Flowable.
    Flowable.merge(Flowable.just(Flowable.fromCallable(new Callable<Object>() {
        @Override
        public Object call() throws Exception {
            throw new TestException();
        }
    })).hide())
            .test()
            .assertFailure(TestException.class);
}
Code example source: ReactiveX/RxJava
// Disabled in the RxJava test suite: null items are not permitted.
@Test
@Ignore("Null values not permitted")
public void mergeWithNullValues() {
    System.out.println("mergeWithNullValues");
    TestSubscriber<String> ts = new TestSubscriber<String>();
    Flowable.merge(Flowable.just(null, "one"), Flowable.just("two", null)).subscribe(ts);
    ts.assertTerminated();
    ts.assertNoErrors();
    ts.assertValues(null, "one", "two", null);
}
Code example source: ReactiveX/RxJava
// Disabled in the RxJava test suite: a null source is not permitted.
@Test
@Ignore("Null values are not permitted")
public void mergingNullFlowable() {
    TestSubscriber<String> ts = new TestSubscriber<String>();
    Flowable.merge(Flowable.just("one"), null).subscribe(ts);
    ts.assertNoErrors();
    ts.assertValue("one");
}
Code example source: ReactiveX/RxJava
@Test
public void testMergeCovariance() {
    Flowable<Media> f1 = Flowable.<Media> just(new HorrorMovie(), new Movie());
    Flowable<Media> f2 = Flowable.just(new Media(), new HorrorMovie());
    Flowable<Flowable<Media>> os = Flowable.just(f1, f2);
    List<Media> values = Flowable.merge(os).toList().blockingGet();
    assertEquals(4, values.size());
}
Code example source: ReactiveX/RxJava
@Test
public void shouldNotCompleteWhileThereAreStillScalarSynchronousEmissionsInTheQueue() {
    Flowable<Integer> source = Flowable.merge(Flowable.just(1), Flowable.just(2));
    // Start with a request of 1, so only the first item is delivered.
    TestSubscriber<Integer> subscriber = new TestSubscriber<Integer>(1L);
    source.subscribe(subscriber);
    subscriber.assertValue(1);
    // Requesting one more releases the queued second item.
    subscriber.request(1);
    subscriber.assertValues(1, 2);
}
Code example source: ReactiveX/RxJava
@Test
public void mergeConcurrentJustJust() {
    TestSubscriber<Integer> ts = TestSubscriber.create();
    Flowable.merge(Flowable.just(Flowable.just(1)), 5).subscribe(ts);
    ts.assertValue(1);
    ts.assertNoErrors();
    ts.assertComplete();
}
Code example source: ReactiveX/RxJava
@Test
public void mergeConcurrentJustRange() {
    TestSubscriber<Integer> ts = TestSubscriber.create();
    Flowable.merge(Flowable.just(Flowable.range(1, 5)), 5).subscribe(ts);
    ts.assertValues(1, 2, 3, 4, 5);
    ts.assertNoErrors();
    ts.assertComplete();
}
Code example source: ReactiveX/RxJava
@Test
public void testMergeCovariance2() {
    Flowable<Media> f1 = Flowable.just(new HorrorMovie(), new Movie(), new Media());
    Flowable<Media> f2 = Flowable.just(new Media(), new HorrorMovie());
    Flowable<Flowable<Media>> os = Flowable.just(f1, f2);
    List<Media> values = Flowable.merge(os).toList().blockingGet();
    assertEquals(5, values.size());
}
Code example source: ReactiveX/RxJava
@Test(timeout = 1000)
public void testRaceConditions() {
    Scheduler comp = Schedulers.computation();
    // Wrap the computation scheduler so that at most 10 worker streams are merged at once.
    Scheduler limited = comp.when(new Function<Flowable<Flowable<Completable>>, Completable>() {
        @Override
        public Completable apply(Flowable<Flowable<Completable>> t) {
            return Completable.merge(Flowable.merge(t, 10));
        }
    });
    // merge and just are static imports of Flowable.merge and Flowable.just.
    merge(just(just(1).subscribeOn(limited).observeOn(comp)).repeat(1000)).blockingSubscribe();
}