Usage and code examples of io.reactivex.Flowable.flatMapCompletable()

Reposted by x33g5p2x on 2022-01-19

This article collects code examples of the Java method io.reactivex.Flowable.flatMapCompletable() and shows how it is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, extracted from selected projects, and should serve as a useful reference. Details of the method:
Fully qualified class: io.reactivex.Flowable
Class name: Flowable
Method name: flatMapCompletable

About Flowable.flatMapCompletable

Maps each element of the upstream Flowable into CompletableSources, subscribes to them, and waits until the upstream and all CompletableSources complete.
Backpressure: the operator consumes the upstream in an unbounded manner.
Scheduler: flatMapCompletable does not operate by default on a particular Scheduler.
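
The "map every element to a value-less task, then wait for all of them" behavior described above can be modeled with the plain JDK. The sketch below is only an analogy built on CompletableFuture, not RxJava code: the class FlatMapCompletableSketch and the helpers flatMapToCompletion and runAll are hypothetical names introduced for illustration, and the sketch does not model backpressure, maxConcurrency, or error delaying.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Plain-JDK analogy of the described semantics; none of these names are RxJava API.
final class FlatMapCompletableSketch {

    // Hypothetical helper: maps every element to a value-less asynchronous task
    // and returns a future that completes once all of those tasks have completed.
    static <T> CompletableFuture<Void> flatMapToCompletion(
            List<T> upstream, Function<T, CompletableFuture<Void>> mapper) {
        CompletableFuture<?>[] inner = upstream.stream()
                .map(mapper)                          // one inner task per element
                .toArray(CompletableFuture[]::new);
        return CompletableFuture.allOf(inner);        // done when every inner task is done
    }

    // Runs `count` inner tasks and returns how many had finished
    // by the time the combined future completed.
    static int runAll(int count) {
        AtomicInteger finished = new AtomicInteger();
        List<Integer> upstream =
                IntStream.rangeClosed(1, count).boxed().collect(Collectors.toList());
        flatMapToCompletion(upstream,
                v -> CompletableFuture.runAsync(() -> finished.incrementAndGet()))
                .join();                              // blocks until all inner tasks are done
        return finished.get();
    }

    public static void main(String[] args) {
        System.out.println(runAll(10)); // prints 10
    }
}
```

As in the operator's description, the combined result carries no values; it only signals that the upstream list was fully mapped and every inner task finished.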

Code examples

Example source: ReactiveX/RxJava

    // Excerpt: the enclosing call that receives this anonymous Function is not shown in the source.
        @Override
        public Object apply(Flowable<Integer> f) throws Exception {
            return f.flatMapCompletable(new Function<Integer, CompletableSource>() {
                @Override
                public CompletableSource apply(Integer v) throws Exception {
                    return Completable.complete();
                }
            });
        }
    }, false, 1, null);

Example source: ReactiveX/RxJava

    // Excerpt: the enclosing call is not shown in the source. Same as above, but converted with toFlowable().
        @Override
        public Object apply(Flowable<Integer> f) throws Exception {
            return f.flatMapCompletable(new Function<Integer, CompletableSource>() {
                @Override
                public CompletableSource apply(Integer v) throws Exception {
                    return Completable.complete();
                }
            }).toFlowable();
        }
    }, false, 1, null);

Example source: ReactiveX/RxJava

    // Checks the disposal behavior of the Completable returned by flatMapCompletable.
    @Test
    public void disposed() {
        TestHelper.checkDisposed(Flowable.range(1, 10)
        .flatMapCompletable(new Function<Integer, CompletableSource>() {
            @Override
            public CompletableSource apply(Integer v) throws Exception {
                return Completable.complete();
            }
        }));
    }

Example source: ReactiveX/RxJava

    // The inner CompletableObserver is cast to Disposable and must report
    // isDisposed() correctly before and after dispose().
    @Test
    public void innerObserver() {
        Flowable.range(1, 3)
        .flatMapCompletable(new Function<Integer, CompletableSource>() {
            @Override
            public CompletableSource apply(Integer v) throws Exception {
                return new Completable() {
                    @Override
                    protected void subscribeActual(CompletableObserver observer) {
                        observer.onSubscribe(Disposables.empty());
                        assertFalse(((Disposable)observer).isDisposed());
                        ((Disposable)observer).dispose();
                        assertTrue(((Disposable)observer).isDisposed());
                    }
                };
            }
        })
        .test();
    }

Example source: ReactiveX/RxJava

    // Same disposal check as disposed(), applied to the toFlowable() view.
    @Test
    public void disposedFlowable() {
        TestHelper.checkDisposed(Flowable.range(1, 10)
        .flatMapCompletable(new Function<Integer, CompletableSource>() {
            @Override
            public CompletableSource apply(Integer v) throws Exception {
                return Completable.complete();
            }
        }).toFlowable());
    }

Example source: ReactiveX/RxJava

    // Every element maps to an already completed Completable, so the result completes normally.
    @Test
    public void normal() {
        Flowable.range(1, 10)
        .flatMapCompletable(new Function<Integer, CompletableSource>() {
            @Override
            public CompletableSource apply(Integer v) throws Exception {
                return Completable.complete();
            }
        })
        .test()
        .assertResult();
    }

Example source: ReactiveX/RxJava

    // Overload with delayErrors = true and maxConcurrency = Integer.MAX_VALUE.
    @Test
    public void normalDelayError() {
        Flowable.range(1, 10)
        .flatMapCompletable(new Function<Integer, CompletableSource>() {
            @Override
            public CompletableSource apply(Integer v) throws Exception {
                return Completable.complete();
            }
        }, true, Integer.MAX_VALUE)
        .test()
        .assertResult();
    }

Example source: ReactiveX/RxJava

    // delayErrors = true, maxConcurrency = 1: the element 2 maps to a failing
    // Completable, and the result fails with TestException.
    @Test
    public void delayErrorMaxConcurrency() {
        Flowable.range(1, 3)
        .flatMapCompletable(new Function<Integer, CompletableSource>() {
            @Override
            public CompletableSource apply(Integer v) throws Exception {
                if (v == 2) {
                    return Completable.error(new TestException());
                }
                return Completable.complete();
            }
        }, true, 1)
        .test()
        .assertFailure(TestException.class);
    }

Example source: ReactiveX/RxJava

    // Same as normal(), observed through toFlowable().
    @Test
    public void normalFlowable() {
        Flowable.range(1, 10)
        .flatMapCompletable(new Function<Integer, CompletableSource>() {
            @Override
            public CompletableSource apply(Integer v) throws Exception {
                return Completable.complete();
            }
        }).toFlowable()
        .test()
        .assertResult();
    }

Example source: ReactiveX/RxJava

    // Same as normalDelayError(), observed through toFlowable().
    @Test
    public void normalDelayErrorFlowable() {
        Flowable.range(1, 10)
        .flatMapCompletable(new Function<Integer, CompletableSource>() {
            @Override
            public CompletableSource apply(Integer v) throws Exception {
                return Completable.complete();
            }
        }, true, Integer.MAX_VALUE).toFlowable()
        .test()
        .assertResult();
    }

Example source: ReactiveX/RxJava

    // Each of the 1000 elements maps to an inner source that runs on the computation scheduler.
    @Test
    public void normalAsync() {
        Flowable.range(1, 1000)
        .flatMapCompletable(new Function<Integer, CompletableSource>() {
            @Override
            public CompletableSource apply(Integer v) throws Exception {
                return Flowable.range(1, 100).subscribeOn(Schedulers.computation()).ignoreElements();
            }
        })
        .test()
        .awaitDone(5, TimeUnit.SECONDS)
        .assertResult();
    }

Example source: ReactiveX/RxJava

    // Asynchronous inner sources with delayErrors = false and maxConcurrency = 3.
    @Test
    public void normalAsyncMaxConcurrency() {
        Flowable.range(1, 1000)
        .flatMapCompletable(new Function<Integer, CompletableSource>() {
            @Override
            public CompletableSource apply(Integer v) throws Exception {
                return Flowable.range(1, 100).subscribeOn(Schedulers.computation()).ignoreElements();
            }
        }, false, 3)
        .test()
        .awaitDone(5, TimeUnit.SECONDS)
        .assertResult();
    }

Example source: ReactiveX/RxJava

    // Same as normalAsync(), observed through toFlowable().
    @Test
    public void normalAsyncFlowable() {
        Flowable.range(1, 1000)
        .flatMapCompletable(new Function<Integer, CompletableSource>() {
            @Override
            public CompletableSource apply(Integer v) throws Exception {
                return Flowable.range(1, 100).subscribeOn(Schedulers.computation()).ignoreElements();
            }
        }).toFlowable()
        .test()
        .awaitDone(5, TimeUnit.SECONDS)
        .assertResult();
    }

Example source: ReactiveX/RxJava

    // With delayErrors = true, all 10 inner failures are collected into one CompositeException.
    @Test
    public void normalDelayInnerErrorAllFlowable() {
        TestSubscriber<Integer> ts = Flowable.range(1, 10)
        .flatMapCompletable(new Function<Integer, CompletableSource>() {
            @Override
            public CompletableSource apply(Integer v) throws Exception {
                return Completable.error(new TestException());
            }
        }, true, Integer.MAX_VALUE).<Integer>toFlowable()
        .test()
        .assertFailure(CompositeException.class);

        List<Throwable> errors = TestHelper.compositeList(ts.errors().get(0));
        for (int i = 0; i < 10; i++) {
            TestHelper.assertError(errors, i, TestException.class);
        }
    }

Example source: ReactiveX/RxJava

    // Same as normalAsyncMaxConcurrency(), observed through toFlowable().
    @Test
    public void normalAsyncFlowableMaxConcurrency() {
        Flowable.range(1, 1000)
        .flatMapCompletable(new Function<Integer, CompletableSource>() {
            @Override
            public CompletableSource apply(Integer v) throws Exception {
                return Flowable.range(1, 100).subscribeOn(Schedulers.computation()).ignoreElements();
            }
        }, false, 3).toFlowable()
        .test()
        .awaitDone(5, TimeUnit.SECONDS)
        .assertResult();
    }

Example source: ReactiveX/RxJava

    // The upstream itself fails after 10 elements; with delayErrors = false
    // the result fails with that TestException.
    @Test
    public void normalNonDelayErrorOuter() {
        Flowable.range(1, 10).concatWith(Flowable.<Integer>error(new TestException()))
        .flatMapCompletable(new Function<Integer, CompletableSource>() {
            @Override
            public CompletableSource apply(Integer v) throws Exception {
                return Completable.complete();
            }
        }, false, Integer.MAX_VALUE)
        .test()
        .assertFailure(TestException.class);
    }

Example source: ReactiveX/RxJava

    // Same as normalNonDelayErrorOuter(), observed through toFlowable().
    @Test
    public void normalNonDelayErrorOuterFlowable() {
        Flowable.range(1, 10).concatWith(Flowable.<Integer>error(new TestException()))
        .flatMapCompletable(new Function<Integer, CompletableSource>() {
            @Override
            public CompletableSource apply(Integer v) throws Exception {
                return Completable.complete();
            }
        }, false, Integer.MAX_VALUE).toFlowable()
        .test()
        .assertFailure(TestException.class);
    }

Example source: ReactiveX/RxJava

    // With delayErrors = true, the 10 inner failures plus the upstream failure
    // yield a CompositeException holding 11 TestExceptions.
    @Test
    public void normalDelayErrorAll() {
        TestObserver<Void> to = Flowable.range(1, 10).concatWith(Flowable.<Integer>error(new TestException()))
        .flatMapCompletable(new Function<Integer, CompletableSource>() {
            @Override
            public CompletableSource apply(Integer v) throws Exception {
                return Completable.error(new TestException());
            }
        }, true, Integer.MAX_VALUE)
        .test()
        .assertFailure(CompositeException.class);

        List<Throwable> errors = TestHelper.compositeList(to.errors().get(0));
        for (int i = 0; i < 11; i++) {
            TestHelper.assertError(errors, i, TestException.class);
        }
    }

Example source: ReactiveX/RxJava

    // Same as normalDelayErrorAll(), observed through toFlowable().
    @Test
    public void normalDelayErrorAllFlowable() {
        TestSubscriber<Integer> ts = Flowable.range(1, 10).concatWith(Flowable.<Integer>error(new TestException()))
        .flatMapCompletable(new Function<Integer, CompletableSource>() {
            @Override
            public CompletableSource apply(Integer v) throws Exception {
                return Completable.error(new TestException());
            }
        }, true, Integer.MAX_VALUE).<Integer>toFlowable()
        .test()
        .assertFailure(CompositeException.class);

        List<Throwable> errors = TestHelper.compositeList(ts.errors().get(0));
        for (int i = 0; i < 11; i++) {
            TestHelper.assertError(errors, i, TestException.class);
        }
    }

Example source: ReactiveX/RxJava

    // The subscriber requests any fusion mode; the test asserts that the
    // toFlowable() view is fuseable and that ASYNC fusion was established.
    @Test
    public void fusedFlowable() {
        TestSubscriber<Integer> ts = SubscriberFusion.newTest(QueueFuseable.ANY);

        Flowable.range(1, 10)
        .flatMapCompletable(new Function<Integer, CompletableSource>() {
            @Override
            public CompletableSource apply(Integer v) throws Exception {
                return Completable.complete();
            }
        }).<Integer>toFlowable()
        .subscribe(ts);

        ts
        .assertOf(SubscriberFusion.<Integer>assertFuseable())
        .assertOf(SubscriberFusion.<Integer>assertFusionMode(QueueFuseable.ASYNC))
        .assertResult();
    }
