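This article collects code examples for the Java method io.reactivex.Flowable.flatMap() and shows how Flowable.flatMap() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should serve as a useful reference. Details of the Flowable.flatMap() method are as follows: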
Fully qualified name: io.reactivex.Flowable
Class name: Flowable
Method name: flatMap
Returns a Flowable that emits items based on applying a function that you supply to each item emitted by the source Publisher, where that function returns a Publisher, and then merging those resulting Publishers and emitting the results of this merger.
Backpressure: The operator honors backpressure from downstream. The upstream Flowable is consumed in a bounded manner (up to bufferSize() outstanding request amount for items). The inner Publishers are expected to honor backpressure; if violated, the operator may signal MissingBackpressureException.
Scheduler: flatMap does not operate by default on a particular Scheduler.
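As a minimal sketch of the merging behavior described above, the following maps each source item to a two-element inner Flowable and collects the merged output. It assumes RxJava 2.x (the io.reactivex package) is on the classpath; the class name FlatMapBasics and the method expand are made up for illustration.

```java
import io.reactivex.Flowable;
import java.util.List;

// Hypothetical demo class; not part of RxJava.
final class FlatMapBasics {
    // Maps each source item to an inner Flowable and merges the inner emissions.
    static List<Integer> expand() {
        return Flowable.just(1, 2, 3)
                // Each item v becomes the inner sequence v*10, v*10 + 1.
                .flatMap(v -> Flowable.range(v * 10, 2))
                .toList()
                .blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(expand());
    }
}
```

Every source in this sketch is synchronous, so the inner sequences are drained one after another. With asynchronous inner sources, flatMap may interleave their items.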
Code example source: ReactiveX/RxJava
    @Override
    public Object apply(Flowable<Object> f) throws Exception {
        return f.window(Flowable.never()).flatMap(new Function<Flowable<Object>, Flowable<Object>>() {
            @Override
            public Flowable<Object> apply(Flowable<Object> v) throws Exception {
                return v;
            }
        });
    }
}, false, 1, 1, 1);
Code example source: ReactiveX/RxJava
// just1 is defined elsewhere in the test class.
@Test(expected = NullPointerException.class)
public void flatMapFunctionReturnsNull() {
    just1.flatMap(new Function<Integer, Publisher<Object>>() {
        @Override
        public Publisher<Object> apply(Integer v) {
            return null;
        }
    }).blockingSubscribe();
}
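The test above relies on flatMap rejecting a null Publisher from the mapper. A small sketch of the same behavior outside a test framework follows, assuming RxJava 2.x; the class FlatMapNullMapper and its method are hypothetical names. It captures the error through the onError callback instead of letting a blocking call rethrow it.

```java
import io.reactivex.Flowable;
import java.util.concurrent.atomic.AtomicReference;
import org.reactivestreams.Publisher;

// Hypothetical demo class; not part of RxJava.
final class FlatMapNullMapper {
    // Returns the error signaled when the mapper hands back null.
    static Throwable errorFromNullMapper() {
        AtomicReference<Throwable> error = new AtomicReference<>();
        Flowable.just(1)
                // Returning null instead of a Publisher violates the mapper contract.
                .flatMap(v -> (Publisher<Object>) null)
                .subscribe(item -> { }, error::set);
        return error.get();
    }

    public static void main(String[] args) {
        System.out.println(errorFromNullMapper());
    }
}
```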
Code example source: ReactiveX/RxJava
    @Override
    public Publisher<Movie> apply(Flowable<List<Movie>> movieList) {
        return movieList
            .startWith(new ArrayList<Movie>())
            .buffer(2, 1)
            .skip(1)
            .flatMap(calculateDelta);
    }
};
Code example source: ReactiveX/RxJava
// just(...) and funcThrow0(...) are helpers defined elsewhere in the test class.
@Test
public void testFlatMapTransformsOnCompletedFuncThrows() {
    Flowable<Integer> onNext = Flowable.fromIterable(Arrays.asList(1, 2, 3));
    Flowable<Integer> onComplete = Flowable.fromIterable(Arrays.asList(4));
    Flowable<Integer> onError = Flowable.fromIterable(Arrays.asList(5));
    Flowable<Integer> source = Flowable.fromIterable(Arrays.<Integer> asList());
    Subscriber<Object> subscriber = TestHelper.mockSubscriber();
    source.flatMap(just(onNext), just(onError), funcThrow0(onComplete)).subscribe(subscriber);
    verify(subscriber).onError(any(TestException.class));
    verify(subscriber, never()).onNext(any());
    verify(subscriber, never()).onComplete();
}
Code example source: ReactiveX/RxJava
// Each source item maps to a Flowable that fails, so the merged output signals the error.
@Test
public void testFlatMapTransformsMergeException() {
    Flowable<Integer> onNext = Flowable.error(new TestException());
    Flowable<Integer> onComplete = Flowable.fromIterable(Arrays.asList(4));
    Flowable<Integer> onError = Flowable.fromIterable(Arrays.asList(5));
    Flowable<Integer> source = Flowable.fromIterable(Arrays.asList(10, 20, 30));
    Subscriber<Object> subscriber = TestHelper.mockSubscriber();
    source.flatMap(just(onNext), just(onError), funcThrow0(onComplete)).subscribe(subscriber);
    verify(subscriber).onError(any(TestException.class));
    verify(subscriber, never()).onNext(any());
    verify(subscriber, never()).onComplete();
}
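The two tests above use the three-argument overload, which maps each onNext item, a terminal onError, and a terminal onComplete to a Publisher and merges the results. A minimal sketch follows, assuming RxJava 2.x, where the third parameter is a java.util.concurrent.Callable; the class FlatMapNotifications and the constants -1 and 99 are illustrative only.

```java
import io.reactivex.Flowable;
import java.util.List;

// Hypothetical demo class; not part of RxJava.
final class FlatMapNotifications {
    static List<Integer> withTerminalMappers() {
        return Flowable.just(1, 2)
                .flatMap(
                        v -> Flowable.just(v * 10),  // applied to each item
                        e -> Flowable.just(-1),      // applied if the source fails
                        () -> Flowable.just(99))     // applied when the source completes
                .toList()
                .blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(withTerminalMappers());
    }
}
```

Since the source completes normally here, the error mapper is never invoked and the completion Publisher's item arrives last.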
Code example source: ReactiveX/RxJava
@Test
public void flatMapEmpty() {
    assertSame(Flowable.empty(), Flowable.empty().flatMap(new Function<Object, Publisher<Object>>() {
        @Override
        public Publisher<Object> apply(Object v) throws Exception {
            return Flowable.just(v);
        }
    }));
}
Code example source: ReactiveX/RxJava
@Test
public void timespanTimeskipCustomSchedulerBufferSize() {
    Flowable.range(1, 10)
        .window(1, 1, TimeUnit.MINUTES, Schedulers.io(), 2)
        .flatMap(Functions.<Flowable<Integer>>identity())
        .test()
        .awaitDone(5, TimeUnit.SECONDS)
        .assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
}
Code example source: ReactiveX/RxJava
// identity, dbl, and FLATTEN_INTEGER are defined elsewhere in the test class.
@Test
public void innerEscapeCompleted() {
    Flowable<Integer> source = Flowable.just(0);
    Flowable<Integer> m = source.groupBy(identity, dbl).flatMap(FLATTEN_INTEGER);
    TestSubscriber<Object> ts = new TestSubscriber<Object>();
    m.subscribe(ts);
    ts.awaitTerminalEvent();
    ts.assertNoErrors();
    System.out.println(ts.values());
}
Code example source: ReactiveX/RxJava
@Test
public void delayErrorSimpleComplete() {
    Flowable.just(1)
        .groupBy(Functions.justFunction(1), true)
        .flatMap(Functions.<Flowable<Integer>>identity())
        .test()
        .assertResult(1);
}
Code example source: ReactiveX/RxJava
@Test
public void timespanTimeskipDefaultScheduler() {
    Flowable.just(1)
        .window(1, 1, TimeUnit.MINUTES)
        .flatMap(Functions.<Flowable<Integer>>identity())
        .test()
        .awaitDone(5, TimeUnit.SECONDS)
        .assertResult(1);
}
Code example source: ReactiveX/RxJava
@Test
public void restartTimer() {
    Flowable.range(1, 5)
        .window(1, TimeUnit.DAYS, Schedulers.single(), 2, true)
        .flatMap(Functions.<Flowable<Integer>>identity())
        .test()
        .assertResult(1, 2, 3, 4, 5);
}
Code example source: ReactiveX/RxJava
@Test
public void timespanDefaultSchedulerSize() {
    Flowable.range(1, 10)
        .window(1, TimeUnit.MINUTES, 20)
        .flatMap(Functions.<Flowable<Integer>>identity())
        .test()
        .awaitDone(5, TimeUnit.SECONDS)
        .assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
}
Code example source: ReactiveX/RxJava
@Test
public void timespanDefaultSchedulerSizeRestart() {
    Flowable.range(1, 10)
        .window(1, TimeUnit.MINUTES, 20, true)
        .flatMap(Functions.<Flowable<Integer>>identity(), true)
        .test()
        .awaitDone(5, TimeUnit.SECONDS)
        .assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
}
Code example source: ReactiveX/RxJava
@Test
public void boundaryOnError() {
    TestSubscriber<Object> ts = Flowable.error(new TestException())
        .window(Flowable.never())
        .flatMap(Functions.<Flowable<Object>>identity(), true)
        .test()
        .assertFailure(CompositeException.class);
    List<Throwable> errors = TestHelper.compositeList(ts.errors().get(0));
    TestHelper.assertError(errors, 0, TestException.class);
}
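The second argument `true` in the test above is the delayErrors flag of flatMap(mapper, delayErrors). As a rough sketch of its effect, assuming RxJava 2.x, the following lets one inner Flowable fail while the others still deliver their items before the error reaches the subscriber. The class FlatMapDelayError and its helper inner are hypothetical.

```java
import io.reactivex.Flowable;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class; not part of RxJava.
final class FlatMapDelayError {
    // Inner source: fails for the value 2, otherwise emits the value.
    static Flowable<Integer> inner(int v) {
        if (v == 2) {
            return Flowable.error(new IllegalStateException("boom"));
        }
        return Flowable.just(v);
    }

    static String run() {
        List<Integer> seen = new ArrayList<>();
        AtomicReference<Throwable> error = new AtomicReference<>();
        Flowable.just(1, 2, 3)
                // delayErrors = true: keep merging, report the failure at the end.
                .flatMap(FlatMapDelayError::inner, true)
                .subscribe(seen::add, error::set);
        return seen + " then " + error.get().getClass().getSimpleName();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Only one inner source fails in this sketch, so the subscriber receives that exception directly. When several errors are collected, as in the test above, they arrive wrapped in a CompositeException.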
Code example source: ReactiveX/RxJava
    @SuppressWarnings({ "rawtypes", "unchecked" })
    @Override
    public Publisher<List<Long>> createPublisher(long elements) {
        return
            Flowable.fromIterable(iterate(elements))
            .window(Flowable.just(1).concatWith(Flowable.<Integer>never()))
            .onBackpressureBuffer()
            .flatMap((Function)Functions.identity())
        ;
    }
}
Code example source: ReactiveX/RxJava
@SuppressWarnings({ "unchecked", "rawtypes" })
@Test
public void flatMapJustRange() {
    TestSubscriber<Integer> ts = TestSubscriber.create();
    Flowable.just(Flowable.range(1, 5)).flatMap((Function)Functions.identity()).subscribe(ts);
    ts.assertValues(1, 2, 3, 4, 5);
    ts.assertNoErrors();
    ts.assertComplete();
}
Code example source: ReactiveX/RxJava
@SuppressWarnings({ "unchecked", "rawtypes" })
@Test
public void flatMapMaxConcurrentJustJust() {
    TestSubscriber<Integer> ts = TestSubscriber.create();
    Flowable.just(Flowable.just(1)).flatMap((Function)Functions.identity(), 5).subscribe(ts);
    ts.assertValue(1);
    ts.assertNoErrors();
    ts.assertComplete();
}
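The integer argument in the test above is maxConcurrency, the upper bound on how many inner Publishers flatMap subscribes to at once. A minimal sketch of its effect follows, assuming RxJava 2.x; the class FlatMapMaxConcurrency and the delay values are illustrative. Each inner source is delayed so that later items would finish first if all ran concurrently, but a limit of 1 forces them to run one at a time.

```java
import io.reactivex.Flowable;
import java.util.List;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; not part of RxJava.
final class FlatMapMaxConcurrency {
    static List<Integer> sequential() {
        return Flowable.just(1, 2, 3)
                // Item 1 waits 90 ms, item 2 waits 60 ms, item 3 waits 30 ms.
                // maxConcurrency = 1: the next inner source starts only after
                // the previous one has completed.
                .flatMap(v -> Flowable.just(v).delay(30L * (4 - v), TimeUnit.MILLISECONDS), 1)
                .toList()
                .blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(sequential());
    }
}
```

With the limit set to 1 the result keeps source order, much like concatMap. A larger limit allows the shorter delays to overtake the longer ones.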
Code example source: ReactiveX/RxJava
@SuppressWarnings({ "unchecked", "rawtypes" })
@Test
public void flatMapJustJust() {
    TestSubscriber<Integer> ts = TestSubscriber.create();
    Flowable.just(Flowable.just(1)).flatMap((Function)Functions.identity()).subscribe(ts);
    ts.assertValue(1);
    ts.assertNoErrors();
    ts.assertComplete();
}
Code example source: ReactiveX/RxJava
// fail(...), dbl, and FLATTEN_INTEGER are defined elsewhere in the test class.
@Test
public void keySelectorThrows() {
    Flowable<Integer> source = Flowable.just(0, 1, 2, 3, 4, 5, 6);
    Flowable<Integer> m = source.groupBy(fail(0), dbl).flatMap(FLATTEN_INTEGER);
    TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
    m.subscribe(ts);
    ts.awaitTerminalEvent();
    assertEquals(1, ts.errorCount());
    ts.assertNoValues();
}
Code example source: ReactiveX/RxJava
@Test
public void valueSelectorThrows() {
    Flowable<Integer> source = Flowable.just(0, 1, 2, 3, 4, 5, 6);
    Flowable<Integer> m = source.groupBy(identity, fail(0)).flatMap(FLATTEN_INTEGER);
    TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
    m.subscribe(ts);
    ts.awaitTerminalEvent();
    assertEquals(1, ts.errorCount());
    ts.assertNoValues();
}
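Several tests above pair groupBy with flatMap so that every emitted group gets consumed. The following is a self-contained sketch of that pattern, assuming RxJava 2.x; the class FlatMapGroupCounts and the parity key are made up for illustration. It counts the items in each group and sorts the results, because the order in which groups finish is not something this sketch relies on.

```java
import io.reactivex.Flowable;
import java.util.List;

// Hypothetical demo class; not part of RxJava.
final class FlatMapGroupCounts {
    static List<String> countsByParity() {
        return Flowable.range(1, 6)
                // Key 1 for odd values, key 0 for even values.
                .groupBy(v -> v % 2)
                // flatMap subscribes to every group and merges the per-group counts.
                .flatMap(group -> group.count()
                        .map(count -> group.getKey() + ":" + count)
                        .toFlowable())
                .toSortedList()
                .blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(countsByParity());
    }
}
```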