本文整理了Java中io.reactivex.Observable.flatMap()
方法的一些代码示例,展示了Observable.flatMap()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Observable.flatMap()
方法的具体详情如下:
包路径:io.reactivex.Observable
类名称:Observable
方法名:flatMap
[英]Returns an Observable that emits items based on applying a function that you supply to each item emitted by the source ObservableSource, where that function returns an ObservableSource, and then merging those resulting ObservableSources and emitting the results of this merger.
Scheduler: flatMap does not operate by default on a particular Scheduler.
[中]返回一个发出项目的可观察对象,该可观察对象基于将您提供的函数应用于源ObservableSource发出的每个项目,其中该函数返回一个ObservableSource,然后合并这些产生的ObservableSource并发出此合并的结果。
调度程序:默认情况下,flatMap不会在特定调度程序上运行。
代码示例来源:origin: ReactiveX/RxJava
@Test(expected = NullPointerException.class)
public void flatMapCombinerMapperNull() {
just1.flatMap(null, new BiFunction<Integer, Object, Object>() {
@Override
public Object apply(Integer a, Object b) {
return 1;
}
});
}
代码示例来源:origin: ReactiveX/RxJava
@Override
public Object apply(Observable<Object> o) throws Exception {
return o.window(Observable.never()).flatMap(new Function<Observable<Object>, ObservableSource<Object>>() {
@Override
public ObservableSource<Object> apply(Observable<Object> v) throws Exception {
return v;
}
});
}
}, false, 1, 1, 1);
代码示例来源:origin: ReactiveX/RxJava
@Override
public Observable<Movie> apply(Observable<List<Movie>> movieList) {
return movieList
.startWith(new ArrayList<Movie>())
.buffer(2, 1)
.skip(1)
.flatMap(calculateDelta);
}
};
代码示例来源:origin: ReactiveX/RxJava
@Test
public void testFlatMapTransformsOnErrorFuncThrows() {
Observable<Integer> onNext = Observable.fromIterable(Arrays.asList(1, 2, 3));
Observable<Integer> onComplete = Observable.fromIterable(Arrays.asList(4));
Observable<Integer> onError = Observable.fromIterable(Arrays.asList(5));
Observable<Integer> source = Observable.error(new TestException());
Observer<Object> o = TestHelper.mockObserver();
source.flatMap(just(onNext), funcThrow((Throwable) null, onError), just0(onComplete)).subscribe(o);
verify(o).onError(any(CompositeException.class));
verify(o, never()).onNext(any());
verify(o, never()).onComplete();
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void testFlatMapTransformsMergeException() {
Observable<Integer> onNext = Observable.error(new TestException());
Observable<Integer> onComplete = Observable.fromIterable(Arrays.asList(4));
Observable<Integer> onError = Observable.fromIterable(Arrays.asList(5));
Observable<Integer> source = Observable.fromIterable(Arrays.asList(10, 20, 30));
Observer<Object> o = TestHelper.mockObserver();
source.flatMap(just(onNext), just(onError), funcThrow0(onComplete)).subscribe(o);
verify(o).onError(any(TestException.class));
verify(o, never()).onNext(any());
verify(o, never()).onComplete();
}
代码示例来源:origin: ReactiveX/RxJava
@Override
public Object apply(Observable<Object> o) throws Exception {
return o.window(Functions.justCallable(Observable.never())).flatMap(new Function<Observable<Object>, ObservableSource<Object>>() {
@Override
public ObservableSource<Object> apply(Observable<Object> v) throws Exception {
return v;
}
});
}
}, false, 1, 1, 1);
代码示例来源:origin: ReactiveX/RxJava
@Test
public void innerEscapeCompleted() {
Observable<Integer> source = Observable.just(0);
Observable<Integer> m = source.groupBy(identity, dbl).flatMap(FLATTEN_INTEGER);
TestObserver<Object> to = new TestObserver<Object>();
m.subscribe(to);
to.awaitTerminalEvent();
to.assertNoErrors();
System.out.println(to.values());
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void timespanTimeskipCustomScheduler() {
Observable.just(1)
.window(1, 1, TimeUnit.MINUTES, Schedulers.io())
.flatMap(Functions.<Observable<Integer>>identity())
.test()
.awaitDone(5, TimeUnit.SECONDS)
.assertResult(1);
}
代码示例来源:origin: ReactiveX/RxJava
@Test(timeout = 5000)
public void testIssue1935NoUnsubscribeDownstreamObservable() {
Observable<Integer> source = Observable.just(1).isEmpty().toObservable()
.flatMap(new Function<Boolean, Observable<Integer>>() {
@Override
public Observable<Integer> apply(Boolean t1) {
return Observable.just(2).delay(500, TimeUnit.MILLISECONDS);
}
});
assertEquals((Object)2, source.blockingFirst());
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void delayErrorSimpleComplete() {
Observable.just(1)
.groupBy(Functions.justFunction(1), true)
.flatMap(Functions.<Observable<Integer>>identity())
.test()
.assertResult(1);
}
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void timespanTimeskipDefaultScheduler() {
Observable.just(1)
.window(1, 1, TimeUnit.MINUTES)
.flatMap(Functions.<Observable<Integer>>identity())
.test()
.awaitDone(5, TimeUnit.SECONDS)
.assertResult(1);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void valueSelectorThrows() {
Observable<Integer> source = Observable.just(0, 1, 2, 3, 4, 5, 6);
Observable<Integer> m = source.groupBy(identity, fail(0)).flatMap(FLATTEN_INTEGER);
TestObserver<Integer> to = new TestObserver<Integer>();
m.subscribe(to);
to.awaitTerminalEvent();
assertEquals(1, to.errorCount());
to.assertNoValues();
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void keySelectorThrows() {
Observable<Integer> source = Observable.just(0, 1, 2, 3, 4, 5, 6);
Observable<Integer> m = source.groupBy(fail(0), dbl).flatMap(FLATTEN_INTEGER);
TestObserver<Integer> to = new TestObserver<Integer>();
m.subscribe(to);
to.awaitTerminalEvent();
assertEquals(1, to.errorCount());
to.assertNoValues();
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void timespanTimeskipCustomSchedulerBufferSize() {
Observable.range(1, 10)
.window(1, 1, TimeUnit.MINUTES, Schedulers.io(), 2)
.flatMap(Functions.<Observable<Integer>>identity())
.test()
.awaitDone(5, TimeUnit.SECONDS)
.assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void timespanDefaultSchedulerSizeRestart() {
Observable.range(1, 10)
.window(1, TimeUnit.MINUTES, 20, true)
.flatMap(Functions.<Observable<Integer>>identity(), true)
.test()
.awaitDone(5, TimeUnit.SECONDS)
.assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void restartTimer() {
Observable.range(1, 5)
.window(1, TimeUnit.DAYS, Schedulers.single(), 2, true)
.flatMap(Functions.<Observable<Integer>>identity())
.test()
.assertResult(1, 2, 3, 4, 5);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void boundaryOnError() {
TestObserver<Object> to = Observable.error(new TestException())
.window(Observable.never())
.flatMap(Functions.<Observable<Object>>identity(), true)
.test()
.assertFailure(CompositeException.class);
List<Throwable> errors = TestHelper.compositeList(to.errors().get(0));
TestHelper.assertError(errors, 0, TestException.class);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void exactOnError() {
TestScheduler scheduler = new TestScheduler();
PublishSubject<Integer> ps = PublishSubject.create();
TestObserver<Integer> to = ps.window(1, 1, TimeUnit.SECONDS, scheduler)
.flatMap(Functions.<Observable<Integer>>identity())
.test();
ps.onError(new TestException());
to.assertFailure(TestException.class);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void overlappingOnError() {
TestScheduler scheduler = new TestScheduler();
PublishSubject<Integer> ps = PublishSubject.create();
TestObserver<Integer> to = ps.window(2, 1, TimeUnit.SECONDS, scheduler)
.flatMap(Functions.<Observable<Integer>>identity())
.test();
ps.onError(new TestException());
to.assertFailure(TestException.class);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void skipOnError() {
TestScheduler scheduler = new TestScheduler();
PublishSubject<Integer> ps = PublishSubject.create();
TestObserver<Integer> to = ps.window(1, 2, TimeUnit.SECONDS, scheduler)
.flatMap(Functions.<Observable<Integer>>identity())
.test();
ps.onError(new TestException());
to.assertFailure(TestException.class);
}
内容来源于网络,如有侵权,请联系作者删除!