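This article collects code examples for the Java method io.reactivex.Flowable.mergeDelayError() and shows how Flowable.mergeDelayError() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should serve as a useful reference. Details of the Flowable.mergeDelayError() method are as follows: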
Package path: io.reactivex.Flowable
Class name: Flowable
Method name: mergeDelayError
Flattens an Iterable of Publishers into one Publisher, in a way that allows a Subscriber to receive all successfully emitted items from each of the source Publishers without being interrupted by an error notification from one of them.
This behaves like merge(Publisher), except that if any of the merged Publishers signals an error via Subscriber.onError, mergeDelayError refrains from propagating that error until all of the merged Publishers have finished emitting items.
Even if multiple merged Publishers send onError notifications, mergeDelayError invokes the onError method of its Subscribers only once.
Backpressure: The operator honors backpressure from downstream. All inner Publishers are expected to honor backpressure; if they violate it, the operator may signal MissingBackpressureException.
Scheduler: mergeDelayError does not operate by default on a particular Scheduler.
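The difference between eager and delayed error delivery described above can be sketched without RxJava. The following is a simplified, synchronous model in plain Java, not RxJava's implementation: the class DelayErrorModel and its Source and Outcome types are hypothetical names introduced only for illustration. As far as we know, RxJava 2 wraps multiple delayed errors in a CompositeException; the sketch uses a RuntimeException with suppressed exceptions as a stand-in for that.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Simplified, synchronous model of merge vs. mergeDelayError semantics (not RxJava code). */
public class DelayErrorModel {

    /** Hypothetical stand-in for a Publisher: some items, then an optional terminal error. */
    static final class Source {
        final List<Integer> items;
        final RuntimeException error; // null means the source completes normally

        Source(RuntimeException error, Integer... items) {
            this.items = Arrays.asList(items);
            this.error = error;
        }
    }

    /** What a subscriber would observe: the values and at most one terminal error. */
    static final class Outcome {
        final List<Integer> values = new ArrayList<>();
        RuntimeException error; // null means the merged sequence completed normally
    }

    /** Models merge(): the first error terminates the merged sequence immediately. */
    static Outcome mergeEager(List<Source> sources) {
        Outcome out = new Outcome();
        for (Source s : sources) {
            out.values.addAll(s.items);
            if (s.error != null) {
                out.error = s.error;
                return out; // remaining sources are never drained
            }
        }
        return out;
    }

    /** Models mergeDelayError(): drain every source, then signal a single error at the end. */
    static Outcome mergeDelayError(List<Source> sources) {
        Outcome out = new Outcome();
        List<RuntimeException> errors = new ArrayList<>();
        for (Source s : sources) {
            out.values.addAll(s.items);
            if (s.error != null) {
                errors.add(s.error); // remember the error and keep going
            }
        }
        if (errors.size() == 1) {
            out.error = errors.get(0);
        } else if (errors.size() > 1) {
            // Stand-in for a composite exception: one terminal error carrying all causes.
            RuntimeException composite = new RuntimeException(errors.size() + " errors occurred");
            for (RuntimeException e : errors) {
                composite.addSuppressed(e);
            }
            out.error = composite;
        }
        return out;
    }

    public static void main(String[] args) {
        List<Source> sources = Arrays.asList(
                new Source(new IllegalStateException("boom"), 1),
                new Source(null, 2, 3));
        Outcome eager = mergeEager(sources);
        Outcome delayed = mergeDelayError(sources);
        System.out.println("merge:           " + eager.values + " then " + eager.error);
        System.out.println("mergeDelayError: " + delayed.values + " then " + delayed.error);
    }
}
```

In this model the eager variant stops after the first source fails, while the delayed variant still delivers the items of the second source before reporting the error. Real Publishers are asynchronous and subject to backpressure, which the model deliberately ignores.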
Code example source: ReactiveX/RxJava
// A null Iterable is expected to be rejected with a NullPointerException.
@Test(expected = NullPointerException.class)
public void mergeDelayErrorIterableNull() {
    Flowable.mergeDelayError((Iterable<Publisher<Object>>)null, 128, 128);
}
Code example source: ReactiveX/RxJava
// just1 is defined elsewhere in the test class (not shown in this excerpt).
@SuppressWarnings("unchecked")
@Test(expected = NullPointerException.class)
public void mergeDelayErrorIterableOneIsNull() {
    Flowable.mergeDelayError(Arrays.asList(just1, null), 128, 128).blockingLast();
}
Code example source: ReactiveX/RxJava
// An Iterable whose iterator() returns null is expected to fail with a NullPointerException.
@Test(expected = NullPointerException.class)
public void mergeDelayErrorIterableIteratorNull() {
    Flowable.mergeDelayError(new Iterable<Publisher<Object>>() {
        @Override
        public Iterator<Publisher<Object>> iterator() {
            return null;
        }
    }, 128, 128).blockingLast();
}
Code example source: ReactiveX/RxJava
// TestSynchronousFlowable and stringSubscriber are defined elsewhere in the test class (not shown).
@Test
public void testMergeArray() {
    final Flowable<String> f1 = Flowable.unsafeCreate(new TestSynchronousFlowable());
    final Flowable<String> f2 = Flowable.unsafeCreate(new TestSynchronousFlowable());
    Flowable<String> m = Flowable.mergeDelayError(f1, f2);
    m.subscribe(stringSubscriber);
    verify(stringSubscriber, never()).onError(any(Throwable.class));
    verify(stringSubscriber, times(2)).onNext("hello");
    verify(stringSubscriber, times(1)).onComplete();
}
Code example source: ReactiveX/RxJava
@SuppressWarnings("unchecked")
@Test
public void mergeIterableDelayError() {
    Flowable.mergeDelayError(Arrays.asList(Flowable.just(1), Flowable.just(2)))
        .test()
        .assertResult(1, 2);
}
Code example source: ReactiveX/RxJava
// The second argument (1) is the maxConcurrency limit.
@SuppressWarnings("unchecked")
@Test
public void mergeIterableDelayErrorMaxConcurrency() {
    Flowable.mergeDelayError(
            Arrays.asList(Flowable.just(1),
                Flowable.just(2)), 1)
        .test()
        .assertResult(1, 2);
}
Code example source: ReactiveX/RxJava
@Test
public void delayedErrorsShouldBeEmittedWhenCompleteAfterApplyingBackpressure_NormalPath() {
    Throwable exception = new Throwable();
    Flowable<Integer> source = Flowable.mergeDelayError(Flowable.range(1, 2), Flowable.<Integer>error(exception));
    // Start with zero demand, then request explicitly.
    TestSubscriber<Integer> subscriber = new TestSubscriber<Integer>(0L);
    source.subscribe(subscriber);
    subscriber.request(3); // 1, 2, <error>
    subscriber.assertValues(1, 2);
    subscriber.assertTerminated();
    assertEquals(asList(exception), subscriber.errors());
}
Code example source: ReactiveX/RxJava
// The error from the first source is delivered only after both values have been emitted.
@SuppressWarnings("unchecked")
@Test
public void mergeIterableDelayErrorWithError() {
    Flowable.mergeDelayError(
            Arrays.asList(Flowable.just(1).concatWith(Flowable.<Integer>error(new TestException())),
                Flowable.just(2)))
        .test()
        .assertFailure(TestException.class, 1, 2);
}
Code example source: ReactiveX/RxJava
@SuppressWarnings("unchecked")
@Test
public void mergeIterableDelayErrorWithErrorMaxConcurrency() {
    Flowable.mergeDelayError(
            Arrays.asList(Flowable.just(1).concatWith(Flowable.<Integer>error(new TestException())),
                Flowable.just(2)), 1)
        .test()
        .assertFailure(TestException.class, 1, 2);
}
Code example source: ReactiveX/RxJava
@Test
public void delayedErrorsShouldBeEmittedWhenCompleteAfterApplyingBackpressure_FastPath() {
    Throwable exception = new Throwable();
    Flowable<Integer> source = Flowable.mergeDelayError(Flowable.just(1), Flowable.<Integer>error(exception));
    // Start with zero demand, then request explicitly.
    TestSubscriber<Integer> subscriber = new TestSubscriber<Integer>(0L);
    source.subscribe(subscriber);
    subscriber.request(2); // 1, <error>
    subscriber.assertValue(1);
    subscriber.assertTerminated();
    assertEquals(asList(exception), subscriber.errors());
}
Code example source: ReactiveX/RxJava
// Here the sources are supplied as a Flowable of Flowables, with maxConcurrency = 1.
@Test
public void mergeDelayErrorMaxConcurrency() {
    Flowable.mergeDelayError(
            Flowable.just(Flowable.just(1),
                Flowable.just(2)), 1)
        .test()
        .assertResult(1, 2);
}
Code example source: ReactiveX/RxJava
@Test
public void mergeDelayError3() {
    Flowable.mergeDelayError(
            Flowable.just(1),
            Flowable.just(2),
            Flowable.just(3)
        )
        .test()
        .assertResult(1, 2, 3);
}
Code example source: ReactiveX/RxJava
@Test
public void shouldNotReceivedDelayedErrorWhileThereAreStillNormalEmissionsInTheQueue() {
    Throwable exception = new Throwable();
    Flowable<Integer> source = Flowable.mergeDelayError(Flowable.range(1, 2), Flowable.range(3, 2), Flowable.<Integer>error(exception));
    TestSubscriber<Integer> subscriber = new TestSubscriber<Integer>(0L);
    subscriber.request(3);
    source.subscribe(subscriber);
    // Three items requested so far: the delayed error must not be visible yet.
    subscriber.assertValues(1, 2, 3);
    assertEquals(Collections.<Throwable>emptyList(), subscriber.errors());
    subscriber.request(2);
    // After the remaining item is delivered, the delayed error is signalled.
    subscriber.assertValues(1, 2, 3, 4);
    assertEquals(asList(exception), subscriber.errors());
}
Code example source: ReactiveX/RxJava
@Test
public void mergeDelayError() {
    Flowable.mergeDelayError(
            Flowable.just(Flowable.just(1),
                Flowable.just(2)))
        .test()
        .assertResult(1, 2);
}
Code example source: ReactiveX/RxJava
@Test
public void shouldCompleteAfterApplyingBackpressure_FastPath() {
    Flowable<Integer> source = Flowable.mergeDelayError(Flowable.just(Flowable.just(1)));
    TestSubscriber<Integer> subscriber = new TestSubscriber<Integer>(0L);
    source.subscribe(subscriber);
    subscriber.request(2); // 1, <complete> - should work as per the .._NormalPath variant
    subscriber.assertValue(1);
    subscriber.assertTerminated();
}
Code example source: ReactiveX/RxJava
@Test
public void shouldCompleteAfterApplyingBackpressure_NormalPath() {
    Flowable<Integer> source = Flowable.mergeDelayError(Flowable.just(Flowable.range(1, 2)));
    TestSubscriber<Integer> subscriber = new TestSubscriber<Integer>(0L);
    source.subscribe(subscriber);
    subscriber.request(3); // 1, 2, <complete> - with request(2) we get the 1 and 2 but not the <complete>
    subscriber.assertValues(1, 2);
    subscriber.assertTerminated();
}
Code example source: ReactiveX/RxJava
@Test
public void testErrorInParentFlowable() {
    TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
    Flowable.mergeDelayError(
            Flowable.just(Flowable.just(1), Flowable.just(2))
                .startWith(Flowable.<Integer> error(new RuntimeException()))
        ).subscribe(ts);
    ts.awaitTerminalEvent();
    ts.assertTerminated();
    // Values 1 and 2 are still delivered, followed by exactly one error.
    ts.assertValues(1, 2);
    assertEquals(1, ts.errorCount());
}
Code example source: ReactiveX/RxJava
@Test
public void mergeDelayErrorWithError() {
    Flowable.mergeDelayError(
            Flowable.just(Flowable.just(1).concatWith(Flowable.<Integer>error(new TestException())),
                Flowable.just(2)))
        .test()
        .assertFailure(TestException.class, 1, 2);
}
Code example source: ReactiveX/RxJava
@Test
public void mergeDelayErrorWithErrorMaxConcurrency() {
    Flowable.mergeDelayError(
            Flowable.just(Flowable.just(1).concatWith(Flowable.<Integer>error(new TestException())),
                Flowable.just(2)), 1)
        .test()
        .assertFailure(TestException.class, 1, 2);
}
Code example source: ReactiveX/RxJava
// The error from the second source is delayed until the value 3 has also been emitted.
@Test
public void mergeDelayError3WithError() {
    Flowable.mergeDelayError(
            Flowable.just(1),
            Flowable.just(2).concatWith(Flowable.<Integer>error(new TestException())),
            Flowable.just(3)
        )
        .test()
        .assertFailure(TestException.class, 1, 2, 3);
}