io.reactivex.Observable.concatMapMaybe()方法的使用及代码示例

x33g5p2x  于2022-01-25 转载在 其他  
字(7.4k)|赞(0)|评价(0)|浏览(209)

本文整理了Java中io.reactivex.Observable.concatMapMaybe()方法的一些代码示例,展示了Observable.concatMapMaybe()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Observable.concatMapMaybe()方法的具体详情如下:
包路径:io.reactivex.Observable
类名称:Observable
方法名:concatMapMaybe

Observable.concatMapMaybe介绍

[英]Maps the upstream items into MaybeSources and subscribes to them one after the other succeeds or completes, emits their success value if available or terminates immediately if either this Observable or the current inner MaybeSource fail.

Scheduler: concatMapMaybe does not operate by default on a particular Scheduler.
[中]将上游项目映射到MaybeSource中,并一个接一个地订阅它们,如果可用,则发送它们的成功值,或者如果此可观察或当前内部MaybeSource失败,则立即终止。
调度器:默认情况下,ConcatMap可能不会在特定的调度器上运行。

代码示例

代码示例来源:origin: ReactiveX/RxJava

  1. /**
  2. * Maps the upstream items into {@link MaybeSource}s and subscribes to them one after the
  3. * other succeeds or completes, emits their success value if available or terminates immediately if
  4. * either this {@code Observable} or the current inner {@code MaybeSource} fail.
  5. * <p>
  6. * <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/concatMap.png" alt="">
  7. * <dl>
  8. * <dt><b>Scheduler:</b></dt>
  9. * <dd>{@code concatMapMaybe} does not operate by default on a particular {@link Scheduler}.</dd>
  10. * </dl>
  11. * <p>History: 2.1.11 - experimental
  12. * @param <R> the result type of the inner {@code MaybeSource}s
  13. * @param mapper the function called with the upstream item and should return
  14. * a {@code MaybeSource} to become the next source to
  15. * be subscribed to
  16. * @return a new Observable instance
  17. * @see #concatMapMaybeDelayError(Function)
  18. * @see #concatMapMaybe(Function, int)
  19. * @since 2.2
  20. */
  21. @CheckReturnValue
  22. @SchedulerSupport(SchedulerSupport.NONE)
  23. public final <R> Observable<R> concatMapMaybe(Function<? super T, ? extends MaybeSource<? extends R>> mapper) {
  24. return concatMapMaybe(mapper, 2);
  25. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void scalarMapperCrash() {
  3. TestObserver<Object> to = Observable.just(1)
  4. .concatMapMaybe(new Function<Integer, MaybeSource<? extends Object>>() {
  5. @Override
  6. public MaybeSource<? extends Object> apply(Integer v)
  7. throws Exception {
  8. throw new TestException();
  9. }
  10. })
  11. .test();
  12. to.assertFailure(TestException.class);
  13. }

代码示例来源:origin: redisson/redisson

  1. /**
  2. * Maps the upstream items into {@link MaybeSource}s and subscribes to them one after the
  3. * other succeeds or completes, emits their success value if available or terminates immediately if
  4. * either this {@code Observable} or the current inner {@code MaybeSource} fail.
  5. * <p>
  6. * <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/concatMap.png" alt="">
  7. * <dl>
  8. * <dt><b>Scheduler:</b></dt>
  9. * <dd>{@code concatMapMaybe} does not operate by default on a particular {@link Scheduler}.</dd>
  10. * </dl>
  11. * @param <R> the result type of the inner {@code MaybeSource}s
  12. * @param mapper the function called with the upstream item and should return
  13. * a {@code MaybeSource} to become the next source to
  14. * be subscribed to
  15. * @return a new Observable instance
  16. * @since 2.1.11 - experimental
  17. * @see #concatMapMaybeDelayError(Function)
  18. * @see #concatMapMaybe(Function, int)
  19. */
  20. @CheckReturnValue
  21. @SchedulerSupport(SchedulerSupport.NONE)
  22. @Experimental
  23. public final <R> Observable<R> concatMapMaybe(Function<? super T, ? extends MaybeSource<? extends R>> mapper) {
  24. return concatMapMaybe(mapper, 2);
  25. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void disposed() {
  3. TestHelper.checkDisposed(Observable.just(1).hide()
  4. .concatMapMaybe(Functions.justFunction(Maybe.never()))
  5. );
  6. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void simple() {
  3. Observable.range(1, 5)
  4. .concatMapMaybe(new Function<Integer, MaybeSource<Integer>>() {
  5. @Override
  6. public MaybeSource<Integer> apply(Integer v)
  7. throws Exception {
  8. return Maybe.just(v);
  9. }
  10. })
  11. .test()
  12. .assertResult(1, 2, 3, 4, 5);
  13. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void empty() {
  3. Observable.range(1, 10)
  4. .concatMapMaybe(new Function<Integer, MaybeSource<Integer>>() {
  5. @Override
  6. public MaybeSource<Integer> apply(Integer v)
  7. throws Exception {
  8. return Maybe.empty();
  9. }
  10. })
  11. .test()
  12. .assertResult();
  13. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void innerError() {
  3. Observable.just(1)
  4. .concatMapMaybe(Functions.justFunction(Maybe.error(new TestException())))
  5. .test()
  6. .assertFailure(TestException.class);
  7. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void mixed() {
  3. Observable.range(1, 10)
  4. .concatMapMaybe(new Function<Integer, MaybeSource<Integer>>() {
  5. @Override
  6. public MaybeSource<Integer> apply(Integer v)
  7. throws Exception {
  8. if (v % 2 == 0) {
  9. return Maybe.just(v);
  10. }
  11. return Maybe.empty();
  12. }
  13. })
  14. .test()
  15. .assertResult(2, 4, 6, 8, 10);
  16. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void take() {
  3. Observable.range(1, 5)
  4. .concatMapMaybe(new Function<Integer, MaybeSource<Integer>>() {
  5. @Override
  6. public MaybeSource<Integer> apply(Integer v)
  7. throws Exception {
  8. return Maybe.just(v);
  9. }
  10. })
  11. .take(3)
  12. .test()
  13. .assertResult(1, 2, 3);
  14. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void simpleLong() {
  3. Observable.range(1, 1024)
  4. .concatMapMaybe(new Function<Integer, MaybeSource<Integer>>() {
  5. @Override
  6. public MaybeSource<Integer> apply(Integer v)
  7. throws Exception {
  8. return Maybe.just(v);
  9. }
  10. }, 32)
  11. .test()
  12. .assertValueCount(1024)
  13. .assertNoErrors()
  14. .assertComplete();
  15. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void mainError() {
  3. Observable.error(new TestException())
  4. .concatMapMaybe(Functions.justFunction(Maybe.just(1)))
  5. .test()
  6. .assertFailure(TestException.class);
  7. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void innerSuccessDisposeRace() {
  3. for (int i = 0; i < TestHelper.RACE_LONG_LOOPS; i++) {
  4. final MaybeSubject<Integer> ms = MaybeSubject.create();
  5. final TestObserver<Integer> to = Observable.just(1)
  6. .hide()
  7. .concatMapMaybe(Functions.justFunction(ms))
  8. .test();
  9. Runnable r1 = new Runnable() {
  10. @Override
  11. public void run() {
  12. ms.onSuccess(1);
  13. }
  14. };
  15. Runnable r2 = new Runnable() {
  16. @Override
  17. public void run() {
  18. to.dispose();
  19. }
  20. };
  21. TestHelper.race(r1, r2);
  22. to.assertNoErrors();
  23. }
  24. }
  25. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void scalarEmptySource() {
  3. MaybeSubject<Integer> ms = MaybeSubject.create();
  4. Observable.empty()
  5. .concatMapMaybe(Functions.justFunction(ms))
  6. .test()
  7. .assertResult();
  8. assertFalse(ms.hasObservers());
  9. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void checkUnboundedInnerQueue() {
  3. MaybeSubject<Integer> ms = MaybeSubject.create();
  4. @SuppressWarnings("unchecked")
  5. TestObserver<Integer> to = Observable
  6. .fromArray(ms, Maybe.just(2), Maybe.just(3), Maybe.just(4))
  7. .concatMapMaybe(Functions.<Maybe<Integer>>identity(), 2)
  8. .test();
  9. to.assertEmpty();
  10. ms.onSuccess(1);
  11. to.assertResult(1, 2, 3, 4);
  12. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void cancel() {
  3. Observable.range(1, 5).concatWith(Observable.<Integer>never())
  4. .concatMapMaybe(new Function<Integer, MaybeSource<Integer>>() {
  5. @Override
  6. public MaybeSource<Integer> apply(Integer v)
  7. throws Exception {
  8. return Maybe.just(v);
  9. }
  10. })
  11. .test()
  12. .assertValues(1, 2, 3, 4, 5)
  13. .assertNoErrors()
  14. .assertNotComplete()
  15. .cancel();
  16. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void mixedLong() {
  3. Observable.range(1, 1024)
  4. .concatMapMaybe(new Function<Integer, MaybeSource<Integer>>() {
  5. @Override
  6. public MaybeSource<Integer> apply(Integer v)
  7. throws Exception {
  8. if (v % 2 == 0) {
  9. return Maybe.just(v).subscribeOn(Schedulers.computation());
  10. }
  11. return Maybe.<Integer>empty().subscribeOn(Schedulers.computation());
  12. }
  13. })
  14. .test()
  15. .awaitDone(5, TimeUnit.SECONDS)
  16. .assertValueCount(512)
  17. .assertNoErrors()
  18. .assertComplete()
  19. .assertOf(new Consumer<TestObserver<Integer>>() {
  20. @Override
  21. public void accept(TestObserver<Integer> to) throws Exception {
  22. for (int i = 0; i < 512; i ++) {
  23. to.assertValueAt(i, (i + 1) * 2);
  24. }
  25. }
  26. });
  27. }

相关文章

Observable类方法