Usage and code examples of the io.reactivex.Flowable.ambArray() method

Reposted by x33g5p2x on 2022-01-19, filed under Other

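This article collects code examples for the io.reactivex.Flowable.ambArray() method in Java and shows how Flowable.ambArray() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven and were extracted from selected projects, so they should be useful as references. Details of the Flowable.ambArray() method:
Package: io.reactivex
Class name: Flowable
Method name: ambArray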

About Flowable.ambArray

Mirrors the one Publisher in an array of several Publishers that first either emits an item or sends a termination notification.

Backpressure: The operator itself doesn't interfere with backpressure, which is determined by the winning Publisher's backpressure behavior.
Scheduler: ambArray does not operate by default on a particular Scheduler.
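The "first to signal wins" rule described above can be pictured with a small model that uses only the JDK. This is a sketch under stated assumptions, not RxJava's actual implementation: the class name `AmbModel`, its methods, and the 1-based source indices are all hypothetical and exist only for illustration. The first source to call `onNext` or `onComplete` claims the winner slot with a compare-and-set, and signals from every other source are ignored from then on.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical model of amb's first-wins rule; not RxJava's real code. */
final class AmbModel<T> {
    // 0 means "no winner yet"; otherwise the 1-based index of the winning source.
    private final AtomicInteger winner = new AtomicInteger();
    private final List<T> items = new CopyOnWriteArrayList<>();
    private volatile boolean completed;

    // True if this source already won, or wins right now by being first.
    private boolean isWinner(int index) {
        return winner.get() == index || winner.compareAndSet(0, index);
    }

    /** Source number index (1-based) emits an item. */
    void onNext(int index, T item) {
        if (isWinner(index)) {
            items.add(item);
        }
    }

    /** Source number index (1-based) terminates; a terminal signal can win too. */
    void onComplete(int index) {
        if (isWinner(index)) {
            completed = true;
        }
    }

    int winner() { return winner.get(); }
    List<T> items() { return items; }
    boolean isCompleted() { return completed; }

    public static void main(String[] args) {
        AmbModel<String> amb = new AmbModel<>();
        amb.onNext(2, "2");   // source 2 signals first and becomes the winner
        amb.onNext(1, "1");   // ignored: source 1 lost the race
        amb.onNext(2, "22");  // forwarded: still the winner
        amb.onComplete(3);    // ignored: source 3 lost the race
        amb.onComplete(2);    // forwarded
        System.out.println(amb.winner() + " " + amb.items() + " " + amb.isCompleted());
        // prints: 2 [2, 22] true
    }
}
```

The model leaves out request forwarding and error signals; it only shows why the output of the operator matches exactly one source.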

Code examples

Code example source: ReactiveX/RxJava

    // A null array of sources is rejected with a NullPointerException.
    @Test(expected = NullPointerException.class)
    public void ambVarargsNull() {
        Flowable.ambArray((Publisher<Object>[])null);
    }

Code example source: ReactiveX/RxJava

    // With no sources, ambArray returns the shared Flowable.empty() instance.
    @SuppressWarnings("unchecked")
    @Test
    public void ambArrayEmpty() {
        assertSame(Flowable.empty(), Flowable.ambArray());
    }

Code example source: ReactiveX/RxJava

    // The finite source wins because Flowable.never() never signals.
    @SuppressWarnings("unchecked")
    @Override
    public Publisher<Long> createPublisher(long elements) {
        return Flowable.ambArray(
            Flowable.fromIterable(iterate(elements)),
            Flowable.<Long>never()
        );
    }

Code example source: ReactiveX/RxJava

    // A null element among the sources triggers a NullPointerException.
    @SuppressWarnings("unchecked")
    @Test(expected = NullPointerException.class)
    public void ambVarargsOneIsNull() {
        Flowable.ambArray(Flowable.never(), null).blockingLast();
    }

Code example source: ReactiveX/RxJava

    /**
     * Mirrors the Publisher (current or provided) that first either emits an item or sends a termination
     * notification.
     * <p>
     * <img width="640" height="385" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/amb.png" alt="">
     * <dl>
     * <dt><b>Backpressure:</b></dt>
     * <dd>The operator itself doesn't interfere with backpressure which is determined by the winning
     * {@code Publisher}'s backpressure behavior.</dd>
     * <dt><b>Scheduler:</b></dt>
     * <dd>{@code ambWith} does not operate by default on a particular {@link Scheduler}.</dd>
     * </dl>
     *
     * @param other
     *            a Publisher competing to react first. A subscription to this provided Publisher will occur after subscribing
     *            to the current Publisher.
     * @return a Flowable that emits the same sequence as whichever of the source Publishers first
     *         emitted an item or sent a termination notification
     * @see <a href="http://reactivex.io/documentation/operators/amb.html">ReactiveX operators documentation: Amb</a>
     */
    @SuppressWarnings("unchecked")
    @CheckReturnValue
    @NonNull
    @BackpressureSupport(BackpressureKind.FULL)
    @SchedulerSupport(SchedulerSupport.NONE)
    public final Flowable<T> ambWith(Publisher<? extends T> other) {
        ObjectHelper.requireNonNull(other, "other is null");
        return ambArray(this, other);
    }

Code example source: ReactiveX/RxJava

    // flowable2 has the shortest delay (1000 ms), so its items and its error are mirrored.
    @Test
    public void testAmb2() {
        IOException expectedException = new IOException(
                "fake exception");
        Flowable<String> flowable1 = createFlowable(new String[] {},
                2000, new IOException("fake exception"));
        Flowable<String> flowable2 = createFlowable(new String[] {
                "2", "22", "222", "2222" }, 1000, expectedException);
        Flowable<String> flowable3 = createFlowable(new String[] {},
                3000, new IOException("fake exception"));

        @SuppressWarnings("unchecked")
        Flowable<String> f = Flowable.ambArray(flowable1,
                flowable2, flowable3);

        Subscriber<String> subscriber = TestHelper.mockSubscriber();
        f.subscribe(subscriber);

        scheduler.advanceTimeBy(100000, TimeUnit.MILLISECONDS);

        InOrder inOrder = inOrder(subscriber);
        inOrder.verify(subscriber, times(1)).onNext("2");
        inOrder.verify(subscriber, times(1)).onNext("22");
        inOrder.verify(subscriber, times(1)).onNext("222");
        inOrder.verify(subscriber, times(1)).onNext("2222");
        inOrder.verify(subscriber, times(1)).onError(expectedException);
        inOrder.verifyNoMoreInteractions();
    }

Code example source: ReactiveX/RxJava

    // With a single source, ambArray returns that source itself.
    @SuppressWarnings("unchecked")
    @Test
    public void ambArraySingleElement() {
        assertSame(Flowable.never(), Flowable.ambArray(Flowable.never()));
    }

Code example source: ReactiveX/RxJava

    // flowable2 has the shortest delay (1000 ms), so only its items and completion are seen.
    @Test
    public void testAmb() {
        Flowable<String> flowable1 = createFlowable(new String[] {
                "1", "11", "111", "1111" }, 2000, null);
        Flowable<String> flowable2 = createFlowable(new String[] {
                "2", "22", "222", "2222" }, 1000, null);
        Flowable<String> flowable3 = createFlowable(new String[] {
                "3", "33", "333", "3333" }, 3000, null);

        @SuppressWarnings("unchecked")
        Flowable<String> f = Flowable.ambArray(flowable1,
                flowable2, flowable3);

        Subscriber<String> subscriber = TestHelper.mockSubscriber();
        f.subscribe(subscriber);

        scheduler.advanceTimeBy(100000, TimeUnit.MILLISECONDS);

        InOrder inOrder = inOrder(subscriber);
        inOrder.verify(subscriber, times(1)).onNext("2");
        inOrder.verify(subscriber, times(1)).onNext("22");
        inOrder.verify(subscriber, times(1)).onNext("222");
        inOrder.verify(subscriber, times(1)).onNext("2222");
        inOrder.verify(subscriber, times(1)).onComplete();
        inOrder.verifyNoMoreInteractions();
    }

Code example source: ReactiveX/RxJava

    @SuppressWarnings("unchecked")
    @Test
    public void disposed() {
        TestHelper.checkDisposed(Flowable.ambArray(Flowable.never(), Flowable.never()));
    }

Code example source: ReactiveX/RxJava

    // Sources are subscribed in array order, so the synchronous just(1) wins before the error source.
    @SuppressWarnings("unchecked")
    @Test
    public void ambArrayOrder() {
        Flowable<Integer> error = Flowable.error(new RuntimeException());
        Flowable.ambArray(Flowable.just(1), error).test().assertValue(1).assertComplete();
    }

Code example source: ReactiveX/RxJava

    // The empty flowable2 completes first (1000 ms), so only onComplete is mirrored.
    @Test
    public void testAmb3() {
        Flowable<String> flowable1 = createFlowable(new String[] {
                "1" }, 2000, null);
        Flowable<String> flowable2 = createFlowable(new String[] {},
                1000, null);
        Flowable<String> flowable3 = createFlowable(new String[] {
                "3" }, 3000, null);

        @SuppressWarnings("unchecked")
        Flowable<String> f = Flowable.ambArray(flowable1,
                flowable2, flowable3);

        Subscriber<String> subscriber = TestHelper.mockSubscriber();
        f.subscribe(subscriber);

        scheduler.advanceTimeBy(100000, TimeUnit.MILLISECONDS);

        InOrder inOrder = inOrder(subscriber);
        inOrder.verify(subscriber, times(1)).onComplete();
        inOrder.verifyNoMoreInteractions();
    }

Code example source: ReactiveX/RxJava

    // The consumer thread must not be interrupted when the winner emits an item.
    @SuppressWarnings("unchecked")
    @Test
    public void noWinnerSuccessDispose() throws Exception {
        for (int i = 0; i < TestHelper.RACE_LONG_LOOPS; i++) {
            final AtomicBoolean interrupted = new AtomicBoolean();
            final CountDownLatch cdl = new CountDownLatch(1);

            Flowable.ambArray(
                Flowable.just(1)
                    .subscribeOn(Schedulers.single())
                    .observeOn(Schedulers.computation()),
                Flowable.never()
            )
            .subscribe(new Consumer<Object>() {
                @Override
                public void accept(Object v) throws Exception {
                    interrupted.set(Thread.currentThread().isInterrupted());
                    cdl.countDown();
                }
            });

            assertTrue(cdl.await(500, TimeUnit.SECONDS));
            assertFalse("Interrupted!", interrupted.get());
        }
    }

Code example source: ReactiveX/RxJava

    // The consumer thread must not be interrupted when the winner completes.
    @SuppressWarnings("unchecked")
    @Test
    public void noWinnerCompleteDispose() throws Exception {
        for (int i = 0; i < TestHelper.RACE_LONG_LOOPS; i++) {
            final AtomicBoolean interrupted = new AtomicBoolean();
            final CountDownLatch cdl = new CountDownLatch(1);

            Flowable.ambArray(
                Flowable.empty()
                    .subscribeOn(Schedulers.single())
                    .observeOn(Schedulers.computation()),
                Flowable.never()
            )
            .subscribe(Functions.emptyConsumer(), Functions.emptyConsumer(), new Action() {
                @Override
                public void run() throws Exception {
                    interrupted.set(Thread.currentThread().isInterrupted());
                    cdl.countDown();
                }
            });

            assertTrue(cdl.await(500, TimeUnit.SECONDS));
            assertFalse("Interrupted!", interrupted.get());
        }
    }

Code example source: ReactiveX/RxJava

    // The consumer thread must not be interrupted when the winner fails with an error.
    @SuppressWarnings("unchecked")
    @Test
    public void noWinnerErrorDispose() throws Exception {
        final TestException ex = new TestException();
        for (int i = 0; i < TestHelper.RACE_LONG_LOOPS; i++) {
            final AtomicBoolean interrupted = new AtomicBoolean();
            final CountDownLatch cdl = new CountDownLatch(1);

            Flowable.ambArray(
                Flowable.error(ex)
                    .subscribeOn(Schedulers.single())
                    .observeOn(Schedulers.computation()),
                Flowable.never()
            )
            .subscribe(Functions.emptyConsumer(), new Consumer<Throwable>() {
                @Override
                public void accept(Throwable e) throws Exception {
                    interrupted.set(Thread.currentThread().isInterrupted());
                    cdl.countDown();
                }
            });

            assertTrue(cdl.await(500, TimeUnit.SECONDS));
            assertFalse("Interrupted!", interrupted.get());
        }
    }

Code example source: ReactiveX/RxJava

    // Two sources complete concurrently; downstream must see exactly one completion.
    @Test
    public void onCompleteRace() {
        for (int i = 0; i < TestHelper.RACE_DEFAULT_LOOPS; i++) {
            final PublishProcessor<Integer> pp1 = PublishProcessor.create();
            final PublishProcessor<Integer> pp2 = PublishProcessor.create();

            @SuppressWarnings("unchecked")
            TestSubscriber<Integer> ts = Flowable.ambArray(pp1, pp2).test();

            Runnable r1 = new Runnable() {
                @Override
                public void run() {
                    pp1.onComplete();
                }
            };
            Runnable r2 = new Runnable() {
                @Override
                public void run() {
                    pp2.onComplete();
                }
            };

            TestHelper.race(r1, r2);

            ts.assertResult();
        }
    }

Code example source: ReactiveX/RxJava

    // Each source is subscribed exactly once, so the counter ends at 2.
    @SuppressWarnings("unchecked")
    @Test
    public void testSubscriptionOnlyHappensOnce() throws InterruptedException {
        final AtomicLong count = new AtomicLong();
        Consumer<Subscription> incrementer = new Consumer<Subscription>() {
            @Override
            public void accept(Subscription s) {
                count.incrementAndGet();
            }
        };

        // this async stream should emit first
        Flowable<Integer> f1 = Flowable.just(1).doOnSubscribe(incrementer)
                .delay(100, TimeUnit.MILLISECONDS).subscribeOn(Schedulers.computation());
        // this stream emits second
        Flowable<Integer> f2 = Flowable.just(1).doOnSubscribe(incrementer)
                .delay(100, TimeUnit.MILLISECONDS).subscribeOn(Schedulers.computation());

        TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
        Flowable.ambArray(f1, f2).subscribe(ts);
        ts.request(1);
        ts.awaitTerminalEvent(5, TimeUnit.SECONDS);
        ts.assertNoErrors();
        assertEquals(2, count.get());
    }

Code example source: ReactiveX/RxJava

    // Two sources emit concurrently; downstream must receive exactly one value.
    @Test
    public void onNextRace() {
        for (int i = 0; i < TestHelper.RACE_DEFAULT_LOOPS; i++) {
            final PublishProcessor<Integer> pp1 = PublishProcessor.create();
            final PublishProcessor<Integer> pp2 = PublishProcessor.create();

            @SuppressWarnings("unchecked")
            TestSubscriber<Integer> ts = Flowable.ambArray(pp1, pp2).test();

            Runnable r1 = new Runnable() {
                @Override
                public void run() {
                    pp1.onNext(1);
                }
            };
            Runnable r2 = new Runnable() {
                @Override
                public void run() {
                    pp2.onNext(1);
                }
            };

            TestHelper.race(r1, r2);

            ts.assertSubscribed().assertNoErrors()
                .assertNotComplete().assertValueCount(1);
        }
    }

Code example source: ReactiveX/RxJava

    @SuppressWarnings("unchecked")
    @Test
    public void testSecondaryRequestsPropagatedToChildren() throws InterruptedException {
        // this async stream should emit first
        Flowable<Integer> f1 = Flowable.fromArray(1, 2, 3)
                .delay(100, TimeUnit.MILLISECONDS).subscribeOn(Schedulers.computation());
        // this stream emits second
        Flowable<Integer> f2 = Flowable.fromArray(4, 5, 6)
                .delay(200, TimeUnit.MILLISECONDS).subscribeOn(Schedulers.computation());

        TestSubscriber<Integer> ts = new TestSubscriber<Integer>(1L);
        Flowable.ambArray(f1, f2).subscribe(ts);

        // before first emission request 20 more
        // this request should suffice to emit all
        ts.request(20);
        // ensure stream does not hang
        ts.awaitTerminalEvent(5, TimeUnit.SECONDS);
        ts.assertNoErrors();
    }

Code example source: ReactiveX/RxJava

    // Once source1 emits, it wins and the subscriptions to the other sources are cancelled.
    @SuppressWarnings("unchecked")
    @Test
    public void testAmbCancelsOthers() {
        PublishProcessor<Integer> source1 = PublishProcessor.create();
        PublishProcessor<Integer> source2 = PublishProcessor.create();
        PublishProcessor<Integer> source3 = PublishProcessor.create();

        TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
        Flowable.ambArray(source1, source2, source3).subscribe(ts);

        assertTrue("Source 1 doesn't have subscribers!", source1.hasSubscribers());
        assertTrue("Source 2 doesn't have subscribers!", source2.hasSubscribers());
        assertTrue("Source 3 doesn't have subscribers!", source3.hasSubscribers());

        source1.onNext(1);

        assertTrue("Source 1 doesn't have subscribers!", source1.hasSubscribers());
        assertFalse("Source 2 still has subscribers!", source2.hasSubscribers());
        assertFalse("Source 3 still has subscribers!", source3.hasSubscribers());
    }
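
The cancellation behavior checked by testAmbCancelsOthers can be modeled with the JDK alone. The sketch below is an illustration under assumptions, not RxJava's source: `AmbCancelModel`, `Source`, `signal`, and the `subscribed` flag are hypothetical names, with `subscribed` standing in for what `hasSubscribers()` reports in the test. When one source wins the compare-and-set, every other source is cancelled.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical model of "the winner cancels the losers"; not RxJava's real code. */
final class AmbCancelModel {
    /** Stand-in for one subscribed source. */
    static final class Source {
        volatile boolean subscribed = true;

        void cancel() {
            subscribed = false;
        }
    }

    private final Source[] sources;
    // -1 means "no winner yet"; otherwise the 0-based index of the winning source.
    private final AtomicInteger winner = new AtomicInteger(-1);

    AmbCancelModel(Source... sources) {
        this.sources = sources;
    }

    /** Source i signals; returns true if the signal would be forwarded downstream. */
    boolean signal(int i) {
        if (winner.get() == i) {
            return true;               // already the winner
        }
        if (!winner.compareAndSet(-1, i)) {
            return false;              // another source won earlier
        }
        for (int j = 0; j < sources.length; j++) {
            if (j != i) {
                sources[j].cancel();   // losers are cancelled exactly once
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Source s1 = new Source();
        Source s2 = new Source();
        Source s3 = new Source();
        AmbCancelModel amb = new AmbCancelModel(s1, s2, s3);

        amb.signal(0); // the first source signals first
        System.out.println(s1.subscribed + " " + s2.subscribed + " " + s3.subscribed);
        // prints: true false false
    }
}
```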
