Usage and code examples for the io.reactivex.Observable.concatArrayEager() method

x33g5p2x, reposted on 2022-01-25 under Other

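This article collects code examples for the Java method io.reactivex.Observable.concatArrayEager() and shows how Observable.concatArrayEager() is used in practice. The examples come mainly from GitHub, Stack Overflow, Maven, and similar platforms and were extracted from selected projects, so they should serve as useful references. Details of the Observable.concatArrayEager() method:
Package path: io.reactivex.Observable
Class name: Observable
Method name: concatArrayEager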

Introduction to Observable.concatArrayEager

Concatenates a sequence of ObservableSources eagerly into a single stream of values.

Eager concatenation means that once a subscriber subscribes, this operator subscribes to all of the source ObservableSources. The operator buffers the values emitted by these ObservableSources and then drains them in order, each one after the previous one completes.

Scheduler: This method does not operate by default on a particular Scheduler.
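
The eager behaviour described above can be sketched as follows. This is a minimal illustration, assuming RxJava 2.x (the io.reactivex package) is on the classpath; the class name EagerConcatDemo, the subject names, and the log strings are hypothetical and not part of the original article. Both subjects are subscribed as soon as the result is subscribed, and a value from the second source is held back until the first source completes.

```java
import io.reactivex.Observable;
import io.reactivex.subjects.PublishSubject;

import java.util.ArrayList;
import java.util.List;

class EagerConcatDemo {

    // Records what the downstream subscriber sees, in order.
    @SuppressWarnings("unchecked")
    static List<String> run() {
        PublishSubject<String> first = PublishSubject.create();
        PublishSubject<String> second = PublishSubject.create();
        List<String> log = new ArrayList<>();

        Observable.concatArrayEager(first, second)
                .subscribe(log::add, e -> log.add("error"), () -> log.add("done"));

        // Eager: both sources already have an observer at this point.
        log.add("subscribed:" + first.hasObservers() + "," + second.hasObservers());

        second.onNext("b1");   // buffered, because the first source is still active
        first.onNext("a1");    // delivered right away
        second.onComplete();
        first.onComplete();    // the buffered "b1" is drained now, then completion follows

        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

With a plain Observable.concat, the second subject would only be subscribed after the first one completed, so the early "b1" would be lost rather than buffered.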

Code examples

Code example source: ReactiveX/RxJava

    /**
     * Concatenates an array of ObservableSources eagerly into a single stream of values.
     * <p>
     * <img width="640" height="410" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/concatArrayEager.png" alt="">
     * <p>
     * Eager concatenation means that once a subscriber subscribes, this operator subscribes to all of the
     * source ObservableSources. The operator buffers the values emitted by these ObservableSources and then drains them
     * in order, each one after the previous one completes.
     * <dl>
     *  <dt><b>Scheduler:</b></dt>
     *  <dd>This method does not operate by default on a particular {@link Scheduler}.</dd>
     * </dl>
     * @param <T> the value type
     * @param sources an array of ObservableSources that need to be eagerly concatenated
     * @return the new ObservableSource instance with the specified concatenation behavior
     * @since 2.0
     */
    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.NONE)
    public static <T> Observable<T> concatArrayEager(ObservableSource<? extends T>... sources) {
        return concatArrayEager(bufferSize(), bufferSize(), sources);
    }
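
The method above delegates to a three-argument overload and passes bufferSize() for both leading arguments. As far as I understand RxJava 2.x, those arguments are maxConcurrency (how many sources may be subscribed at once) and prefetch (the per-source buffer size hint). The sketch below, which assumes RxJava 2.x on the classpath and uses a hypothetical class name EagerConcatLimits, passes 1 and 16 explicitly to show that a maxConcurrency of 1 removes the eagerness: the second source is only subscribed once the first has completed.

```java
import io.reactivex.Observable;
import io.reactivex.subjects.PublishSubject;

import java.util.ArrayList;
import java.util.List;

class EagerConcatLimits {

    @SuppressWarnings("unchecked")
    static List<String> run() {
        PublishSubject<Integer> first = PublishSubject.create();
        PublishSubject<Integer> second = PublishSubject.create();
        List<String> log = new ArrayList<>();

        // Assumed parameter meaning: maxConcurrency = 1, prefetch = 16.
        Observable.concatArrayEager(1, 16, first, second)
                .subscribe(v -> log.add("value:" + v));

        // Only one source may be active, so the second is not subscribed yet.
        log.add("second subscribed early: " + second.hasObservers());

        first.onNext(1);
        first.onComplete();

        // The slot freed by the first source is now used for the second.
        log.add("second subscribed after first completes: " + second.hasObservers());

        second.onNext(2);
        second.onComplete();
        return log;
    }

    public static void main(String[] args) {
        for (String line : run()) {
            System.out.println(line);
        }
    }
}
```

The default used by the one-argument form is much larger than the handful of sources in these examples, which is why they all behave fully eagerly.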

Code example source: ReactiveX/RxJava

    @SuppressWarnings("unchecked")
    @Test
    public void testEagerness2() {
        final AtomicInteger count = new AtomicInteger();
        Observable<Integer> source = Observable.just(1).doOnNext(new Consumer<Integer>() {
            @Override
            public void accept(Integer t) {
                count.getAndIncrement();
            }
        }).hide();
        // `to` is a field of the enclosing test class; its declaration is not part of this excerpt
        Observable.concatArrayEager(source, source).subscribe(to);
        Assert.assertEquals(2, count.get());
        to.assertValueCount(count.get());
        to.assertNoErrors();
        to.assertComplete();
    }

Code example source: ReactiveX/RxJava

    @SuppressWarnings("unchecked")
    @Test
    public void testEagerness5() {
        final AtomicInteger count = new AtomicInteger();
        Observable<Integer> source = Observable.just(1).doOnNext(new Consumer<Integer>() {
            @Override
            public void accept(Integer t) {
                count.getAndIncrement();
            }
        }).hide();
        Observable.concatArrayEager(source, source, source, source, source).subscribe(to);
        Assert.assertEquals(5, count.get());
        to.assertValueCount(count.get());
        to.assertNoErrors();
        to.assertComplete();
    }

Code example source: ReactiveX/RxJava

    @SuppressWarnings("unchecked")
    @Test
    public void testEagerness7() {
        final AtomicInteger count = new AtomicInteger();
        Observable<Integer> source = Observable.just(1).doOnNext(new Consumer<Integer>() {
            @Override
            public void accept(Integer t) {
                count.getAndIncrement();
            }
        }).hide();
        Observable.concatArrayEager(source, source, source, source, source, source, source).subscribe(to);
        Assert.assertEquals(7, count.get());
        to.assertValueCount(count.get());
        to.assertNoErrors();
        to.assertComplete();
    }

Code example source: ReactiveX/RxJava

    @SuppressWarnings("unchecked")
    @Test
    public void testEagerness3() {
        final AtomicInteger count = new AtomicInteger();
        Observable<Integer> source = Observable.just(1).doOnNext(new Consumer<Integer>() {
            @Override
            public void accept(Integer t) {
                count.getAndIncrement();
            }
        }).hide();
        Observable.concatArrayEager(source, source, source).subscribe(to);
        Assert.assertEquals(3, count.get());
        to.assertValueCount(count.get());
        to.assertNoErrors();
        to.assertComplete();
    }

Code example source: ReactiveX/RxJava

    @SuppressWarnings("unchecked")
    @Test
    public void testEagerness6() {
        final AtomicInteger count = new AtomicInteger();
        Observable<Integer> source = Observable.just(1).doOnNext(new Consumer<Integer>() {
            @Override
            public void accept(Integer t) {
                count.getAndIncrement();
            }
        }).hide();
        Observable.concatArrayEager(source, source, source, source, source, source).subscribe(to);
        Assert.assertEquals(6, count.get());
        to.assertValueCount(count.get());
        to.assertNoErrors();
        to.assertComplete();
    }

Code example source: ReactiveX/RxJava

    @SuppressWarnings("unchecked")
    @Test
    public void testEagerness4() {
        final AtomicInteger count = new AtomicInteger();
        Observable<Integer> source = Observable.just(1).doOnNext(new Consumer<Integer>() {
            @Override
            public void accept(Integer t) {
                count.getAndIncrement();
            }
        }).hide();
        Observable.concatArrayEager(source, source, source, source).subscribe(to);
        Assert.assertEquals(4, count.get());
        to.assertValueCount(count.get());
        to.assertNoErrors();
        to.assertComplete();
    }

Code example source: ReactiveX/RxJava

    @SuppressWarnings("unchecked")
    @Test
    public void testEagerness8() {
        final AtomicInteger count = new AtomicInteger();
        Observable<Integer> source = Observable.just(1).doOnNext(new Consumer<Integer>() {
            @Override
            public void accept(Integer t) {
                count.getAndIncrement();
            }
        }).hide();
        Observable.concatArrayEager(source, source, source, source, source, source, source, source).subscribe(to);
        Assert.assertEquals(8, count.get());
        to.assertValueCount(count.get());
        to.assertNoErrors();
        to.assertComplete();
    }

Code example source: ReactiveX/RxJava

    @SuppressWarnings("unchecked")
    @Test
    public void testEagerness9() {
        final AtomicInteger count = new AtomicInteger();
        Observable<Integer> source = Observable.just(1).doOnNext(new Consumer<Integer>() {
            @Override
            public void accept(Integer t) {
                count.getAndIncrement();
            }
        }).hide();
        Observable.concatArrayEager(source, source, source, source, source, source, source, source, source).subscribe(to);
        Assert.assertEquals(9, count.get());
        to.assertValueCount(count.get());
        to.assertNoErrors();
        to.assertComplete();
    }

Code example source: redisson/redisson

    /**
     * Concatenates a sequence of ObservableSources eagerly into a single stream of values.
     * <p>
     * Eager concatenation means that once a subscriber subscribes, this operator subscribes to all of the
     * source ObservableSources. The operator buffers the values emitted by these ObservableSources and then drains them
     * in order, each one after the previous one completes.
     * <p>
     * <img width="640" height="410" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/concatArrayEager.png" alt="">
     * <dl>
     *  <dt><b>Scheduler:</b></dt>
     *  <dd>This method does not operate by default on a particular {@link Scheduler}.</dd>
     * </dl>
     * @param <T> the value type
     * @param sources a sequence of ObservableSources that need to be eagerly concatenated
     * @return the new ObservableSource instance with the specified concatenation behavior
     * @since 2.0
     */
    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.NONE)
    public static <T> Observable<T> concatArrayEager(ObservableSource<? extends T>... sources) {
        return concatArrayEager(bufferSize(), bufferSize(), sources);
    }

Code example source: ReactiveX/RxJava

    @SuppressWarnings("unchecked")
    @Test
    public void testInnerError() {
        // TODO verify: concatMapEager subscribes first then consumes the sources is okay
        PublishSubject<Integer> ps = PublishSubject.create();
        Observable.concatArrayEager(Observable.just(1), ps)
                .subscribe(to);
        ps.onError(new TestException());
        to.assertValue(1);
        to.assertError(TestException.class);
        to.assertNotComplete();
    }
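
The test above depends on a `to` observer and a TestException class that the excerpt does not define. A standalone variant might look like the sketch below. It assumes RxJava 2.x on the classpath, creates the TestObserver through Observable.test(), and substitutes IllegalStateException for TestException; the class name InnerErrorDemo is hypothetical.

```java
import io.reactivex.Observable;
import io.reactivex.observers.TestObserver;
import io.reactivex.subjects.PublishSubject;

class InnerErrorDemo {

    @SuppressWarnings("unchecked")
    static TestObserver<Integer> run() {
        PublishSubject<Integer> ps = PublishSubject.create();

        // test() subscribes immediately and returns the observer that records all events.
        TestObserver<Integer> to = Observable
                .concatArrayEager(Observable.just(1), ps)
                .test();

        // The first source has already emitted 1 and completed; now the second one fails.
        ps.onError(new IllegalStateException("boom"));
        return to;
    }

    public static void main(String[] args) {
        TestObserver<Integer> to = run();
        to.assertValue(1);                            // the value from the first source arrived
        to.assertError(IllegalStateException.class);  // the inner error reached the subscriber
        to.assertNotComplete();                       // the sequence ended with onError, not onComplete
        System.out.println("values=" + to.values() + ", errors=" + to.errorCount());
    }
}
```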

Code example source: ReactiveX/RxJava

    @SuppressWarnings("unchecked")
    @Test
    public void testInnerEmpty() {
        Observable.concatArrayEager(Observable.empty(), Observable.empty()).subscribe(to);
        to.assertNoValues();
        to.assertNoErrors();
        to.assertComplete();
    }
