Usage and code examples of the io.reactivex.Observable.flatMapMaybe() method

x33g5p2x, reposted on 2022-01-25 under Other

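This article collects code examples for the Java method io.reactivex.Observable.flatMapMaybe() and shows how Observable.flatMapMaybe() is used in practice. The examples were extracted from selected projects found on platforms such as GitHub, Stack Overflow, and Maven, so they should serve as useful references. The details of Observable.flatMapMaybe() are as follows:
Package path: io.reactivex.Observable
Class name: Observable
Method name: flatMapMaybe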

Introduction to Observable.flatMapMaybe

Maps each element of the upstream Observable into MaybeSources, subscribes to all of them and merges their onSuccess values, in no particular order, into a single Observable sequence.

Scheduler: flatMapMaybe does not operate by default on a particular Scheduler.
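The mapping rule described above (each upstream element becomes a source that yields zero or one value, and only the successful values reach the output) can be sketched without RxJava. The following is a minimal stdlib-only model, not the library's implementation: `FlatMapMaybeSketch`, its `flatMapMaybe` helper, and `demo` are hypothetical names, `Optional` stands in for a `MaybeSource`, and the loop is sequential, so it preserves order, whereas the real operator gives no ordering guarantee.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;

// Hypothetical stdlib-only model of the operator's semantics; it does not use RxJava.
final class FlatMapMaybeSketch {

    // Maps each upstream element to an Optional (standing in for a MaybeSource)
    // and keeps only the values that are present (the onSuccess case).
    static <T, R> List<R> flatMapMaybe(List<T> upstream, Function<T, Optional<R>> mapper) {
        List<R> merged = new ArrayList<>();
        for (T item : upstream) {
            // An empty Optional models a Maybe that completes without a value.
            mapper.apply(item).ifPresent(merged::add);
        }
        return merged;
    }

    // Odd inputs succeed with value * 10; even inputs complete empty and are dropped.
    static List<Integer> demo() {
        Function<Integer, Optional<Integer>> mapper =
                v -> v % 2 == 1 ? Optional.of(v * 10) : Optional.<Integer>empty();
        return flatMapMaybe(Arrays.asList(1, 2, 3, 4), mapper);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [10, 30]
    }
}
```

With the real operator the same idea would be expressed by returning `Maybe.just(...)` or `Maybe.empty()` from the mapper, as the test cases in this article do.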

Code examples

Code example source: ReactiveX/RxJava

    /**
     * Maps each element of the upstream Observable into MaybeSources, subscribes to all of them
     * and merges their onSuccess values, in no particular order, into a single Observable sequence.
     * <p>
     * <img width="640" height="310" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/flatMapMaybe.png" alt="">
     * <dl>
     *  <dt><b>Scheduler:</b></dt>
     *  <dd>{@code flatMapMaybe} does not operate by default on a particular {@link Scheduler}.</dd>
     * </dl>
     * @param <R> the result value type
     * @param mapper the function that receives each source value and transforms it into a MaybeSource.
     * @return the new Observable instance
     */
    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.NONE)
    public final <R> Observable<R> flatMapMaybe(Function<? super T, ? extends MaybeSource<? extends R>> mapper) {
        return flatMapMaybe(mapper, false);
    }
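The one-argument overload delegates to `flatMapMaybe(mapper, false)`; the second argument is the `delayErrors` flag. The sketch below illustrates the difference between the two modes with a stdlib-only model, not RxJava itself: `DelayErrorsSketch`, `Result`, and `parseAll` are hypothetical names, `Optional` stands in for a `MaybeSource`, and an exception thrown while producing a value stands in for an inner `onError`. With `delayErrors` set to false the first error ends the flow at once; with true the remaining elements are still processed and the error is reported at the end.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;

// Hypothetical stdlib-only model of the delayErrors flag; it does not use RxJava.
final class DelayErrorsSketch {

    // Outcome of the modeled flow: the emitted values and the errors that were recorded.
    static final class Result<R> {
        final List<R> values = new ArrayList<>();
        final List<RuntimeException> errors = new ArrayList<>();
    }

    static <T, R> Result<R> flatMapMaybe(List<T> upstream,
                                         Function<T, Optional<R>> mapper,
                                         boolean delayErrors) {
        Result<R> result = new Result<>();
        for (T item : upstream) {
            try {
                mapper.apply(item).ifPresent(result.values::add);
            } catch (RuntimeException ex) {
                result.errors.add(ex);
                if (!delayErrors) {
                    break; // eager mode: the first error terminates the flow
                }
                // delayed mode: keep going, the error is surfaced after the loop
            }
        }
        return result;
    }

    // Parses "1", "a", "2"; the middle element fails with NumberFormatException.
    static Result<Integer> parseAll(boolean delayErrors) {
        Function<String, Optional<Integer>> mapper = s -> Optional.of(Integer.valueOf(s));
        return flatMapMaybe(Arrays.asList("1", "a", "2"), mapper, delayErrors);
    }

    public static void main(String[] args) {
        System.out.println(parseAll(false).values); // prints [1]
        System.out.println(parseAll(true).values);  // prints [1, 2]
    }
}
```

In RxJava itself, several delayed errors are reported together as a `CompositeException`, which is what the `normalDelayErrorAll` test in this article asserts.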

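Code example source: ReactiveX/RxJava

    // Fragment: the opening lines of the enclosing call are not part of this excerpt.
    @Override
    public ObservableSource<Integer> apply(Observable<Object> f) throws Exception {
        return f.flatMapMaybe(Functions.justFunction(Maybe.just(2)));
    }
    }); // closes the enclosing anonymous Function and call (not shown)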

Code example source: ReactiveX/RxJava

    @Test
    public void disposeInner() {
        final TestObserver<Object> to = new TestObserver<Object>();
        Observable.just(1).flatMapMaybe(new Function<Integer, MaybeSource<Object>>() {
            @Override
            public MaybeSource<Object> apply(Integer v) throws Exception {
                return new Maybe<Object>() {
                    @Override
                    protected void subscribeActual(MaybeObserver<? super Object> observer) {
                        observer.onSubscribe(Disposables.empty());
                        // The inner observer is live until the downstream TestObserver is disposed.
                        assertFalse(((Disposable)observer).isDisposed());
                        to.dispose();
                        assertTrue(((Disposable)observer).isDisposed());
                    }
                };
            }
        })
        .subscribe(to);
        to
        .assertEmpty();
    }

Code example source: ReactiveX/RxJava

    // sleep(), cb, beforeCancelSleep() and SLEEP_AFTER_CANCEL are helpers of the
    // enclosing test class and are not shown in this excerpt.
    @Test
    public void observerMaybe() throws Exception {
        List<Throwable> errors = TestHelper.trackPluginErrors();
        try {
            TestObserver<Integer> to = Observable.just(1)
            .subscribeOn(Schedulers.io())
            .flatMapMaybe(new Function<Integer, Maybe<Integer>>() {
                @Override
                public Maybe<Integer> apply(Integer v) throws Exception {
                    sleep();
                    return Maybe.<Integer>error(new TestException());
                }
            })
            .test();
            cb.await();
            beforeCancelSleep(to);
            to.cancel();
            Thread.sleep(SLEEP_AFTER_CANCEL);
            to.assertEmpty();
            assertTrue(errors.toString(), errors.isEmpty());
        } finally {
            RxJavaPlugins.reset();
        }
    }

Code example source: ReactiveX/RxJava

    @Test
    public void normal() {
        Observable.range(1, 10)
        .flatMapMaybe(new Function<Integer, MaybeSource<Integer>>() {
            @Override
            public MaybeSource<Integer> apply(Integer v) throws Exception {
                return Maybe.just(v);
            }
        })
        .test()
        .assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
    }

Code example source: ReactiveX/RxJava

    @Test
    public void normalEmpty() {
        Observable.range(1, 10)
        .flatMapMaybe(new Function<Integer, MaybeSource<Integer>>() {
            @Override
            public MaybeSource<Integer> apply(Integer v) throws Exception {
                return Maybe.empty();
            }
        })
        .test()
        .assertResult();
    }

Code example source: ReactiveX/RxJava

    @Test
    public void normalDelayError() {
        Observable.range(1, 10)
        .flatMapMaybe(new Function<Integer, MaybeSource<Integer>>() {
            @Override
            public MaybeSource<Integer> apply(Integer v) throws Exception {
                return Maybe.just(v);
            }
        }, true)
        .test()
        .assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
    }

Code example source: ReactiveX/RxJava

    @Test
    public void middleError() {
        Observable.fromArray(new String[]{"1", "a", "2"}).flatMapMaybe(new Function<String, MaybeSource<Integer>>() {
            @Override
            public MaybeSource<Integer> apply(final String s) throws NumberFormatException {
                //return Single.just(Integer.valueOf(s)); //This works
                return Maybe.fromCallable(new Callable<Integer>() {
                    @Override
                    public Integer call() throws NumberFormatException {
                        return Integer.valueOf(s);
                    }
                });
            }
        })
        .test()
        .assertFailure(NumberFormatException.class, 1);
    }

Code example source: ReactiveX/RxJava

    @Test
    public void take() {
        Observable.range(1, 10)
        .flatMapMaybe(new Function<Integer, MaybeSource<Integer>>() {
            @Override
            public MaybeSource<Integer> apply(Integer v) throws Exception {
                return Maybe.just(v);
            }
        })
        .take(2)
        .test()
        .assertResult(1, 2);
    }

Code example source: ReactiveX/RxJava

    @Test
    public void innerSuccessCompletesAfterMain() {
        PublishSubject<Integer> ps = PublishSubject.create();
        TestObserver<Integer> to = Observable.just(1).flatMapMaybe(Functions.justFunction(ps.singleElement()))
        .test();
        ps.onNext(2);
        ps.onComplete();
        to
        .assertResult(2);
    }

Code example source: ReactiveX/RxJava

    @Test
    public void emissionQueueTrigger() {
        final PublishSubject<Integer> ps1 = PublishSubject.create();
        final PublishSubject<Integer> ps2 = PublishSubject.create();
        TestObserver<Integer> to = new TestObserver<Integer>() {
            @Override
            public void onNext(Integer t) {
                super.onNext(t);
                if (t == 1) {
                    ps2.onNext(2);
                    ps2.onComplete();
                }
            }
        };
        Observable.just(ps1, ps2)
        .flatMapMaybe(new Function<PublishSubject<Integer>, MaybeSource<Integer>>() {
            @Override
            public MaybeSource<Integer> apply(PublishSubject<Integer> v) throws Exception {
                return v.singleElement();
            }
        })
        .subscribe(to);
        ps1.onNext(1);
        ps1.onComplete();
        to.assertResult(1, 2);
    }

Code example source: ReactiveX/RxJava

    @Test
    public void asyncFlattenNone() {
        Observable.range(1, 1000)
        .flatMapMaybe(new Function<Integer, MaybeSource<Integer>>() {
            @Override
            public MaybeSource<Integer> apply(Integer v) throws Exception {
                return Maybe.<Integer>empty().subscribeOn(Schedulers.computation());
            }
        })
        .take(500)
        .test()
        .awaitDone(5, TimeUnit.SECONDS)
        .assertResult();
    }

Code example source: ReactiveX/RxJava

    @Test
    public void normalAsync() {
        Observable.range(1, 10)
        .flatMapMaybe(new Function<Integer, MaybeSource<Integer>>() {
            @Override
            public MaybeSource<Integer> apply(Integer v) throws Exception {
                return Maybe.just(v).subscribeOn(Schedulers.computation());
            }
        })
        .test()
        .awaitDone(5, TimeUnit.SECONDS)
        .assertSubscribed()
        .assertValueSet(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
        .assertNoErrors()
        .assertComplete();
    }

Code example source: ReactiveX/RxJava

    @Test
    public void emissionQueueTrigger2() {
        final PublishSubject<Integer> ps1 = PublishSubject.create();
        final PublishSubject<Integer> ps2 = PublishSubject.create();
        final PublishSubject<Integer> ps3 = PublishSubject.create();
        TestObserver<Integer> to = new TestObserver<Integer>() {
            @Override
            public void onNext(Integer t) {
                super.onNext(t);
                if (t == 1) {
                    ps2.onNext(2);
                    ps2.onComplete();
                }
            }
        };
        Observable.just(ps1, ps2, ps3)
        .flatMapMaybe(new Function<PublishSubject<Integer>, MaybeSource<Integer>>() {
            @Override
            public MaybeSource<Integer> apply(PublishSubject<Integer> v) throws Exception {
                return v.singleElement();
            }
        })
        .subscribe(to);
        ps1.onNext(1);
        ps1.onComplete();
        ps3.onComplete();
        to.assertResult(1, 2);
    }

Code example source: ReactiveX/RxJava

    @Test
    public void badInnerSource() {
        List<Throwable> errors = TestHelper.trackPluginErrors();
        try {
            Observable.just(1)
            .flatMapMaybe(Functions.justFunction(new Maybe<Integer>() {
                @Override
                protected void subscribeActual(MaybeObserver<? super Integer> observer) {
                    observer.onSubscribe(Disposables.empty());
                    observer.onError(new TestException("First"));
                    observer.onError(new TestException("Second"));
                }
            }))
            .test()
            .assertFailureAndMessage(TestException.class, "First");
            TestHelper.assertUndeliverable(errors, 0, TestException.class, "Second");
        } finally {
            RxJavaPlugins.reset();
        }
    }

Code example source: ReactiveX/RxJava

    @Test
    public void completeError() {
        final PublishSubject<Integer> ps = PublishSubject.create();
        TestObserver<Integer> to = Observable.range(1, 2)
        .flatMapMaybe(new Function<Integer, MaybeSource<Integer>>() {
            @Override
            public MaybeSource<Integer> apply(Integer v) throws Exception {
                if (v == 2) {
                    return ps.singleElement();
                }
                return Maybe.error(new TestException());
            }
        }, true)
        .test();
        ps.onComplete();
        to
        .assertFailure(TestException.class);
    }

Code example source: ReactiveX/RxJava

    @Test
    public void takeAsync() {
        Observable.range(1, 10)
        .flatMapMaybe(new Function<Integer, MaybeSource<Integer>>() {
            @Override
            public MaybeSource<Integer> apply(Integer v) throws Exception {
                return Maybe.just(v).subscribeOn(Schedulers.computation());
            }
        })
        .take(2)
        .test()
        .awaitDone(5, TimeUnit.SECONDS)
        .assertSubscribed()
        .assertValueCount(2)
        .assertValueSet(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
        .assertNoErrors()
        .assertComplete();
    }

Code example source: ReactiveX/RxJava

    @Test
    public void normalDelayErrorAll() {
        TestObserver<Integer> to = Observable.range(1, 10).concatWith(Observable.<Integer>error(new TestException()))
        .flatMapMaybe(new Function<Integer, MaybeSource<Integer>>() {
            @Override
            public MaybeSource<Integer> apply(Integer v) throws Exception {
                return Maybe.error(new TestException());
            }
        }, true)
        .test()
        .assertFailure(CompositeException.class);
        // 10 inner errors plus the upstream error give 11 entries in the CompositeException.
        List<Throwable> errors = TestHelper.compositeList(to.errors().get(0));
        for (int i = 0; i < 11; i++) {
            TestHelper.assertError(errors, i, TestException.class);
        }
    }

Code example source: ReactiveX/RxJava

    @Test
    public void successError() {
        final PublishSubject<Integer> ps = PublishSubject.create();
        TestObserver<Integer> to = Observable.range(1, 2)
        .flatMapMaybe(new Function<Integer, MaybeSource<Integer>>() {
            @Override
            public MaybeSource<Integer> apply(Integer v) throws Exception {
                if (v == 2) {
                    return ps.singleElement();
                }
                return Maybe.error(new TestException());
            }
        }, true)
        .test();
        ps.onNext(1);
        ps.onComplete();
        to
        .assertFailure(TestException.class, 1);
    }

Code example source: ReactiveX/RxJava

    @Test
    public void asyncFlatten() {
        Observable.range(1, 1000)
        .flatMapMaybe(new Function<Integer, MaybeSource<Integer>>() {
            @Override
            public MaybeSource<Integer> apply(Integer v) throws Exception {
                return Maybe.just(1).subscribeOn(Schedulers.computation());
            }
        })
        .take(500)
        .test()
        .awaitDone(5, TimeUnit.SECONDS)
        .assertSubscribed()
        .assertValueCount(500)
        .assertNoErrors()
        .assertComplete();
    }
