Usage and code examples of the io.reactivex.Observable.concatMap() method

x33g5p2x, reposted on 2022-01-25 under Other

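This article collects code examples for the Java method io.reactivex.Observable.concatMap() and shows how Observable.concatMap() is used in practice. The examples come mainly from GitHub, Stack Overflow, Maven, and similar platforms; they were extracted from selected projects and are intended as a practical reference. Details of the Observable.concatMap() method are as follows:
Package path: io.reactivex.Observable
Class name: Observable
Method name: concatMap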

Introduction to Observable.concatMap

Returns a new Observable that emits items resulting from applying a function that you supply to each item emitted by the source ObservableSource, where that function returns an ObservableSource, and then emitting the items that result from concatenating those resulting ObservableSources.

Scheduler: concatMap does not operate by default on a particular Scheduler.
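The map-then-concatenate ordering described above can be sketched in plain Java. This is only a synchronous, list-based model and not RxJava code: the class `ConcatMapOrderSketch` and its `concatMap` helper are hypothetical names introduced for illustration, and `Iterable` stands in for `ObservableSource`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

public class ConcatMapOrderSketch {

    // Hypothetical helper (not part of RxJava): applies the mapper to each
    // source item and appends every inner sequence in full before moving on.
    public static <T, R> List<R> concatMap(
            Iterable<? extends T> source,
            Function<? super T, ? extends Iterable<? extends R>> mapper) {
        List<R> out = new ArrayList<>();
        for (T item : source) {
            Iterable<? extends R> inner = mapper.apply(item);
            for (R value : inner) {
                out.add(value);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Each v maps to the pair (v, v + 1); the pairs are concatenated in source order.
        List<Integer> result = concatMap(Arrays.asList(1, 2, 3, 4), v -> Arrays.asList(v, v + 1));
        System.out.println(result); // prints [1, 2, 2, 3, 3, 4, 4, 5]
    }
}
```

The model ignores subscription, disposal, and error signals; it only shows why the output keeps the order of the source items.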

Code examples

Code example source: origin: ReactiveX/RxJava

/**
 * Returns a new Observable that emits items resulting from applying a function that you supply to each item
 * emitted by the source ObservableSource, where that function returns an ObservableSource, and then emitting the items
 * that result from concatenating those resulting ObservableSources.
 * <p>
 * <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/concatMap.png" alt="">
 * <dl>
 * <dt><b>Scheduler:</b></dt>
 * <dd>{@code concatMap} does not operate by default on a particular {@link Scheduler}.</dd>
 * </dl>
 *
 * @param <R> the type of the inner ObservableSource sources and thus the output type
 * @param mapper
 *            a function that, when applied to an item emitted by the source ObservableSource, returns an
 *            ObservableSource
 * @return an Observable that emits the result of applying the transformation function to each item emitted
 *         by the source ObservableSource and concatenating the ObservableSources obtained from this transformation
 * @see <a href="http://reactivex.io/documentation/operators/flatmap.html">ReactiveX operators documentation: FlatMap</a>
 */
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> concatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper) {
    // Delegates to the two-argument overload, passing 2 as the second argument.
    return concatMap(mapper, 2);
}

Code example source: origin: ReactiveX/RxJava

// A null mapper is expected to be rejected with a NullPointerException.
@Test(expected = NullPointerException.class)
public void concatMapNull() {
    just1.concatMap(null);
}

Code example source: origin: ReactiveX/RxJava

// A mapper that returns null is expected to fail with a NullPointerException.
@Test(expected = NullPointerException.class)
public void concatMapReturnsNull() {
    just1.concatMap(new Function<Integer, Observable<Object>>() {
        @Override
        public Observable<Object> apply(Integer v) {
            return null;
        }
    }).blockingSubscribe();
}

Code example source: origin: ReactiveX/RxJava

// A single-item source mapped with an explicit second argument of 16.
@Test
public void concatMapJustSource() {
    Observable.just(0)
    .concatMap(new Function<Object, ObservableSource<Integer>>() {
        @Override
        public ObservableSource<Integer> apply(Object v) throws Exception {
            return Observable.just(1);
        }
    }, 16)
    .test()
    .assertResult(1);
}

Code example source: origin: ReactiveX/RxJava

@Test
public void dispose() {
    TestHelper.checkDisposed(Observable.<Integer>just(1).hide()
    .concatMap(new Function<Integer, ObservableSource<Integer>>() {
        @Override
        public ObservableSource<Integer> apply(Integer v) throws Exception {
            return Observable.error(new TestException());
        }
    }));
}

Code example source: origin: ReactiveX/RxJava

// The upstream map() throws, so the sequence is expected to fail with TestException.
@Test
public void fusedPollThrows() {
    Observable.just(1)
    .map(new Function<Integer, Integer>() {
        @Override
        public Integer apply(Integer v) throws Exception {
            throw new TestException();
        }
    })
    .concatMap(new Function<Integer, ObservableSource<Integer>>() {
        @Override
        public ObservableSource<Integer> apply(Integer v) throws Exception {
            return Observable.range(v, 2);
        }
    })
    .test()
    .assertFailure(TestException.class);
}

Code example source: origin: ReactiveX/RxJava

// An error from the inner source is propagated downstream.
@Test
public void innerError() {
    Observable.<Integer>just(1).hide()
    .concatMap(new Function<Integer, ObservableSource<Integer>>() {
        @Override
        public ObservableSource<Integer> apply(Integer v) throws Exception {
            return Observable.error(new TestException());
        }
    })
    .test()
    .assertFailure(TestException.class);
}

Code example source: origin: ReactiveX/RxJava

// An exception thrown by the mapper itself is propagated downstream.
@Test
public void mapperThrows() {
    Observable.just(1).hide()
    .concatMap(new Function<Integer, ObservableSource<Integer>>() {
        @Override
        public ObservableSource<Integer> apply(Integer v) throws Exception {
            throw new TestException();
        }
    })
    .test()
    .assertFailure(TestException.class);
}

Code example source: origin: ReactiveX/RxJava

/**
 * Returns an Observable that concatenates each item emitted by the source ObservableSource with the values in an
 * Iterable corresponding to that item that is generated by a selector.
 * <p>
 * <img width="640" height="275" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/concatMapIterable.o.png" alt="">
 *
 * <dl>
 * <dt><b>Scheduler:</b></dt>
 * <dd>{@code concatMapIterable} does not operate by default on a particular {@link Scheduler}.</dd>
 * </dl>
 *
 * @param <U>
 *            the type of item emitted by the resulting ObservableSource
 * @param mapper
 *            a function that returns an Iterable sequence of values when given an item emitted by the
 *            source ObservableSource
 * @param prefetch
 *            the number of elements to prefetch from the current Observable
 * @return an Observable that emits the results of concatenating the items emitted by the source ObservableSource with
 *         the values in the Iterables corresponding to those items, as generated by {@code mapper}
 * @see <a href="http://reactivex.io/documentation/operators/flatmap.html">ReactiveX operators documentation: FlatMap</a>
 */
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <U> Observable<U> concatMapIterable(final Function<? super T, ? extends Iterable<? extends U>> mapper, int prefetch) {
    // Validate arguments up front, then reuse concatMap with an Iterable-to-source adapter.
    ObjectHelper.requireNonNull(mapper, "mapper is null");
    ObjectHelper.verifyPositive(prefetch, "prefetch");
    return concatMap(ObservableInternalHelper.flatMapIntoIterable(mapper), prefetch);
}
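As a rough illustration of what the excerpt above does (validate the arguments, then flatten each mapped Iterable in source order), here is a minimal plain-Java sketch. It is a list-based model under stated assumptions, not RxJava's implementation: `ConcatMapIterableSketch` is a hypothetical name, the exception type used for a non-positive `prefetch` is this sketch's own choice, and `prefetch` has no effect here beyond the check.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.function.Function;

public class ConcatMapIterableSketch {

    // Hypothetical list-based stand-in for concatMapIterable; not RxJava code.
    public static <T, U> List<U> concatMapIterable(
            List<? extends T> source,
            Function<? super T, ? extends Iterable<? extends U>> mapper,
            int prefetch) {
        // Argument checks modelled on the excerpt, using plain JDK calls.
        Objects.requireNonNull(mapper, "mapper is null");
        if (prefetch <= 0) {
            throw new IllegalArgumentException("prefetch must be positive");
        }
        // prefetch is only validated in this model; a list has nothing to buffer.
        List<U> out = new ArrayList<>();
        for (T item : source) {
            for (U value : mapper.apply(item)) {
                out.add(value);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Split each word into single-character strings and concatenate them in order.
        List<String> letters = concatMapIterable(
                Arrays.asList("ab", "cd"),
                word -> Arrays.asList(word.split("")),
                2);
        System.out.println(letters); // prints [a, b, c, d]
    }
}
```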

Code example source: origin: ReactiveX/RxJava (truncated excerpt)

.concatMap(new Function<byte[], Observable<byte[]>>() {
    @Override
    public Observable<byte[]> apply(byte[] v) throws Exception {

Code example source: origin: ReactiveX/RxJava (truncated excerpt)

.concatMap(func).subscribe(new DefaultObserver<Integer>() {
    @Override
    public void onNext(Integer t) {

Code example source: origin: ReactiveX/RxJava

// Each v maps to range(v, 2), i.e. v and v + 1, concatenated in source order.
@Test
public void fusionWithConcatMap() {
    TestObserver<Integer> to = new TestObserver<Integer>();
    Observable.fromIterable(Arrays.asList(1, 2, 3, 4)).concatMap(
    new Function<Integer, ObservableSource<Integer>>() {
        @Override
        public ObservableSource<Integer> apply(Integer v) {
            return Observable.range(v, 2);
        }
    }).subscribe(to);
    to.assertValues(1, 2, 2, 3, 3, 4, 4, 5);
    to.assertNoErrors();
    to.assertComplete();
}

Code example source: origin: ReactiveX/RxJava

// Applying concatMap to an empty source is expected to return the shared Observable.empty() instance.
@Test
public void concatMapErrorEmptySource() {
    assertSame(Observable.empty(), Observable.<Object>empty()
    .concatMap(new Function<Object, ObservableSource<Integer>>() {
        @Override
        public ObservableSource<Integer> apply(Object v) throws Exception {
            return Observable.just(1);
        }
    }, 16));
}

Code example source: origin: ReactiveX/RxJava

/** Issue #2844: wrong target of request. */
@Test(timeout = 3000)
public void testRepeatRetarget() {
    final List<Integer> concatBase = new ArrayList<Integer>();
    TestObserver<Integer> to = new TestObserver<Integer>();
    Observable.just(1, 2)
    .repeat(5)
    .concatMap(new Function<Integer, Observable<Integer>>() {
        @Override
        public Observable<Integer> apply(Integer x) {
            System.out.println("testRepeatRetarget -> " + x);
            // Record the order in which the mapper is invoked.
            concatBase.add(x);
            return Observable.<Integer>empty()
                    .delay(200, TimeUnit.MILLISECONDS);
        }
    })
    .subscribe(to);
    to.awaitTerminalEvent();
    to.assertNoErrors();
    to.assertNoValues();
    assertEquals(Arrays.asList(1, 2, 1, 2, 1, 2, 1, 2, 1, 2), concatBase);
}
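The test above checks that the mapper sees the items in source order even though every inner source is delayed. A minimal plain-Java model of handling one inner result at a time might look like the sketch below. It is not RxJava code: `SequentialInnerSketch`, `concatMapBlocking`, `delayed`, and `demo` are hypothetical names, and a `CompletableFuture` holding a list is an assumed stand-in for a delayed inner ObservableSource.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

public class SequentialInnerSketch {

    // Hypothetical model: each inner "source" is a future that holds its items.
    public static <T, R> List<R> concatMapBlocking(
            List<T> source,
            Function<T, CompletableFuture<List<R>>> mapper) {
        List<R> out = new ArrayList<>();
        for (T item : source) {
            // join() waits for this inner result before the mapper sees the next item.
            out.addAll(mapper.apply(item).join());
        }
        return out;
    }

    // Inner source that finishes after delayMillis and then yields a single value.
    static CompletableFuture<List<Integer>> delayed(int value, long delayMillis) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(delayMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return Arrays.asList(value);
        });
    }

    // Later items have shorter delays, yet the output keeps the source order.
    public static List<Integer> demo(List<Integer> mapperCalls) {
        return concatMapBlocking(Arrays.asList(3, 2, 1), v -> {
            mapperCalls.add(v);
            return delayed(v * 10, v * 20L);
        });
    }

    public static void main(String[] args) {
        List<Integer> calls = new ArrayList<>();
        System.out.println(demo(calls)); // prints [30, 20, 10]
        System.out.println(calls);       // prints [3, 2, 1]
    }
}
```

Blocking with `join()` is a simplification chosen for brevity; it only models the ordering, not the non-blocking way a reactive operator would wait for an inner source to complete.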

Code example source: origin: ReactiveX/RxJava

// The doOnDispose counter is expected to stay at 0 after all inner sources complete.
@Test
public void noCancelPrevious() {
    final AtomicInteger counter = new AtomicInteger();
    Observable.range(1, 5)
    .concatMap(new Function<Integer, ObservableSource<Integer>>() {
        @Override
        public ObservableSource<Integer> apply(Integer v) throws Exception {
            return Observable.just(v).doOnDispose(new Action() {
                @Override
                public void run() throws Exception {
                    counter.getAndIncrement();
                }
            });
        }
    })
    .test()
    .assertResult(1, 2, 3, 4, 5);
    assertEquals(0, counter.get());
}

Code example source: origin: ReactiveX/RxJava

@SuppressWarnings("unchecked")
@Test
public void concatReportsDisposedOnComplete() {
    final Disposable[] disposable = { null };
    Observable.fromArray(Observable.just(1), Observable.just(2))
    .hide()
    .concatMap(Functions.<Observable<Integer>>identity())
    .subscribe(new Observer<Integer>() {
        @Override
        public void onSubscribe(Disposable d) {
            // Keep the Disposable so its state can be checked after completion.
            disposable[0] = d;
        }
        @Override
        public void onNext(Integer t) {
        }
        @Override
        public void onError(Throwable e) {
        }
        @Override
        public void onComplete() {
        }
    });
    assertTrue(disposable[0].isDisposed());
}

Code example source: origin: ReactiveX/RxJava

// An error from the main source is propagated downstream.
@Test
public void mainError() {
    Observable.<Integer>error(new TestException())
    .concatMap(new Function<Integer, ObservableSource<Integer>>() {
        @Override
        public ObservableSource<Integer> apply(Integer v) throws Exception {
            return Observable.range(v, 2);
        }
    })
    .test()
    .assertFailure(TestException.class);
}

Code example source: origin: ReactiveX/RxJava

@Test//(timeout = 100000)
public void concatMapRangeAsyncLoopIssue2876() {
    final long durationSeconds = 2;
    final long startTime = System.currentTimeMillis();
    for (int i = 0;; i++) {
        // only run this for a max of durationSeconds (two seconds)
        if (System.currentTimeMillis() - startTime > TimeUnit.SECONDS.toMillis(durationSeconds)) {
            return;
        }
        if (i % 1000 == 0) {
            System.out.println("concatMapRangeAsyncLoop > " + i);
        }
        TestObserver<Integer> to = new TestObserver<Integer>();
        Observable.range(0, 1000)
        .concatMap(new Function<Integer, Observable<Integer>>() {
            @Override
            public Observable<Integer> apply(Integer t) {
                return Observable.fromIterable(Arrays.asList(t));
            }
        })
        .observeOn(Schedulers.computation()).subscribe(to);
        to.awaitTerminalEvent(2500, TimeUnit.MILLISECONDS);
        to.assertTerminated();
        to.assertNoErrors();
        assertEquals(1000, to.valueCount());
        assertEquals((Integer)999, to.values().get(999));
    }
}

Code example source: origin: ReactiveX/RxJava

@Test
public void outputFusedOneSignal() {
    final BehaviorSubject<Integer> bs = BehaviorSubject.createDefault(1);
    bs.observeOn(ImmediateThinScheduler.INSTANCE)
    .concatMap(new Function<Integer, ObservableSource<Integer>>() {
        @Override
        public ObservableSource<Integer> apply(Integer v)
                throws Exception {
            return Observable.just(v + 1);
        }
    })
    .subscribeWith(new TestObserver<Integer>() {
        @Override
        public void onNext(Integer t) {
            super.onNext(t);
            // Feed a second item into the subject once the first result arrives.
            if (t == 2) {
                bs.onNext(2);
            }
        }
    })
    .assertValuesOnly(2, 3);
}

Code example source: origin: ReactiveX/RxJava

@Test
@SuppressWarnings("unchecked")
public void concatReportsDisposedOnError() {
    final Disposable[] disposable = { null };
    Observable.fromArray(Observable.just(1), Observable.<Integer>error(new TestException()))
    .hide()
    .concatMap(Functions.<Observable<Integer>>identity())
    .subscribe(new Observer<Integer>() {
        @Override
        public void onSubscribe(Disposable d) {
            // Keep the Disposable so its state can be checked after the error.
            disposable[0] = d;
        }
        @Override
        public void onNext(Integer t) {
        }
        @Override
        public void onError(Throwable e) {
        }
        @Override
        public void onComplete() {
        }
    });
    assertTrue(disposable[0].isDisposed());
}
