Usage and code examples of the io.reactivex.Flowable.single() method

Reposted by x33g5p2x on 2022-01-19 under Other

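This article collects code examples for the Java method io.reactivex.Flowable.single() and shows how Flowable.single() is used in practice. The examples come mainly from GitHub, Stack Overflow, Maven and similar platforms, and were extracted from selected projects, so they should be useful as a reference. Details of Flowable.single():
Fully qualified class: io.reactivex.Flowable
Class name: Flowable
Method name: single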

About Flowable.single

Returns a Single that emits the single item emitted by the source Publisher if that Publisher emits exactly one item, or a default item if the source Publisher emits no items. If the source Publisher emits more than one item, an IllegalArgumentException is signaled instead. The default item is the method's argument, single(T defaultItem), and must not be null.

Backpressure: The operator honors backpressure from downstream and consumes the source Publisher in an unbounded manner (i.e., without applying backpressure).
Scheduler: single does not operate by default on a particular Scheduler.
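The three outcomes described above can be modeled without RxJava. The following is a minimal sketch in plain JDK Java, not RxJava's implementation: it treats an Iterable as a stand-in for the source Publisher and returns or throws synchronously instead of signaling onSuccess or onError. The names SingleSemantics and singleOrDefault are hypothetical and exist only for this illustration.

```java
import java.util.Iterator;
import java.util.List;

// Plain-JDK model of the contract of Flowable.single(defaultItem).
// SingleSemantics and singleOrDefault are hypothetical names, not RxJava API.
final class SingleSemantics {

    static <T> T singleOrDefault(Iterable<T> source, T defaultItem) {
        // Like the operator, reject a null default item up front.
        if (defaultItem == null) {
            throw new NullPointerException("defaultItem is null");
        }
        Iterator<T> it = source.iterator();
        // Empty source: fall back to the default item.
        if (!it.hasNext()) {
            return defaultItem;
        }
        T item = it.next();
        // A second item violates the "exactly one" contract.
        if (it.hasNext()) {
            throw new IllegalArgumentException("source emitted more than one item");
        }
        return item;
    }

    public static void main(String[] args) {
        System.out.println(singleOrDefault(List.of(1), 2));          // prints 1
        System.out.println(singleOrDefault(List.<Integer>of(), 1));  // prints 1 (the default)
        try {
            singleOrDefault(List.of(1, 2), 3);
        } catch (IllegalArgumentException e) {
            System.out.println("error: " + e.getMessage());
        }
    }
}
```

The same three cases (one item, no items, too many items) are the ones exercised by the RxJava unit tests listed below.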

Code examples

Example source: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void singleNull() {
  just1.single(null);
}

Example source: ReactiveX/RxJava

@Test
public void testSingleOrDefault() {
  Single<Integer> single = Flowable.just(1).single(2);
  SingleObserver<Integer> observer = TestHelper.mockSingleObserver();
  single.subscribe(observer);
  InOrder inOrder = inOrder(observer);
  inOrder.verify(observer, times(1)).onSuccess(1);
  inOrder.verifyNoMoreInteractions();
}

Example source: ReactiveX/RxJava

@Test
public void testSingleOrDefaultWithEmpty() {
  Single<Integer> single = Flowable.<Integer> empty()
      .single(1);
  SingleObserver<Integer> observer = TestHelper.mockSingleObserver();
  single.subscribe(observer);
  InOrder inOrder = inOrder(observer);
  inOrder.verify(observer, times(1)).onSuccess(1);
  inOrder.verifyNoMoreInteractions();
}

Example source: ReactiveX/RxJava

@Test
public void testSingleOrDefaultWithTooManyElements() {
  Single<Integer> single = Flowable.just(1, 2).single(3);
  SingleObserver<Integer> observer = TestHelper.mockSingleObserver();
  single.subscribe(observer);
  InOrder inOrder = inOrder(observer);
  inOrder.verify(observer, times(1)).onError(
      isA(IllegalArgumentException.class));
  inOrder.verifyNoMoreInteractions();
}

Example source: ReactiveX/RxJava

@Test
public void testSingleOrDefaultWithEmptyFlowable() {
  Flowable<Integer> flowable = Flowable.<Integer> empty()
      .single(1).toFlowable();
  Subscriber<Integer> subscriber = TestHelper.mockSubscriber();
  flowable.subscribe(subscriber);
  InOrder inOrder = inOrder(subscriber);
  inOrder.verify(subscriber, times(1)).onNext(1);
  inOrder.verify(subscriber, times(1)).onComplete();
  inOrder.verifyNoMoreInteractions();
}

Example source: ReactiveX/RxJava

@Test
public void testSingleOrDefaultFlowable() {
  Flowable<Integer> flowable = Flowable.just(1).single(2).toFlowable();
  Subscriber<Integer> subscriber = TestHelper.mockSubscriber();
  flowable.subscribe(subscriber);
  InOrder inOrder = inOrder(subscriber);
  inOrder.verify(subscriber, times(1)).onNext(1);
  inOrder.verify(subscriber, times(1)).onComplete();
  inOrder.verifyNoMoreInteractions();
}

Example source: ReactiveX/RxJava

@Test
public void testSingleOrDefaultWithPredicate() {
  Single<Integer> single = Flowable.just(1, 2)
      .filter(new Predicate<Integer>() {
        @Override
        public boolean test(Integer t1) {
          return t1 % 2 == 0;
        }
      })
      .single(4);
  SingleObserver<Integer> observer = TestHelper.mockSingleObserver();
  single.subscribe(observer);
  InOrder inOrder = inOrder(observer);
  inOrder.verify(observer, times(1)).onSuccess(2);
  inOrder.verifyNoMoreInteractions();
}

Example source: ReactiveX/RxJava

@Test
public void testSingleOrDefaultWithPredicateAndEmpty() {
  Single<Integer> single = Flowable.just(1)
      .filter(new Predicate<Integer>() {
        @Override
        public boolean test(Integer t1) {
          return t1 % 2 == 0;
        }
      })
      .single(2);
  SingleObserver<Integer> observer = TestHelper.mockSingleObserver();
  single.subscribe(observer);
  InOrder inOrder = inOrder(observer);
  inOrder.verify(observer, times(1)).onSuccess(2);
  inOrder.verifyNoMoreInteractions();
}

Example source: ReactiveX/RxJava

@Test
public void testSingleOrDefaultWithPredicateAndTooManyElements() {
  Single<Integer> single = Flowable.just(1, 2, 3, 4)
      .filter(new Predicate<Integer>() {
        @Override
        public boolean test(Integer t1) {
          return t1 % 2 == 0;
        }
      })
      .single(6);
  SingleObserver<Integer> observer = TestHelper.mockSingleObserver();
  single.subscribe(observer);
  InOrder inOrder = inOrder(observer);
  inOrder.verify(observer, times(1)).onError(
      isA(IllegalArgumentException.class));
  inOrder.verifyNoMoreInteractions();
}

Example source: ReactiveX/RxJava

@Test
public void testSingleOrDefaultWithTooManyElementsFlowable() {
  Flowable<Integer> flowable = Flowable.just(1, 2).single(3).toFlowable();
  Subscriber<Integer> subscriber = TestHelper.mockSubscriber();
  flowable.subscribe(subscriber);
  InOrder inOrder = inOrder(subscriber);
  inOrder.verify(subscriber, times(1)).onError(
      isA(IllegalArgumentException.class));
  inOrder.verifyNoMoreInteractions();
}

Example source: ReactiveX/RxJava

@Test
public void testSingleOrDefaultWithPredicateFlowable() {
  Flowable<Integer> flowable = Flowable.just(1, 2)
      .filter(new Predicate<Integer>() {
        @Override
        public boolean test(Integer t1) {
          return t1 % 2 == 0;
        }
      })
      .single(4).toFlowable();
  Subscriber<Integer> subscriber = TestHelper.mockSubscriber();
  flowable.subscribe(subscriber);
  InOrder inOrder = inOrder(subscriber);
  inOrder.verify(subscriber, times(1)).onNext(2);
  inOrder.verify(subscriber, times(1)).onComplete();
  inOrder.verifyNoMoreInteractions();
}

Example source: ReactiveX/RxJava

@Test
public void testSingleOrDefaultWithPredicateAndEmptyFlowable() {
  Flowable<Integer> flowable = Flowable.just(1)
      .filter(new Predicate<Integer>() {
        @Override
        public boolean test(Integer t1) {
          return t1 % 2 == 0;
        }
      })
      .single(2).toFlowable();
  Subscriber<Integer> subscriber = TestHelper.mockSubscriber();
  flowable.subscribe(subscriber);
  InOrder inOrder = inOrder(subscriber);
  inOrder.verify(subscriber, times(1)).onNext(2);
  inOrder.verify(subscriber, times(1)).onComplete();
  inOrder.verifyNoMoreInteractions();
}

Example source: ReactiveX/RxJava

@Test
public void testErrorObservable() {
  TestSubscriber<String> subscriber = TestSubscriber.create();
  IllegalArgumentException error = new IllegalArgumentException("Error");
  Single<String> single = Flowable.<String>error(error).single("");
  single.toFlowable().subscribe(subscriber);
  subscriber.assertError(error);
}

Example source: ReactiveX/RxJava

@Test
public void testShouldUseUnsafeSubscribeInternallyNotSubscribe() {
  TestSubscriber<String> subscriber = TestSubscriber.create();
  final AtomicBoolean unsubscribed = new AtomicBoolean(false);
  Single<String> single = Flowable.just("Hello World!").doOnCancel(new Action() {
    @Override
    public void run() {
      unsubscribed.set(true);
    }
  }).single("");
  single.toFlowable().subscribe(subscriber);
  subscriber.assertComplete();
  // The source completed normally, so the doOnCancel action must not have run.
  Assert.assertFalse(unsubscribed.get());
}

Example source: ReactiveX/RxJava

@Test
public void testSingleOrDefaultWithPredicateAndTooManyElementsFlowable() {
  Flowable<Integer> flowable = Flowable.just(1, 2, 3, 4)
      .filter(new Predicate<Integer>() {
        @Override
        public boolean test(Integer t1) {
          return t1 % 2 == 0;
        }
      })
      .single(6).toFlowable();
  Subscriber<Integer> subscriber = TestHelper.mockSubscriber();
  flowable.subscribe(subscriber);
  InOrder inOrder = inOrder(subscriber);
  inOrder.verify(subscriber, times(1)).onError(
      isA(IllegalArgumentException.class));
  inOrder.verifyNoMoreInteractions();
}

Example source: ReactiveX/RxJava

@Test
public void normal() throws Exception {
  PublishProcessor<Integer> pp = PublishProcessor.create();
  final String[] name = { null };
  final CountDownLatch cdl = new CountDownLatch(1);
  pp.doOnCancel(new Action() {
    @Override
    public void run() throws Exception {
      name[0] = Thread.currentThread().getName();
      cdl.countDown();
    }
  })
  .single(-99)
  .unsubscribeOn(Schedulers.single())
  .test(true);
  assertTrue(cdl.await(5, TimeUnit.SECONDS));
  int times = 10;
  while (times-- > 0 && pp.hasSubscribers()) {
    Thread.sleep(100);
  }
  assertFalse(pp.hasSubscribers());
  assertNotEquals(Thread.currentThread().getName(), name[0]);
}

Example source: ReactiveX/RxJava

@Test
public void testJustTwoEmissionsObservableThrowsError() {
  TestSubscriber<String> subscriber = TestSubscriber.create();
  Single<String> single = Flowable.just("First", "Second").single("");
  single.toFlowable().subscribe(subscriber);
  subscriber.assertError(IllegalArgumentException.class);
}

Example source: ReactiveX/RxJava

@Test
public void testEmptyObservable() {
  TestSubscriber<String> subscriber = TestSubscriber.create();
  Single<String> single = Flowable.<String>empty().single("");
  single.toFlowable().subscribe(subscriber);
  subscriber.assertResult("");
}

Example source: ReactiveX/RxJava

@Test
public void testJustSingleItemObservable() {
  TestSubscriber<String> subscriber = TestSubscriber.create();
  Single<String> single = Flowable.just("Hello World!").single("");
  single.toFlowable().subscribe(subscriber);
  subscriber.assertResult("Hello World!");
}

Example source: ReactiveX/RxJava

@Test
public void testRepeatObservableThrowsError() {
  TestSubscriber<String> subscriber = TestSubscriber.create();
  Single<String> single = Flowable.just("First", "Second").repeat().single("");
  single.toFlowable().subscribe(subscriber);
  subscriber.assertError(IllegalArgumentException.class);
}
