Usage and code examples for io.reactivex.Flowable.flatMapCompletable()

Reposted by x33g5p2x on 2022-01-19, category: Other

This article collects code examples for the Java method io.reactivex.Flowable.flatMapCompletable() and shows how it is used in practice. The examples were extracted from selected projects on platforms such as GitHub, Stack Overflow, and Maven. Details of the method:
Package: io.reactivex
Class: Flowable
Method: flatMapCompletable

About Flowable.flatMapCompletable

Maps each element of the upstream Flowable into a CompletableSource, subscribes to each of them, and completes once the upstream and all CompletableSources have completed. Backpressure: the operator consumes the upstream in an unbounded manner. Scheduler: flatMapCompletable does not operate by default on a particular Scheduler.
The overload flatMapCompletable(mapper, delayErrors, maxConcurrency), used in several examples below, additionally controls whether errors are delayed until all sources have terminated and how many CompletableSources are subscribed to at the same time.
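
The description above can be illustrated with a minimal sketch. It assumes RxJava 2.x (the io.reactivex package) is on the classpath; the class name FlatMapCompletableSketch and the save helper are hypothetical and exist only for this illustration. Each upstream element is mapped to a Completable that performs a side effect, and the resulting Completable finishes only after the upstream and every inner Completable have completed.

```java
import io.reactivex.Completable;
import io.reactivex.Flowable;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class FlatMapCompletableSketch {

    // Hypothetical side effect: records the id it was asked to "save".
    static Completable save(int id, List<Integer> saved) {
        return Completable.fromAction(() -> saved.add(id));
    }

    // Maps every element to a Completable and waits for all of them.
    static List<Integer> run() {
        List<Integer> saved = new CopyOnWriteArrayList<>();
        Flowable.range(1, 5)
                .flatMapCompletable(id -> save(id, saved))
                .blockingAwait(); // returns once upstream and all inner sources are done
        return saved;
    }

    public static void main(String[] args) {
        // Everything here runs synchronously on the calling thread.
        System.out.println(run());
    }
}
```

Because no Scheduler is involved in this sketch, the inner sources run one after another on the calling thread. With asynchronous inner sources the order of the side effects is not guaranteed.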

Code examples

Example source: ReactiveX/RxJava

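// Fragment: the opening of the enclosing call is not included in the source.
// The visible part is an anonymous function whose apply method is shown,
// followed by the remaining arguments (false, 1, null) of that call.
  @Override
  public Object apply(Flowable<Integer> f) throws Exception {
    return f.flatMapCompletable(new Function<Integer, CompletableSource>() {
      @Override
      public CompletableSource apply(Integer v) throws Exception {
        return Completable.complete();
      }
    });
  }
}, false, 1, null);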

Example source: ReactiveX/RxJava

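// Fragment: the opening of the enclosing call is not included in the source.
// Same as the previous fragment, except that the Completable is converted
// back to a Flowable with toFlowable().
  @Override
  public Object apply(Flowable<Integer> f) throws Exception {
    return f.flatMapCompletable(new Function<Integer, CompletableSource>() {
      @Override
      public CompletableSource apply(Integer v) throws Exception {
        return Completable.complete();
      }
    }).toFlowable();
  }
}, false, 1, null);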

Example source: ReactiveX/RxJava

@Test
public void disposed() {
  TestHelper.checkDisposed(Flowable.range(1, 10)
  .flatMapCompletable(new Function<Integer, CompletableSource>() {
    @Override
    public CompletableSource apply(Integer v) throws Exception {
      return Completable.complete();
    }
  }));
}

Example source: ReactiveX/RxJava

@Test
public void innerObserver() {
  Flowable.range(1, 3)
  .flatMapCompletable(new Function<Integer, CompletableSource>() {
    @Override
    public CompletableSource apply(Integer v) throws Exception {
      return new Completable() {
        @Override
        protected void subscribeActual(CompletableObserver observer) {
          observer.onSubscribe(Disposables.empty());
          assertFalse(((Disposable)observer).isDisposed());
          ((Disposable)observer).dispose();
          assertTrue(((Disposable)observer).isDisposed());
        }
      };
    }
  })
  .test();
}

Example source: ReactiveX/RxJava

@Test
public void disposedFlowable() {
  TestHelper.checkDisposed(Flowable.range(1, 10)
  .flatMapCompletable(new Function<Integer, CompletableSource>() {
    @Override
    public CompletableSource apply(Integer v) throws Exception {
      return Completable.complete();
    }
  }).toFlowable());
}

Example source: ReactiveX/RxJava

@Test
public void normal() {
  Flowable.range(1, 10)
  .flatMapCompletable(new Function<Integer, CompletableSource>() {
    @Override
    public CompletableSource apply(Integer v) throws Exception {
      return Completable.complete();
    }
  })
  .test()
  .assertResult();
}

Example source: ReactiveX/RxJava

@Test
public void normalDelayError() {
  Flowable.range(1, 10)
  .flatMapCompletable(new Function<Integer, CompletableSource>() {
    @Override
    public CompletableSource apply(Integer v) throws Exception {
      return Completable.complete();
    }
  }, true, Integer.MAX_VALUE)
  .test()
  .assertResult();
}

Example source: ReactiveX/RxJava

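@Test
public void delayErrorMaxConcurrency() {
  // delayErrors = true, maxConcurrency = 1: inner sources are subscribed to
  // one at a time, and the error produced for v == 2 is reported only after
  // all sources have terminated.
  Flowable.range(1, 3)
    .flatMapCompletable(new Function<Integer, CompletableSource>() {
      @Override
      public CompletableSource apply(Integer v) throws Exception {
        if (v == 2) {
          return Completable.error(new TestException());
        }
        return Completable.complete();
      }
    }, true, 1)
    .test()
    .assertFailure(TestException.class);
}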

Example source: ReactiveX/RxJava

@Test
public void normalFlowable() {
  Flowable.range(1, 10)
  .flatMapCompletable(new Function<Integer, CompletableSource>() {
    @Override
    public CompletableSource apply(Integer v) throws Exception {
      return Completable.complete();
    }
  }).toFlowable()
  .test()
  .assertResult();
}

Example source: ReactiveX/RxJava

@Test
public void normalDelayErrorFlowable() {
  Flowable.range(1, 10)
  .flatMapCompletable(new Function<Integer, CompletableSource>() {
    @Override
    public CompletableSource apply(Integer v) throws Exception {
      return Completable.complete();
    }
  }, true, Integer.MAX_VALUE).toFlowable()
  .test()
  .assertResult();
}

Example source: ReactiveX/RxJava

@Test
public void normalAsync() {
  Flowable.range(1, 1000)
  .flatMapCompletable(new Function<Integer, CompletableSource>() {
    @Override
    public CompletableSource apply(Integer v) throws Exception {
      return Flowable.range(1, 100).subscribeOn(Schedulers.computation()).ignoreElements();
    }
  })
  .test()
  .awaitDone(5, TimeUnit.SECONDS)
  .assertResult();
}

Example source: ReactiveX/RxJava

@Test
public void normalAsyncMaxConcurrency() {
  Flowable.range(1, 1000)
  .flatMapCompletable(new Function<Integer, CompletableSource>() {
    @Override
    public CompletableSource apply(Integer v) throws Exception {
      return Flowable.range(1, 100).subscribeOn(Schedulers.computation()).ignoreElements();
    }
  }, false, 3)
  .test()
  .awaitDone(5, TimeUnit.SECONDS)
  .assertResult();
}

Example source: ReactiveX/RxJava

@Test
public void normalAsyncFlowable() {
  Flowable.range(1, 1000)
  .flatMapCompletable(new Function<Integer, CompletableSource>() {
    @Override
    public CompletableSource apply(Integer v) throws Exception {
      return Flowable.range(1, 100).subscribeOn(Schedulers.computation()).ignoreElements();
    }
  }).toFlowable()
  .test()
  .awaitDone(5, TimeUnit.SECONDS)
  .assertResult();
}

Example source: ReactiveX/RxJava

@Test
public void normalDelayInnerErrorAllFlowable() {
  TestSubscriber<Integer> ts = Flowable.range(1, 10)
  .flatMapCompletable(new Function<Integer, CompletableSource>() {
    @Override
    public CompletableSource apply(Integer v) throws Exception {
      return Completable.error(new TestException());
    }
  }, true, Integer.MAX_VALUE).<Integer>toFlowable()
  .test()
  .assertFailure(CompositeException.class);
  List<Throwable> errors = TestHelper.compositeList(ts.errors().get(0));
  for (int i = 0; i < 10; i++) {
    TestHelper.assertError(errors, i, TestException.class);
  }
}

Example source: ReactiveX/RxJava

@Test
public void normalAsyncFlowableMaxConcurrency() {
  Flowable.range(1, 1000)
  .flatMapCompletable(new Function<Integer, CompletableSource>() {
    @Override
    public CompletableSource apply(Integer v) throws Exception {
      return Flowable.range(1, 100).subscribeOn(Schedulers.computation()).ignoreElements();
    }
  }, false, 3).toFlowable()
  .test()
  .awaitDone(5, TimeUnit.SECONDS)
  .assertResult();
}

Example source: ReactiveX/RxJava

@Test
public void normalNonDelayErrorOuter() {
  Flowable.range(1, 10).concatWith(Flowable.<Integer>error(new TestException()))
  .flatMapCompletable(new Function<Integer, CompletableSource>() {
    @Override
    public CompletableSource apply(Integer v) throws Exception {
      return Completable.complete();
    }
  }, false, Integer.MAX_VALUE)
  .test()
  .assertFailure(TestException.class);
}

Example source: ReactiveX/RxJava

@Test
public void normalNonDelayErrorOuterFlowable() {
  Flowable.range(1, 10).concatWith(Flowable.<Integer>error(new TestException()))
  .flatMapCompletable(new Function<Integer, CompletableSource>() {
    @Override
    public CompletableSource apply(Integer v) throws Exception {
      return Completable.complete();
    }
  }, false, Integer.MAX_VALUE).toFlowable()
  .test()
  .assertFailure(TestException.class);
}

Example source: ReactiveX/RxJava

@Test
public void normalDelayErrorAll() {
  // With delayErrors = true, the 10 inner errors and the upstream error are
  // collected into a single CompositeException (11 errors in total).
  TestObserver<Void> to = Flowable.range(1, 10).concatWith(Flowable.<Integer>error(new TestException()))
    .flatMapCompletable(new Function<Integer, CompletableSource>() {
      @Override
      public CompletableSource apply(Integer v) throws Exception {
        return Completable.error(new TestException());
      }
    }, true, Integer.MAX_VALUE)
    .test()
    .assertFailure(CompositeException.class);
  List<Throwable> errors = TestHelper.compositeList(to.errors().get(0));
  for (int i = 0; i < 11; i++) {
    TestHelper.assertError(errors, i, TestException.class);
  }
}

Example source: ReactiveX/RxJava

@Test
public void normalDelayErrorAllFlowable() {
  TestSubscriber<Integer> ts = Flowable.range(1, 10).concatWith(Flowable.<Integer>error(new TestException()))
  .flatMapCompletable(new Function<Integer, CompletableSource>() {
    @Override
    public CompletableSource apply(Integer v) throws Exception {
      return Completable.error(new TestException());
    }
  }, true, Integer.MAX_VALUE).<Integer>toFlowable()
  .test()
  .assertFailure(CompositeException.class);
  List<Throwable> errors = TestHelper.compositeList(ts.errors().get(0));
  for (int i = 0; i < 11; i++) {
    TestHelper.assertError(errors, i, TestException.class);
  }
}

Example source: ReactiveX/RxJava

@Test
public void fusedFlowable() {
  TestSubscriber<Integer> ts = SubscriberFusion.newTest(QueueFuseable.ANY);
  Flowable.range(1, 10)
  .flatMapCompletable(new Function<Integer, CompletableSource>() {
    @Override
    public CompletableSource apply(Integer v) throws Exception {
      return Completable.complete();
    }
  }).<Integer>toFlowable()
  .subscribe(ts);
  ts
  .assertOf(SubscriberFusion.<Integer>assertFuseable())
  .assertOf(SubscriberFusion.<Integer>assertFusionMode(QueueFuseable.ASYNC))
  .assertResult();
}
