Usage and code examples of the io.reactivex.Flowable.flatMap() method

x33g5p2x, reposted on 2022-01-19 under "Other"

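This article collects code examples of the io.reactivex.Flowable.flatMap() method in Java and shows how Flowable.flatMap() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should serve as useful references. Details of the Flowable.flatMap() method are as follows:
Package: io.reactivex
Class name: Flowable
Method name: flatMap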

Introduction to Flowable.flatMap

Returns a Flowable that emits items based on applying a function that you supply to each item emitted by the source Publisher, where that function returns a Publisher, and then merging those resulting Publishers and emitting the results of this merger.

Backpressure: The operator honors backpressure from downstream. The upstream Flowable is consumed in a bounded manner (up to bufferSize() outstanding requested items). The inner Publishers are expected to honor backpressure; if they violate it, the operator may signal MissingBackpressureException.

Scheduler: flatMap does not operate by default on a particular Scheduler.
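The description above can be illustrated with a minimal sketch, assuming RxJava 2.x is on the classpath. The class name FlatMapSketch and the method expand() are hypothetical names chosen for this illustration. Each source item is mapped to a small inner Flowable, and flatMap merges the inner results into one stream. Because every source here is synchronous, the merged order matches the source order; with asynchronous inner sources the results may interleave.

```java
import io.reactivex.Flowable;
import java.util.List;

// Hypothetical illustration class, not part of RxJava.
class FlatMapSketch {

    // Maps each source item v to an inner Flowable emitting v*10 and v*10 + 1,
    // then collects the merged output into a list.
    static List<Integer> expand() {
        return Flowable.just(1, 2, 3)
                .flatMap(v -> Flowable.range(v * 10, 2))
                .toList()
                .blockingGet();
    }

    public static void main(String[] args) {
        // prints [10, 11, 20, 21, 30, 31]
        System.out.println(expand());
    }
}
```

The sketch blocks with blockingGet() only to keep the example short; in application code the merged Flowable would normally be subscribed to without blocking.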

Code examples

Code example source: origin: ReactiveX/RxJava

  // Excerpt: the enclosing call that this anonymous function belongs to is not shown.
  @Override
  public Object apply(Flowable<Object> f) throws Exception {
      return f.window(Flowable.never()).flatMap(new Function<Flowable<Object>, Flowable<Object>>() {
          @Override
          public Flowable<Object> apply(Flowable<Object> v) throws Exception {
              return v;
          }
      });
  }
  }, false, 1, 1, 1);

Code example source: origin: ReactiveX/RxJava

  // The test expects a NullPointerException when the mapper returns null.
  @Test(expected = NullPointerException.class)
  public void flatMapFunctionReturnsNull() {
      just1.flatMap(new Function<Integer, Publisher<Object>>() {
          @Override
          public Publisher<Object> apply(Integer v) {
              return null;
          }
      }).blockingSubscribe();
  }

Code example source: origin: ReactiveX/RxJava

  // Excerpt: pairs each list with its predecessor, then flattens the computed deltas.
  @Override
  public Publisher<Movie> apply(Flowable<List<Movie>> movieList) {
      return movieList
              .startWith(new ArrayList<Movie>())
              .buffer(2, 1)
              .skip(1)
              .flatMap(calculateDelta);
  }
  };

Code example source: origin: ReactiveX/RxJava

  @Test
  public void testFlatMapTransformsOnCompletedFuncThrows() {
      Flowable<Integer> onNext = Flowable.fromIterable(Arrays.asList(1, 2, 3));
      Flowable<Integer> onComplete = Flowable.fromIterable(Arrays.asList(4));
      Flowable<Integer> onError = Flowable.fromIterable(Arrays.asList(5));
      Flowable<Integer> source = Flowable.fromIterable(Arrays.<Integer> asList());
      Subscriber<Object> subscriber = TestHelper.mockSubscriber();
      // Three-mapper overload: one mapper each for onNext, onError, and onComplete.
      source.flatMap(just(onNext), just(onError), funcThrow0(onComplete)).subscribe(subscriber);
      verify(subscriber).onError(any(TestException.class));
      verify(subscriber, never()).onNext(any());
      verify(subscriber, never()).onComplete();
  }

Code example source: origin: ReactiveX/RxJava

  @Test
  public void testFlatMapTransformsMergeException() {
      Flowable<Integer> onNext = Flowable.error(new TestException());
      Flowable<Integer> onComplete = Flowable.fromIterable(Arrays.asList(4));
      Flowable<Integer> onError = Flowable.fromIterable(Arrays.asList(5));
      Flowable<Integer> source = Flowable.fromIterable(Arrays.asList(10, 20, 30));
      Subscriber<Object> subscriber = TestHelper.mockSubscriber();
      source.flatMap(just(onNext), just(onError), funcThrow0(onComplete)).subscribe(subscriber);
      verify(subscriber).onError(any(TestException.class));
      verify(subscriber, never()).onNext(any());
      verify(subscriber, never()).onComplete();
  }

Code example source: origin: ReactiveX/RxJava

  // The test asserts that flatMap on an empty source returns the same empty instance.
  @Test
  public void flatMapEmpty() {
      assertSame(Flowable.empty(), Flowable.empty().flatMap(new Function<Object, Publisher<Object>>() {
          @Override
          public Publisher<Object> apply(Object v) throws Exception {
              return Flowable.just(v);
          }
      }));
  }

Code example source: origin: ReactiveX/RxJava

  @Test
  public void timespanTimeskipCustomSchedulerBufferSize() {
      Flowable.range(1, 10)
              .window(1, 1, TimeUnit.MINUTES, Schedulers.io(), 2)
              .flatMap(Functions.<Flowable<Integer>>identity())
              .test()
              .awaitDone(5, TimeUnit.SECONDS)
              .assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
  }

Code example source: origin: ReactiveX/RxJava

  @Test
  public void innerEscapeCompleted() {
      Flowable<Integer> source = Flowable.just(0);
      Flowable<Integer> m = source.groupBy(identity, dbl).flatMap(FLATTEN_INTEGER);
      TestSubscriber<Object> ts = new TestSubscriber<Object>();
      m.subscribe(ts);
      ts.awaitTerminalEvent();
      ts.assertNoErrors();
      System.out.println(ts.values());
  }

Code example source: origin: ReactiveX/RxJava

  @Test
  public void delayErrorSimpleComplete() {
      Flowable.just(1)
              .groupBy(Functions.justFunction(1), true)
              .flatMap(Functions.<Flowable<Integer>>identity())
              .test()
              .assertResult(1);
  }

Code example source: origin: ReactiveX/RxJava

  @Test
  public void timespanTimeskipDefaultScheduler() {
      Flowable.just(1)
              .window(1, 1, TimeUnit.MINUTES)
              .flatMap(Functions.<Flowable<Integer>>identity())
              .test()
              .awaitDone(5, TimeUnit.SECONDS)
              .assertResult(1);
  }

Code example source: origin: ReactiveX/RxJava

  @Test
  public void restartTimer() {
      Flowable.range(1, 5)
              .window(1, TimeUnit.DAYS, Schedulers.single(), 2, true)
              .flatMap(Functions.<Flowable<Integer>>identity())
              .test()
              .assertResult(1, 2, 3, 4, 5);
  }

Code example source: origin: ReactiveX/RxJava

  @Test
  public void timespanDefaultSchedulerSize() {
      Flowable.range(1, 10)
              .window(1, TimeUnit.MINUTES, 20)
              .flatMap(Functions.<Flowable<Integer>>identity())
              .test()
              .awaitDone(5, TimeUnit.SECONDS)
              .assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
  }

Code example source: origin: ReactiveX/RxJava

  @Test
  public void timespanDefaultSchedulerSizeRestart() {
      Flowable.range(1, 10)
              .window(1, TimeUnit.MINUTES, 20, true)
              // The second argument (true) is the delayErrors flag.
              .flatMap(Functions.<Flowable<Integer>>identity(), true)
              .test()
              .awaitDone(5, TimeUnit.SECONDS)
              .assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
  }

Code example source: origin: ReactiveX/RxJava

  @Test
  public void boundaryOnError() {
      TestSubscriber<Object> ts = Flowable.error(new TestException())
              .window(Flowable.never())
              .flatMap(Functions.<Flowable<Object>>identity(), true)
              .test()
              .assertFailure(CompositeException.class);
      List<Throwable> errors = TestHelper.compositeList(ts.errors().get(0));
      TestHelper.assertError(errors, 0, TestException.class);
  }
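The boundaryOnError test above passes true as the second argument of flatMap, which is the delayErrors flag. As a minimal sketch of what that flag changes, assuming RxJava 2.x on the classpath, the following compares both settings. The class DelayErrorsSketch, the mapper inner(), and the method run() are hypothetical names for this illustration. With delayErrors set to false the first inner error terminates the merged stream immediately; with true the error is held back until the remaining sources have finished.

```java
import io.reactivex.Flowable;
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration class, not part of RxJava.
class DelayErrorsSketch {

    // Hypothetical mapper: item 2 maps to a failing inner Flowable.
    static Flowable<Integer> inner(Integer v) {
        if (v == 2) {
            return Flowable.error(new IllegalStateException("boom"));
        }
        return Flowable.just(v);
    }

    // Runs the flow synchronously and summarizes the received values and error.
    static String run(boolean delayErrors) {
        List<Integer> values = new ArrayList<>();
        List<Throwable> errors = new ArrayList<>();
        Flowable.just(1, 2, 3)
                .flatMap(DelayErrorsSketch::inner, delayErrors)
                .subscribe(v -> values.add(v), e -> errors.add(e));
        return values + " then " + errors.get(0).getClass().getSimpleName();
    }

    public static void main(String[] args) {
        System.out.println(run(false)); // prints [1] then IllegalStateException
        System.out.println(run(true));  // prints [1, 3] then IllegalStateException
    }
}
```

When more than one error is collected, as in the boundaryOnError test, the delayed errors arrive wrapped in a CompositeException; a single delayed error is delivered as-is in this sketch.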

Code example source: origin: ReactiveX/RxJava

  @SuppressWarnings({ "rawtypes", "unchecked" })
  @Override
  public Publisher<List<Long>> createPublisher(long elements) {
      return Flowable.fromIterable(iterate(elements))
              .window(Flowable.just(1).concatWith(Flowable.<Integer>never()))
              .onBackpressureBuffer()
              .flatMap((Function)Functions.identity());
  }
  }

Code example source: origin: ReactiveX/RxJava

  @SuppressWarnings({ "unchecked", "rawtypes" })
  @Test
  public void flatMapJustRange() {
      TestSubscriber<Integer> ts = TestSubscriber.create();
      Flowable.just(Flowable.range(1, 5)).flatMap((Function)Functions.identity()).subscribe(ts);
      ts.assertValues(1, 2, 3, 4, 5);
      ts.assertNoErrors();
      ts.assertComplete();
  }

Code example source: origin: ReactiveX/RxJava

  @SuppressWarnings({ "unchecked", "rawtypes" })
  @Test
  public void flatMapMaxConcurrentJustJust() {
      TestSubscriber<Integer> ts = TestSubscriber.create();
      // The second argument (5) is maxConcurrency.
      Flowable.just(Flowable.just(1)).flatMap((Function)Functions.identity(), 5).subscribe(ts);
      ts.assertValue(1);
      ts.assertNoErrors();
      ts.assertComplete();
  }
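The flatMapMaxConcurrentJustJust test above passes 5 as the maxConcurrency argument, which caps how many inner Publishers are subscribed at the same time. As a minimal sketch, assuming RxJava 2.x on the classpath, setting the cap to 1 forces the inner sources to run one after another, so the output keeps the source order even when the inner sources are asynchronous. The class MaxConcurrencySketch and the method sequential() are hypothetical names for this illustration.

```java
import io.reactivex.Flowable;
import java.util.List;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration class, not part of RxJava.
class MaxConcurrencySketch {

    // Each inner Flowable emits after a delay proportional to its item.
    // With maxConcurrency = 1, the next inner source is subscribed only after
    // the previous one completes, so the source order is preserved.
    static List<String> sequential() {
        return Flowable.just(3, 2, 1)
                .flatMap(v -> Flowable.just("item-" + v)
                        .delay(v * 20L, TimeUnit.MILLISECONDS), 1)
                .toList()
                .blockingGet();
    }

    public static void main(String[] args) {
        // prints [item-3, item-2, item-1]
        System.out.println(sequential());
    }
}
```

Without the limit, all three inner sources would be subscribed at once and the shorter delays would typically finish first, so the output order would depend on timing.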

Code example source: origin: ReactiveX/RxJava

  @SuppressWarnings({ "unchecked", "rawtypes" })
  @Test
  public void flatMapJustJust() {
      TestSubscriber<Integer> ts = TestSubscriber.create();
      Flowable.just(Flowable.just(1)).flatMap((Function)Functions.identity()).subscribe(ts);
      ts.assertValue(1);
      ts.assertNoErrors();
      ts.assertComplete();
  }

Code example source: origin: ReactiveX/RxJava

  @Test
  public void keySelectorThrows() {
      Flowable<Integer> source = Flowable.just(0, 1, 2, 3, 4, 5, 6);
      Flowable<Integer> m = source.groupBy(fail(0), dbl).flatMap(FLATTEN_INTEGER);
      TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
      m.subscribe(ts);
      ts.awaitTerminalEvent();
      assertEquals(1, ts.errorCount());
      ts.assertNoValues();
  }

Code example source: origin: ReactiveX/RxJava

  @Test
  public void valueSelectorThrows() {
      Flowable<Integer> source = Flowable.just(0, 1, 2, 3, 4, 5, 6);
      Flowable<Integer> m = source.groupBy(identity, fail(0)).flatMap(FLATTEN_INTEGER);
      TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
      m.subscribe(ts);
      ts.awaitTerminalEvent();
      assertEquals(1, ts.errorCount());
      ts.assertNoValues();
  }
