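This article collects code examples for the Java method io.reactivex.Flowable.ambArray() and shows how Flowable.ambArray() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven. They were extracted from selected projects and should serve as a useful reference. Details of the Flowable.ambArray() method are as follows:
Package path: io.reactivex.Flowable
Class name: Flowable
Method name: ambArray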
Mirrors the one Publisher in an array of several Publishers that first either emits an item or sends a termination notification.
Backpressure: The operator itself doesn't interfere with backpressure, which is determined by the winning Publisher's backpressure behavior.
Scheduler: ambArray does not operate by default on a particular Scheduler.
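The description above can be illustrated with a small race between two sources. This is a minimal sketch, assuming RxJava 2.x on the classpath; the class name `AmbArrayRaceSketch`, the method `winnerValues`, and the item strings are made up for illustration. A `TestScheduler` supplies virtual time so the outcome does not depend on real timing.

```java
import io.reactivex.Flowable;
import io.reactivex.schedulers.TestScheduler;
import io.reactivex.subscribers.TestSubscriber;

import java.util.List;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of RxJava.
class AmbArrayRaceSketch {

    /** Races a slow and a fast source and returns what the subscriber received. */
    @SuppressWarnings("unchecked")
    static List<String> winnerValues() {
        // Virtual-time scheduler, so no real sleeping is involved.
        TestScheduler scheduler = new TestScheduler();

        Flowable<String> slow = Flowable.just("slow-1", "slow-2")
                .delay(500, TimeUnit.MILLISECONDS, scheduler);
        Flowable<String> fast = Flowable.just("fast-1", "fast-2")
                .delay(50, TimeUnit.MILLISECONDS, scheduler);

        // ambArray subscribes to both sources and mirrors the one that signals first.
        TestSubscriber<String> ts = Flowable.ambArray(slow, fast).test();

        // Move virtual time past both delays.
        scheduler.advanceTimeBy(1, TimeUnit.SECONDS);

        return ts.values();
    }

    public static void main(String[] args) {
        System.out.println(winnerValues()); // prints [fast-1, fast-2]
    }
}
```

Only the items of the faster source reach the subscriber; the slower source is cancelled once a winner is known.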
Code example source: ReactiveX/RxJava
@Test(expected = NullPointerException.class)
public void ambVarargsNull() {
    Flowable.ambArray((Publisher<Object>[])null);
}
Code example source: ReactiveX/RxJava
@SuppressWarnings("unchecked")
@Test
public void ambArrayEmpty() {
    assertSame(Flowable.empty(), Flowable.ambArray());
}
Code example source: ReactiveX/RxJava
@SuppressWarnings("unchecked")
@Override
public Publisher<Long> createPublisher(long elements) {
    return Flowable.ambArray(
            Flowable.fromIterable(iterate(elements)),
            Flowable.<Long>never()
    );
}
Code example source: ReactiveX/RxJava
@SuppressWarnings("unchecked")
@Test(expected = NullPointerException.class)
public void ambVarargsOneIsNull() {
    Flowable.ambArray(Flowable.never(), null).blockingLast();
}
Code example source: ReactiveX/RxJava
/**
 * Mirrors the Publisher (current or provided) that first either emits an item or sends a termination
 * notification.
 * <p>
 * <img width="640" height="385" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/amb.png" alt="">
 * <dl>
 *  <dt><b>Backpressure:</b></dt>
 *  <dd>The operator itself doesn't interfere with backpressure which is determined by the winning
 *  {@code Publisher}'s backpressure behavior.</dd>
 *  <dt><b>Scheduler:</b></dt>
 *  <dd>{@code ambWith} does not operate by default on a particular {@link Scheduler}.</dd>
 * </dl>
 *
 * @param other
 *            a Publisher competing to react first. A subscription to this provided Publisher will occur after subscribing
 *            to the current Publisher.
 * @return a Flowable that emits the same sequence as whichever of the source Publishers first
 *         emitted an item or sent a termination notification
 * @see <a href="http://reactivex.io/documentation/operators/amb.html">ReactiveX operators documentation: Amb</a>
 */
@SuppressWarnings("unchecked")
@CheckReturnValue
@NonNull
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
public final Flowable<T> ambWith(Publisher<? extends T> other) {
    ObjectHelper.requireNonNull(other, "other is null");
    return ambArray(this, other);
}
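Since `ambWith` delegates to `ambArray(this, other)`, it behaves like a two-source race. The following is a rough sketch of that, assuming RxJava 2.x; `AmbWithSketch` and `secondWins` are hypothetical names, and `PublishProcessor` is used only so that the order of signals can be controlled by hand.

```java
import io.reactivex.processors.PublishProcessor;
import io.reactivex.subscribers.TestSubscriber;

import java.util.List;

// Hypothetical demo class, not part of RxJava.
class AmbWithSketch {

    /** Lets the "other" source signal first and returns what the subscriber saw. */
    static List<Integer> secondWins() {
        PublishProcessor<Integer> first = PublishProcessor.create();
        PublishProcessor<Integer> second = PublishProcessor.create();

        // Same race as Flowable.ambArray(first, second).
        TestSubscriber<Integer> ts = first.ambWith(second).test();

        second.onNext(2); // second signals first and wins
        first.onNext(1);  // dropped: first was cancelled when second won
        second.onNext(3); // still mirrored

        return ts.values();
    }

    public static void main(String[] args) {
        System.out.println(secondWins()); // prints [2, 3]
    }
}
```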
Code example source: ReactiveX/RxJava
@Test
public void testAmb2() {
    IOException expectedException = new IOException(
            "fake exception");
    Flowable<String> flowable1 = createFlowable(new String[] {},
            2000, new IOException("fake exception"));
    Flowable<String> flowable2 = createFlowable(new String[] {
            "2", "22", "222", "2222" }, 1000, expectedException);
    Flowable<String> flowable3 = createFlowable(new String[] {},
            3000, new IOException("fake exception"));
    @SuppressWarnings("unchecked")
    Flowable<String> f = Flowable.ambArray(flowable1,
            flowable2, flowable3);
    Subscriber<String> subscriber = TestHelper.mockSubscriber();
    f.subscribe(subscriber);
    scheduler.advanceTimeBy(100000, TimeUnit.MILLISECONDS);
    InOrder inOrder = inOrder(subscriber);
    inOrder.verify(subscriber, times(1)).onNext("2");
    inOrder.verify(subscriber, times(1)).onNext("22");
    inOrder.verify(subscriber, times(1)).onNext("222");
    inOrder.verify(subscriber, times(1)).onNext("2222");
    inOrder.verify(subscriber, times(1)).onError(expectedException);
    inOrder.verifyNoMoreInteractions();
}
Code example source: ReactiveX/RxJava
@SuppressWarnings("unchecked")
@Test
public void ambArraySingleElement() {
    assertSame(Flowable.never(), Flowable.ambArray(Flowable.never()));
}
Code example source: ReactiveX/RxJava
@Test
public void testAmb() {
    Flowable<String> flowable1 = createFlowable(new String[] {
            "1", "11", "111", "1111" }, 2000, null);
    Flowable<String> flowable2 = createFlowable(new String[] {
            "2", "22", "222", "2222" }, 1000, null);
    Flowable<String> flowable3 = createFlowable(new String[] {
            "3", "33", "333", "3333" }, 3000, null);
    @SuppressWarnings("unchecked")
    Flowable<String> f = Flowable.ambArray(flowable1,
            flowable2, flowable3);
    Subscriber<String> subscriber = TestHelper.mockSubscriber();
    f.subscribe(subscriber);
    scheduler.advanceTimeBy(100000, TimeUnit.MILLISECONDS);
    InOrder inOrder = inOrder(subscriber);
    inOrder.verify(subscriber, times(1)).onNext("2");
    inOrder.verify(subscriber, times(1)).onNext("22");
    inOrder.verify(subscriber, times(1)).onNext("222");
    inOrder.verify(subscriber, times(1)).onNext("2222");
    inOrder.verify(subscriber, times(1)).onComplete();
    inOrder.verifyNoMoreInteractions();
}
Code example source: ReactiveX/RxJava
@SuppressWarnings("unchecked")
@Test
public void disposed() {
    TestHelper.checkDisposed(Flowable.ambArray(Flowable.never(), Flowable.never()));
}
Code example source: ReactiveX/RxJava
@SuppressWarnings("unchecked")
@Test
public void ambArrayOrder() {
    Flowable<Integer> error = Flowable.error(new RuntimeException());
    Flowable.ambArray(Flowable.just(1), error).test().assertValue(1).assertComplete();
}
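The test above has the item-emitting source win. As the method description states, a termination notification also counts as "signalling first". Here is a minimal sketch of the error case, assuming RxJava 2.x; the class `AmbTerminationSketch`, the method `errorWinsRace`, and the exception message are hypothetical.

```java
import io.reactivex.Flowable;
import io.reactivex.processors.PublishProcessor;
import io.reactivex.subscribers.TestSubscriber;

// Hypothetical demo class, not part of RxJava.
class AmbTerminationSketch {

    /** Returns true if an early onError wins the race and the later item is never delivered. */
    @SuppressWarnings("unchecked")
    static boolean errorWinsRace() {
        PublishProcessor<Integer> failing = PublishProcessor.create();
        PublishProcessor<Integer> healthy = PublishProcessor.create();

        TestSubscriber<Integer> ts = Flowable.ambArray(failing, healthy).test();

        failing.onError(new IllegalStateException("boom")); // terminal signal arrives first
        healthy.onNext(1);                                  // too late: healthy was cancelled

        return ts.errorCount() == 1
                && ts.valueCount() == 0
                && !healthy.hasSubscribers();
    }

    public static void main(String[] args) {
        System.out.println(errorWinsRace()); // prints true
    }
}
```

One consequence worth keeping in mind: a source that fails quickly can win against a slower source that would have succeeded, so `ambArray` is not a fallback mechanism by itself.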
Code example source: ReactiveX/RxJava
@Test
public void testAmb3() {
    Flowable<String> flowable1 = createFlowable(new String[] {
            "1" }, 2000, null);
    Flowable<String> flowable2 = createFlowable(new String[] {},
            1000, null);
    Flowable<String> flowable3 = createFlowable(new String[] {
            "3" }, 3000, null);
    @SuppressWarnings("unchecked")
    Flowable<String> f = Flowable.ambArray(flowable1,
            flowable2, flowable3);
    Subscriber<String> subscriber = TestHelper.mockSubscriber();
    f.subscribe(subscriber);
    scheduler.advanceTimeBy(100000, TimeUnit.MILLISECONDS);
    InOrder inOrder = inOrder(subscriber);
    inOrder.verify(subscriber, times(1)).onComplete();
    inOrder.verifyNoMoreInteractions();
}
Code example source: ReactiveX/RxJava
@SuppressWarnings("unchecked")
@Test
public void noWinnerSuccessDispose() throws Exception {
    for (int i = 0; i < TestHelper.RACE_LONG_LOOPS; i++) {
        final AtomicBoolean interrupted = new AtomicBoolean();
        final CountDownLatch cdl = new CountDownLatch(1);
        Flowable.ambArray(
            Flowable.just(1)
                .subscribeOn(Schedulers.single())
                .observeOn(Schedulers.computation()),
            Flowable.never()
        )
        .subscribe(new Consumer<Object>() {
            @Override
            public void accept(Object v) throws Exception {
                interrupted.set(Thread.currentThread().isInterrupted());
                cdl.countDown();
            }
        });
        assertTrue(cdl.await(500, TimeUnit.SECONDS));
        assertFalse("Interrupted!", interrupted.get());
    }
}
Code example source: ReactiveX/RxJava
@SuppressWarnings("unchecked")
@Test
public void noWinnerCompleteDispose() throws Exception {
    for (int i = 0; i < TestHelper.RACE_LONG_LOOPS; i++) {
        final AtomicBoolean interrupted = new AtomicBoolean();
        final CountDownLatch cdl = new CountDownLatch(1);
        Flowable.ambArray(
            Flowable.empty()
                .subscribeOn(Schedulers.single())
                .observeOn(Schedulers.computation()),
            Flowable.never()
        )
        .subscribe(Functions.emptyConsumer(), Functions.emptyConsumer(), new Action() {
            @Override
            public void run() throws Exception {
                interrupted.set(Thread.currentThread().isInterrupted());
                cdl.countDown();
            }
        });
        assertTrue(cdl.await(500, TimeUnit.SECONDS));
        assertFalse("Interrupted!", interrupted.get());
    }
}
Code example source: ReactiveX/RxJava
@SuppressWarnings("unchecked")
@Test
public void noWinnerErrorDispose() throws Exception {
    final TestException ex = new TestException();
    for (int i = 0; i < TestHelper.RACE_LONG_LOOPS; i++) {
        final AtomicBoolean interrupted = new AtomicBoolean();
        final CountDownLatch cdl = new CountDownLatch(1);
        Flowable.ambArray(
            Flowable.error(ex)
                .subscribeOn(Schedulers.single())
                .observeOn(Schedulers.computation()),
            Flowable.never()
        )
        .subscribe(Functions.emptyConsumer(), new Consumer<Throwable>() {
            @Override
            public void accept(Throwable e) throws Exception {
                interrupted.set(Thread.currentThread().isInterrupted());
                cdl.countDown();
            }
        });
        assertTrue(cdl.await(500, TimeUnit.SECONDS));
        assertFalse("Interrupted!", interrupted.get());
    }
}
Code example source: ReactiveX/RxJava
@Test
public void onCompleteRace() {
    for (int i = 0; i < TestHelper.RACE_DEFAULT_LOOPS; i++) {
        final PublishProcessor<Integer> pp1 = PublishProcessor.create();
        final PublishProcessor<Integer> pp2 = PublishProcessor.create();
        @SuppressWarnings("unchecked")
        TestSubscriber<Integer> ts = Flowable.ambArray(pp1, pp2).test();
        Runnable r1 = new Runnable() {
            @Override
            public void run() {
                pp1.onComplete();
            }
        };
        Runnable r2 = new Runnable() {
            @Override
            public void run() {
                pp2.onComplete();
            }
        };
        TestHelper.race(r1, r2);
        ts.assertResult();
    }
}
Code example source: ReactiveX/RxJava
@SuppressWarnings("unchecked")
@Test
public void testSubscriptionOnlyHappensOnce() throws InterruptedException {
    final AtomicLong count = new AtomicLong();
    Consumer<Subscription> incrementer = new Consumer<Subscription>() {
        @Override
        public void accept(Subscription s) {
            count.incrementAndGet();
        }
    };
    //this async stream should emit first
    Flowable<Integer> f1 = Flowable.just(1).doOnSubscribe(incrementer)
            .delay(100, TimeUnit.MILLISECONDS).subscribeOn(Schedulers.computation());
    //this stream emits second
    Flowable<Integer> f2 = Flowable.just(1).doOnSubscribe(incrementer)
            .delay(100, TimeUnit.MILLISECONDS).subscribeOn(Schedulers.computation());
    TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
    Flowable.ambArray(f1, f2).subscribe(ts);
    ts.request(1);
    ts.awaitTerminalEvent(5, TimeUnit.SECONDS);
    ts.assertNoErrors();
    assertEquals(2, count.get());
}
Code example source: ReactiveX/RxJava
@Test
public void onNextRace() {
    for (int i = 0; i < TestHelper.RACE_DEFAULT_LOOPS; i++) {
        final PublishProcessor<Integer> pp1 = PublishProcessor.create();
        final PublishProcessor<Integer> pp2 = PublishProcessor.create();
        @SuppressWarnings("unchecked")
        TestSubscriber<Integer> ts = Flowable.ambArray(pp1, pp2).test();
        Runnable r1 = new Runnable() {
            @Override
            public void run() {
                pp1.onNext(1);
            }
        };
        Runnable r2 = new Runnable() {
            @Override
            public void run() {
                pp2.onNext(1);
            }
        };
        TestHelper.race(r1, r2);
        ts.assertSubscribed().assertNoErrors()
        .assertNotComplete().assertValueCount(1);
    }
}
Code example source: ReactiveX/RxJava
@SuppressWarnings("unchecked")
@Test
public void testSecondaryRequestsPropagatedToChildren() throws InterruptedException {
    //this async stream should emit first
    Flowable<Integer> f1 = Flowable.fromArray(1, 2, 3)
            .delay(100, TimeUnit.MILLISECONDS).subscribeOn(Schedulers.computation());
    //this stream emits second
    Flowable<Integer> f2 = Flowable.fromArray(4, 5, 6)
            .delay(200, TimeUnit.MILLISECONDS).subscribeOn(Schedulers.computation());
    TestSubscriber<Integer> ts = new TestSubscriber<Integer>(1L);
    Flowable.ambArray(f1, f2).subscribe(ts);
    // before first emission request 20 more
    // this request should suffice to emit all
    ts.request(20);
    //ensure stream does not hang
    ts.awaitTerminalEvent(5, TimeUnit.SECONDS);
    ts.assertNoErrors();
}
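The request propagation exercised above can also be observed without timers. The sketch below assumes RxJava 2.x and uses hypothetical names (`AmbBackpressureSketch`, `snapshots`). It races a synchronous `Flowable.range` against `Flowable.never()` and records what has been delivered after an initial request of 2 and after 3 further requests.

```java
import io.reactivex.Flowable;
import io.reactivex.subscribers.TestSubscriber;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical demo class, not part of RxJava.
class AmbBackpressureSketch {

    /** Returns the received items after the initial request and after a follow-up request. */
    @SuppressWarnings("unchecked")
    static List<List<Integer>> snapshots() {
        // Subscribe with an initial demand of 2 items.
        TestSubscriber<Integer> ts = Flowable.ambArray(
                Flowable.range(1, 10),
                Flowable.<Integer>never())
            .test(2);

        // range wins immediately but may only emit as much as was requested.
        List<Integer> afterInitial = new ArrayList<>(ts.values());

        // Later demand is forwarded to the winner.
        ts.request(3);
        List<Integer> afterMore = new ArrayList<>(ts.values());

        return Arrays.asList(afterInitial, afterMore);
    }

    public static void main(String[] args) {
        System.out.println(snapshots()); // prints [[1, 2], [1, 2, 3, 4, 5]]
    }
}
```

The operator adds no buffering of its own here; the pacing comes entirely from the winning source honoring downstream requests.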
Code example source: ReactiveX/RxJava
@SuppressWarnings("unchecked")
@Test
public void testAmbCancelsOthers() {
    PublishProcessor<Integer> source1 = PublishProcessor.create();
    PublishProcessor<Integer> source2 = PublishProcessor.create();
    PublishProcessor<Integer> source3 = PublishProcessor.create();
    TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
    Flowable.ambArray(source1, source2, source3).subscribe(ts);
    assertTrue("Source 1 doesn't have subscribers!", source1.hasSubscribers());
    assertTrue("Source 2 doesn't have subscribers!", source2.hasSubscribers());
    assertTrue("Source 3 doesn't have subscribers!", source3.hasSubscribers());
    // source1 signals first, so the other two sources are cancelled
    source1.onNext(1);
    assertTrue("Source 1 doesn't have subscribers!", source1.hasSubscribers());
    assertFalse("Source 2 still has subscribers!", source2.hasSubscribers());
    assertFalse("Source 3 still has subscribers!", source3.hasSubscribers());
}