io.reactivex.Observable.concatMapEager()方法的使用及代码示例

x33g5p2x  于2022-01-25 转载在 其他  
字(7.3k)|赞(0)|评价(0)|浏览(191)

本文整理了Java中io.reactivex.Observable.concatMapEager()方法的一些代码示例,展示了Observable.concatMapEager()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Observable.concatMapEager()方法的具体详情如下:
包路径:io.reactivex.Observable
类名称:Observable
方法名:concatMapEager

Observable.concatMapEager介绍

[英]Maps a sequence of values into ObservableSources and concatenates these ObservableSources eagerly into a single ObservableSource.

Eager concatenation means that once a subscriber subscribes, this operator subscribes to all of the source ObservableSources. The operator buffers the values emitted by these ObservableSources and then drains them in order, each one after the previous one completes.

Scheduler: This method does not operate by default on a particular Scheduler.
[中]将一系列值映射到可观测资源中,并将这些可观测资源急切地连接到单个可观测资源中。
即时连接意味着一旦订户订阅,该操作符订阅所有源可观测资源。运算符缓冲这些可观察资源发出的值,然后依次将其耗尽,每一个都在前一个完成之后。
调度器:默认情况下,该方法不会在特定的调度器上运行。

代码示例

代码示例来源:origin: ReactiveX/RxJava

  1. @Override
  2. public ObservableSource<Object> apply(Observable<Object> o) throws Exception {
  3. return o.concatMapEager(new Function<Object, ObservableSource<Object>>() {
  4. @Override
  5. public ObservableSource<Object> apply(Object v) throws Exception {
  6. return Observable.just(v);
  7. }
  8. });
  9. }
  10. });

代码示例来源:origin: ReactiveX/RxJava

  1. @Test(expected = IllegalArgumentException.class)
  2. public void testInvalidMaxConcurrent() {
  3. Observable.just(1).concatMapEager(toJust, 0, Observable.bufferSize());
  4. }

