io.reactivex.Observable.switchMap()方法的使用及代码示例

x33g5p2x  于2022-01-25 转载在 其他  
字(8.1k)|赞(0)|评价(0)|浏览(249)

本文整理了Java中io.reactivex.Observable.switchMap()方法的一些代码示例,展示了Observable.switchMap()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Observable.switchMap()方法的具体详情如下:
包路径:io.reactivex.Observable
类名称:Observable
方法名:switchMap

Observable.switchMap介绍

[英]Returns a new ObservableSource by applying a function that you supply to each item emitted by the source ObservableSource that returns an ObservableSource, and then emitting the items emitted by the most recently emitted of these ObservableSources.

The resulting ObservableSource completes if both the upstream ObservableSource and the last inner ObservableSource, if any, complete. If the upstream ObservableSource signals an onError, the inner ObservableSource is disposed and the error delivered in-sequence.

Scheduler: switchMap does not operate by default on a particular Scheduler.
[中]通过向源ObservableSource(返回ObservableSource的源ObservableSource)发出的每个项应用一个函数,然后发出这些ObservableSource中最近发出的项,返回一个新的ObservableSource。
如果上游可观测资源和最后一个内部可观测资源(如果有)都已完成,则生成的可观测资源将完成。如果上游可观测资源发出onError信号,内部可观测资源将被处理,错误将按顺序传递。
调度器:switchMap默认情况下不会在特定的调度器上运行。

代码示例

代码示例来源:origin: ReactiveX/RxJava

  1. @Override
  2. public ObservableSource<Object> apply(Observable<Throwable> errors) throws Exception {
  3. return errors.switchMap(new Function<Throwable, ObservableSource<Object>>() {
  4. @Override
  5. public ObservableSource<Object> apply(Throwable ignore) throws Exception {
  6. return subject;
  7. }
  8. });
  9. }
  10. })

代码示例来源:origin: ReactiveX/RxJava

  1. @Override
  2. public ObservableSource<Object> apply(Observable<Object> completions) throws Exception {
  3. return completions.switchMap(new Function<Object, ObservableSource<Object>>() {
  4. @Override
  5. public ObservableSource<Object> apply(Object ignore) throws Exception {
  6. return subject;
  7. }
  8. });
  9. }
  10. })

代码示例来源:origin: ReactiveX/RxJava

  1. @Test(expected = NullPointerException.class)
  2. public void switchMapNull() {
  3. just1.switchMap(null);
  4. }

