io.reactivex.Observable.flatMap()方法的使用及代码示例

x33g5p2x  于2022-01-25 转载在 其他  
字(7.7k)|赞(0)|评价(0)|浏览(189)

本文整理了Java中io.reactivex.Observable.flatMap()方法的一些代码示例,展示了Observable.flatMap()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Observable.flatMap()方法的具体详情如下:
包路径:io.reactivex.Observable
类名称:Observable
方法名:flatMap

Observable.flatMap介绍

[英]Returns an Observable that emits items based on applying a function that you supply to each item emitted by the source ObservableSource, where that function returns an ObservableSource, and then merging those resulting ObservableSources and emitting the results of this merger.

Scheduler: flatMap does not operate by default on a particular Scheduler.
[中]返回一个发出项目的可观察对象,该可观察对象基于将您提供的函数应用于源ObservableSource发出的每个项目,其中该函数返回一个ObservableSource,然后合并这些产生的ObservableSource并发出此合并的结果。
调度程序:默认情况下,flatMap不会在特定调度程序上运行。

代码示例

代码示例来源:origin: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void flatMapCombinerMapperNull() {
  just1.flatMap(null, new BiFunction<Integer, Object, Object>() {
    @Override
    public Object apply(Integer a, Object b) {
      return 1;
    }
  });
}

代码示例来源:origin: ReactiveX/RxJava

@Override
  public Object apply(Observable<Object> o) throws Exception {
    return o.window(Observable.never()).flatMap(new Function<Observable<Object>, ObservableSource<Object>>() {
      @Override
      public ObservableSource<Object> apply(Observable<Object> v) throws Exception {
        return v;
      }
    });
  }
}, false, 1, 1, 1);

代码示例来源:origin: ReactiveX/RxJava

@Override
  public Observable<Movie> apply(Observable<List<Movie>> movieList) {
    return movieList
      .startWith(new ArrayList<Movie>())
      .buffer(2, 1)
      .skip(1)
      .flatMap(calculateDelta);
  }
};

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testFlatMapTransformsOnErrorFuncThrows() {
  Observable<Integer> onNext = Observable.fromIterable(Arrays.asList(1, 2, 3));
  Observable<Integer> onComplete = Observable.fromIterable(Arrays.asList(4));
  Observable<Integer> onError = Observable.fromIterable(Arrays.asList(5));
  Observable<Integer> source = Observable.error(new TestException());
  Observer<Object> o = TestHelper.mockObserver();
  source.flatMap(just(onNext), funcThrow((Throwable) null, onError), just0(onComplete)).subscribe(o);
  verify(o).onError(any(CompositeException.class));
  verify(o, never()).onNext(any());
  verify(o, never()).onComplete();
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testFlatMapTransformsMergeException() {
  Observable<Integer> onNext = Observable.error(new TestException());
  Observable<Integer> onComplete = Observable.fromIterable(Arrays.asList(4));
  Observable<Integer> onError = Observable.fromIterable(Arrays.asList(5));
  Observable<Integer> source = Observable.fromIterable(Arrays.asList(10, 20, 30));
  Observer<Object> o = TestHelper.mockObserver();
  source.flatMap(just(onNext), just(onError), funcThrow0(onComplete)).subscribe(o);
  verify(o).onError(any(TestException.class));
  verify(o, never()).onNext(any());
  verify(o, never()).onComplete();
}

代码示例来源:origin: ReactiveX/RxJava

@Override
  public Object apply(Observable<Object> o) throws Exception {
    return o.window(Functions.justCallable(Observable.never())).flatMap(new Function<Observable<Object>, ObservableSource<Object>>() {
      @Override
      public ObservableSource<Object> apply(Observable<Object> v) throws Exception {
        return v;
      }
    });
  }
}, false, 1, 1, 1);

代码示例来源:origin: ReactiveX/RxJava