代码示例来源:origin: ReactiveX/RxJava

  1. @SuppressWarnings({ "unchecked", "rawtypes" })
  2. @Test
  3. public void mappingBadCapacityHint() throws Exception {
  4. Observable<Integer> source = Observable.just(1);
  5. try {
  6. Observable.just(source, source, source).concatMapEager((Function)Functions.identity(), 10, -99);
  7. } catch (IllegalArgumentException ex) {
  8. assertEquals("prefetch > 0 required but it was -99", ex.getMessage());
  9. }
  10. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test(expected = IllegalArgumentException.class)
  2. public void testInvalidCapacityHint() {
  3. Observable.just(1).concatMapEager(toJust, Observable.bufferSize(), 0);
  4. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void dispose() {
  3. TestHelper.checkDisposed(Observable.just(1).hide().concatMapEager(new Function<Integer, ObservableSource<Integer>>() {
  4. @Override
  5. public ObservableSource<Integer> apply(Integer v) throws Exception {
  6. return Observable.range(1, 2);
  7. }
  8. }));
  9. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void testMapperThrows() {
  3. Observable.just(1).concatMapEager(new Function<Integer, Observable<Integer>>() {
  4. @Override
  5. public Observable<Integer> apply(Integer t) {
  6. throw new TestException();
  7. }
  8. }).subscribe(to);
  9. to.assertNoValues();
  10. to.assertNotComplete();
  11. to.assertError(TestException.class);
  12. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void oneDelayed() {
  3. Observable.just(1, 2, 3, 4, 5)
  4. .concatMapEager(new Function<Integer, ObservableSource<Integer>>() {
  5. @Override
  6. public ObservableSource<Integer> apply(Integer i) throws Exception {
  7. return i == 3 ? Observable.just(i) : Observable
  8. .just(i)
  9. .delay(1, TimeUnit.MILLISECONDS, Schedulers.io());
  10. }
  11. })
  12. .observeOn(Schedulers.io())
  13. .test()
  14. .awaitDone(5, TimeUnit.SECONDS)
  15. .assertResult(1, 2, 3, 4, 5)
  16. ;
  17. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void mapperCancels() {
  3. final TestObserver<Integer> to = new TestObserver<Integer>();
  4. Observable.just(1).hide()
  5. .concatMapEager(new Function<Integer, ObservableSource<Integer>>() {
  6. @Override
  7. public ObservableSource<Integer> apply(Integer v) throws Exception {
  8. to.cancel();
  9. return Observable.never();
  10. }
  11. }, 1, 128)
  12. .subscribe(to);
  13. to.assertEmpty();
  14. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void innerErrorMaxConcurrency() {
  3. Observable.<Integer>just(1).hide().concatMapEager(new Function<Integer, ObservableSource<Integer>>() {
  4. @Override
  5. public ObservableSource<Integer> apply(Integer v) throws Exception {
  6. return Observable.error(new TestException());
  7. }
  8. }, 1, 128)
  9. .test()
  10. .assertFailure(TestException.class);
  11. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void innerError() {
  3. Observable.<Integer>just(1).hide().concatMapEager(new Function<Integer, ObservableSource<Integer>>() {
  4. @Override
  5. public ObservableSource<Integer> apply(Integer v) throws Exception {
  6. return Observable.error(new TestException());
  7. }
  8. })
  9. .test()
  10. .assertFailure(TestException.class);
  11. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. @Ignore("Null values are not allowed in RS")
  3. public void testInnerNull() {
  4. Observable.just(1).concatMapEager(new Function<Integer, Observable<Integer>>() {
  5. @Override
  6. public Observable<Integer> apply(Integer t) {
  7. return Observable.just(null);
  8. }
  9. }).subscribe(to);
  10. to.assertNoErrors();
  11. to.assertComplete();
  12. to.assertValue(null);
  13. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void innerErrorFused() {
  3. Observable.<Integer>just(1).hide().concatMapEager(new Function<Integer, ObservableSource<Integer>>() {
  4. @Override
  5. public ObservableSource<Integer> apply(Integer v) throws Exception {
  6. return Observable.range(1, 2).map(new Function<Integer, Integer>() {
  7. @Override
  8. public Integer apply(Integer v) throws Exception {
  9. throw new TestException();
  10. }
  11. });
  12. }
  13. })
  14. .test()
  15. .assertFailure(TestException.class);
  16. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void testAsynchronousRun() {
  3. Observable.range(1, 2).concatMapEager(new Function<Integer, Observable<Integer>>() {
  4. @Override
  5. public Observable<Integer> apply(Integer t) {
  6. return Observable.range(1, 1000).subscribeOn(Schedulers.computation());
  7. }
  8. }).observeOn(Schedulers.newThread()).subscribe(to);
  9. to.awaitTerminalEvent(5, TimeUnit.SECONDS);
  10. to.assertNoErrors();
  11. to.assertValueCount(2000);
  12. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void testSimple() {
  3. Observable.range(1, 100).concatMapEager(toJust).subscribe(to);
  4. to.assertNoErrors();
  5. to.assertValueCount(100);
  6. to.assertComplete();
  7. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void testSimple2() {
  3. Observable.range(1, 100).concatMapEager(toRange).subscribe(to);
  4. to.assertNoErrors();
  5. to.assertValueCount(200);
  6. to.assertComplete();
  7. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void innerCallableThrows() {
  3. Observable.<Integer>just(1).hide().concatMapEager(new Function<Integer, ObservableSource<Integer>>() {
  4. @Override
  5. public ObservableSource<Integer> apply(Integer v) throws Exception {
  6. return Observable.fromCallable(new Callable<Integer>() {
  7. @Override
  8. public Integer call() throws Exception {
  9. throw new TestException();
  10. }
  11. });
  12. }
  13. })
  14. .test()
  15. .assertFailure(TestException.class);
  16. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void normal() {
  3. Observable.range(1, 5)
  4. .concatMapEager(new Function<Integer, ObservableSource<Integer>>() {
  5. @Override
  6. public ObservableSource<Integer> apply(Integer t) {
  7. return Observable.range(t, 2);
  8. }
  9. })
  10. .test()
  11. .assertResult(1, 2, 2, 3, 3, 4, 4, 5, 5, 6);
  12. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void testMainError() {
  3. Observable.<Integer>error(new TestException()).concatMapEager(toJust).subscribe(to);
  4. to.assertNoValues();
  5. to.assertError(TestException.class);
  6. to.assertNotComplete();
  7. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void empty() {
  3. Observable.<Integer>empty().hide().concatMapEager(new Function<Integer, ObservableSource<Integer>>() {
  4. @Override
  5. public ObservableSource<Integer> apply(Integer v) throws Exception {
  6. return Observable.range(1, 2);
  7. }
  8. })
  9. .test()
  10. .assertResult();
  11. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void longEager() {
  3. Observable.range(1, 2 * Observable.bufferSize())
  4. .concatMapEager(new Function<Integer, ObservableSource<Integer>>() {
  5. @Override
  6. public ObservableSource<Integer> apply(Integer v) {
  7. return Observable.just(1);
  8. }
  9. })
  10. .test()
  11. .assertValueCount(2 * Observable.bufferSize())
  12. .assertNoErrors()
  13. .assertComplete();
  14. }

相关文章

Observable类方法