代码示例来源:origin: ReactiveX/RxJava

  1. /**
  2. * Returns a new ObservableSource by applying a function that you supply to each item emitted by the source
  3. * ObservableSource that returns an ObservableSource, and then emitting the items emitted by the most recently emitted
  4. * of these ObservableSources.
  5. * <p>
  6. * The resulting ObservableSource completes if both the upstream ObservableSource and the last inner ObservableSource, if any, complete.
  7. * If the upstream ObservableSource signals an onError, the inner ObservableSource is disposed and the error delivered in-sequence.
  8. * <p>
  9. * <img width="640" height="350" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/switchMap.png" alt="">
  10. * <dl>
  11. * <dt><b>Scheduler:</b></dt>
  12. * <dd>{@code switchMap} does not operate by default on a particular {@link Scheduler}.</dd>
  13. * </dl>
  14. *
  15. * @param <R> the element type of the inner ObservableSources and the output
  16. * @param mapper
  17. * a function that, when applied to an item emitted by the source ObservableSource, returns an
  18. * ObservableSource
  19. * @return an Observable that emits the items emitted by the ObservableSource returned from applying {@code func} to the most recently emitted item emitted by the source ObservableSource
  20. * @see <a href="http://reactivex.io/documentation/operators/flatmap.html">ReactiveX operators documentation: FlatMap</a>
  21. */
  22. @CheckReturnValue
  23. @SchedulerSupport(SchedulerSupport.NONE)
  24. public final <R> Observable<R> switchMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper) {
  25. return switchMap(mapper, bufferSize());
  26. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test(expected = NullPointerException.class)
  2. public void switchMapFunctionReturnsNull() {
  3. just1.switchMap(new Function<Integer, Observable<Object>>() {
  4. @Override
  5. public Observable<Object> apply(Integer v) {
  6. return null;
  7. }
  8. }).blockingSubscribe();
  9. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void switchMapJustSource() {
  3. Observable.just(0)
  4. .switchMap(new Function<Object, ObservableSource<Integer>>() {
  5. @Override
  6. public ObservableSource<Integer> apply(Object v) throws Exception {
  7. return Observable.just(1);
  8. }
  9. }, 16)
  10. .test()
  11. .assertResult(1);
  12. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void mapperThrows() {
  3. Observable.just(1).hide()
  4. .switchMap(new Function<Integer, ObservableSource<Object>>() {
  5. @Override
  6. public ObservableSource<Object> apply(Integer v) throws Exception {
  7. throw new TestException();
  8. }
  9. })
  10. .test()
  11. .assertFailure(TestException.class);
  12. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void asyncFused() {
  3. Observable.just(1).hide()
  4. .switchMap(Functions.justFunction(
  5. Observable.range(1, 5)
  6. .observeOn(ImmediateThinScheduler.INSTANCE)
  7. ))
  8. .test()
  9. .assertResult(1, 2, 3, 4, 5);
  10. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void justInner() {
  3. Observable.range(1, 5)
  4. .switchMap(Functions.justFunction(Observable.just(1)))
  5. .test()
  6. .assertResult(1, 1, 1, 1, 1);
  7. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void switchMapErrorEmptySource() {
  3. assertSame(Observable.empty(), Observable.<Object>empty()
  4. .switchMap(new Function<Object, ObservableSource<Integer>>() {
  5. @Override
  6. public ObservableSource<Integer> apply(Object v) throws Exception {
  7. return Observable.just(1);
  8. }
  9. }, 16));
  10. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void innerCompletesReentrant() {
  3. final PublishSubject<Integer> ps = PublishSubject.create();
  4. TestObserver<Integer> to = new TestObserver<Integer>() {
  5. @Override
  6. public void onNext(Integer t) {
  7. super.onNext(t);
  8. ps.onComplete();
  9. }
  10. };
  11. Observable.just(1).hide()
  12. .switchMap(Functions.justFunction(ps))
  13. .subscribe(to);
  14. ps.onNext(1);
  15. to.assertResult(1);
  16. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void innerErrorsReentrant() {
  3. final PublishSubject<Integer> ps = PublishSubject.create();
  4. TestObserver<Integer> to = new TestObserver<Integer>() {
  5. @Override
  6. public void onNext(Integer t) {
  7. super.onNext(t);
  8. ps.onError(new TestException());
  9. }
  10. };
  11. Observable.just(1).hide()
  12. .switchMap(Functions.justFunction(ps))
  13. .subscribe(to);
  14. ps.onNext(1);
  15. to.assertFailure(TestException.class, 1);
  16. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void asyncFusedRejecting() {
  3. Observable.just(1).hide()
  4. .switchMap(Functions.justFunction(
  5. TestHelper.rejectObservableFusion()
  6. ))
  7. .test()
  8. .assertEmpty();
  9. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void fusedBoundary() {
  3. String thread = Thread.currentThread().getName();
  4. Observable.range(1, 10000)
  5. .switchMap(new Function<Integer, ObservableSource<? extends Object>>() {
  6. @Override
  7. public ObservableSource<? extends Object> apply(Integer v)
  8. throws Exception {
  9. return Observable.just(2).hide()
  10. .observeOn(Schedulers.single())
  11. .map(new Function<Integer, Object>() {
  12. @Override
  13. public Object apply(Integer w) throws Exception {
  14. return Thread.currentThread().getName();
  15. }
  16. });
  17. }
  18. })
  19. .test()
  20. .awaitDone(5, TimeUnit.SECONDS)
  21. .assertNever(thread)
  22. .assertNoErrors()
  23. .assertComplete();
  24. }
  25. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void emptyInner() {
  3. Observable.range(1, 5)
  4. .switchMap(Functions.justFunction(Observable.empty()))
  5. .test()
  6. .assertResult();
  7. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void switchMapInnerCancelled() {
  3. PublishSubject<Integer> ps = PublishSubject.create();
  4. TestObserver<Integer> to = Observable.just(1)
  5. .switchMap(Functions.justFunction(ps))
  6. .test();
  7. assertTrue(ps.hasObservers());
  8. to.cancel();
  9. assertFalse(ps.hasObservers());
  10. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void badInnerSource() {
  3. List<Throwable> errors = TestHelper.trackPluginErrors();
  4. try {
  5. Observable.just(1).hide()
  6. .switchMap(Functions.justFunction(new Observable<Integer>() {
  7. @Override
  8. protected void subscribeActual(Observer<? super Integer> observer) {
  9. observer.onSubscribe(Disposables.empty());
  10. observer.onError(new TestException());
  11. observer.onComplete();
  12. observer.onError(new TestException());
  13. observer.onComplete();
  14. }
  15. }))
  16. .test()
  17. .assertFailure(TestException.class);
  18. TestHelper.assertUndeliverable(errors, 0, TestException.class);
  19. } finally {
  20. RxJavaPlugins.reset();
  21. }
  22. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void syncFusedSingle() {
  3. Observable.range(1, 5).hide()
  4. .switchMap(Functions.justFunction(
  5. Single.just(1).toObservable()
  6. ))
  7. .test()
  8. .assertResult(1, 1, 1, 1, 1);
  9. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void syncFusedCompletable() {
  3. Observable.range(1, 5).hide()
  4. .switchMap(Functions.justFunction(
  5. Completable.complete().toObservable()
  6. ))
  7. .test()
  8. .assertResult();
  9. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void syncFusedMaybe() {
  3. Observable.range(1, 5).hide()
  4. .switchMap(Functions.justFunction(
  5. Maybe.just(1).toObservable()
  6. ))
  7. .test()
  8. .assertResult(1, 1, 1, 1, 1);
  9. }

相关文章

Observable类方法