@Test
public void innerEscapeCompleted() {
  Observable<Integer> source = Observable.just(0);
  Observable<Integer> m = source.groupBy(identity, dbl).flatMap(FLATTEN_INTEGER);
  TestObserver<Object> to = new TestObserver<Object>();
  m.subscribe(to);
  to.awaitTerminalEvent();
  to.assertNoErrors();
  System.out.println(to.values());
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void timespanTimeskipCustomScheduler() {
  Observable.just(1)
  .window(1, 1, TimeUnit.MINUTES, Schedulers.io())
  .flatMap(Functions.<Observable<Integer>>identity())
  .test()
  .awaitDone(5, TimeUnit.SECONDS)
  .assertResult(1);
}

代码示例来源:origin: ReactiveX/RxJava

@Test(timeout = 5000)
public void testIssue1935NoUnsubscribeDownstreamObservable() {
  Observable<Integer> source = Observable.just(1).isEmpty().toObservable()
    .flatMap(new Function<Boolean, Observable<Integer>>() {
      @Override
      public Observable<Integer> apply(Boolean t1) {
        return Observable.just(2).delay(500, TimeUnit.MILLISECONDS);
      }
    });
  assertEquals((Object)2, source.blockingFirst());
}

代码示例来源:origin: ReactiveX/RxJava

@Test
  public void delayErrorSimpleComplete() {
    Observable.just(1)
    .groupBy(Functions.justFunction(1), true)
    .flatMap(Functions.<Observable<Integer>>identity())
    .test()
    .assertResult(1);
  }
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void timespanTimeskipDefaultScheduler() {
  Observable.just(1)
  .window(1, 1, TimeUnit.MINUTES)
  .flatMap(Functions.<Observable<Integer>>identity())
  .test()
  .awaitDone(5, TimeUnit.SECONDS)
  .assertResult(1);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void valueSelectorThrows() {
  Observable<Integer> source = Observable.just(0, 1, 2, 3, 4, 5, 6);
  Observable<Integer> m = source.groupBy(identity, fail(0)).flatMap(FLATTEN_INTEGER);
  TestObserver<Integer> to = new TestObserver<Integer>();
  m.subscribe(to);
  to.awaitTerminalEvent();
  assertEquals(1, to.errorCount());
  to.assertNoValues();
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void keySelectorThrows() {
  Observable<Integer> source = Observable.just(0, 1, 2, 3, 4, 5, 6);
  Observable<Integer> m = source.groupBy(fail(0), dbl).flatMap(FLATTEN_INTEGER);
  TestObserver<Integer> to = new TestObserver<Integer>();
  m.subscribe(to);
  to.awaitTerminalEvent();
  assertEquals(1, to.errorCount());
  to.assertNoValues();
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void timespanTimeskipCustomSchedulerBufferSize() {
  Observable.range(1, 10)
  .window(1, 1, TimeUnit.MINUTES, Schedulers.io(), 2)
  .flatMap(Functions.<Observable<Integer>>identity())
  .test()
  .awaitDone(5, TimeUnit.SECONDS)
  .assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void timespanDefaultSchedulerSizeRestart() {
  Observable.range(1, 10)
  .window(1, TimeUnit.MINUTES, 20, true)
  .flatMap(Functions.<Observable<Integer>>identity(), true)
  .test()
  .awaitDone(5, TimeUnit.SECONDS)
  .assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void restartTimer() {
  Observable.range(1, 5)
  .window(1, TimeUnit.DAYS, Schedulers.single(), 2, true)
  .flatMap(Functions.<Observable<Integer>>identity())
  .test()
  .assertResult(1, 2, 3, 4, 5);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void boundaryOnError() {
  TestObserver<Object> to = Observable.error(new TestException())
  .window(Observable.never())
  .flatMap(Functions.<Observable<Object>>identity(), true)
  .test()
  .assertFailure(CompositeException.class);
  List<Throwable> errors = TestHelper.compositeList(to.errors().get(0));
  TestHelper.assertError(errors, 0, TestException.class);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void exactOnError() {
  TestScheduler scheduler = new TestScheduler();
  PublishSubject<Integer> ps = PublishSubject.create();
  TestObserver<Integer> to = ps.window(1, 1, TimeUnit.SECONDS, scheduler)
  .flatMap(Functions.<Observable<Integer>>identity())
  .test();
  ps.onError(new TestException());
  to.assertFailure(TestException.class);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void overlappingOnError() {
  TestScheduler scheduler = new TestScheduler();
  PublishSubject<Integer> ps = PublishSubject.create();
  TestObserver<Integer> to = ps.window(2, 1, TimeUnit.SECONDS, scheduler)
  .flatMap(Functions.<Observable<Integer>>identity())
  .test();
  ps.onError(new TestException());
  to.assertFailure(TestException.class);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void skipOnError() {
  TestScheduler scheduler = new TestScheduler();
  PublishSubject<Integer> ps = PublishSubject.create();
  TestObserver<Integer> to = ps.window(1, 2, TimeUnit.SECONDS, scheduler)
  .flatMap(Functions.<Observable<Integer>>identity())
  .test();
  ps.onError(new TestException());
  to.assertFailure(TestException.class);
}

相关文章

Observable类方法