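This article collects code examples for the Java method io.reactivex.Observable.concatMap() and shows how Observable.concatMap() is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, extracted from selected projects, and are intended as a practical reference. Details of the Observable.concatMap() method are as follows: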
Package path: io.reactivex.Observable
Class name: Observable
Method name: concatMap
Returns a new Observable that emits items resulting from applying a function that you supply to each item emitted by the source ObservableSource, where that function returns an ObservableSource, and then emitting the items that result from concatenating those resulting ObservableSources.
Scheduler: concatMap does not operate by default on a particular Scheduler.
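The ordering guarantee described above can be modeled without RxJava. The following is a minimal, synchronous, JDK-only sketch and not RxJava's implementation: the class ConcatMapModel and its methods are hypothetical names, and plain Lists stand in for the source and for the inner ObservableSources. It maps each item to an inner sequence and appends those sequences strictly in source order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Hypothetical model class; it only illustrates the emission order of concatMap.
class ConcatMapModel {

    // Applies mapper to each source item and appends each inner list,
    // fully and in source order, before moving on to the next item.
    static <T, R> List<R> concatMap(List<T> source, Function<T, List<R>> mapper) {
        List<R> out = new ArrayList<>();
        for (T item : source) {
            out.addAll(mapper.apply(item));
        }
        return out;
    }

    // Maps each v to the two-element sequence (v, v + 1).
    static List<Integer> demo() {
        return concatMap(Arrays.asList(1, 2, 3, 4), v -> Arrays.asList(v, v + 1));
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [1, 2, 2, 3, 3, 4, 4, 5]
    }
}
```

The model leaves out everything asynchronous (subscription, disposal, prefetch, error signals); it only shows that all items of one inner sequence come out before any item of the next.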
Code example source: ReactiveX/RxJava
/**
 * Returns a new Observable that emits items resulting from applying a function that you supply to each item
 * emitted by the source ObservableSource, where that function returns an ObservableSource, and then emitting the items
 * that result from concatenating those resulting ObservableSources.
 * <p>
 * <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/concatMap.png" alt="">
 * <dl>
 *  <dt><b>Scheduler:</b></dt>
 *  <dd>{@code concatMap} does not operate by default on a particular {@link Scheduler}.</dd>
 * </dl>
 *
 * @param <R> the type of the inner ObservableSource sources and thus the output type
 * @param mapper
 *            a function that, when applied to an item emitted by the source ObservableSource, returns an
 *            ObservableSource
 * @return an Observable that emits the result of applying the transformation function to each item emitted
 *         by the source ObservableSource and concatenating the ObservableSources obtained from this transformation
 * @see <a href="http://reactivex.io/documentation/operators/flatmap.html">ReactiveX operators documentation: FlatMap</a>
 */
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> concatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper) {
    // Delegates to the two-argument overload with a prefetch of 2.
    return concatMap(mapper, 2);
}
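Code example source: ReactiveX/RxJava
// A null mapper is rejected with a NullPointerException.
// just1 is a field of the enclosing test class (not shown here).
@Test(expected = NullPointerException.class)
public void concatMapNull() {
    just1.concatMap(null);
}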
Code example source: ReactiveX/RxJava
// A mapper that returns null fails with a NullPointerException once the chain is subscribed.
@Test(expected = NullPointerException.class)
public void concatMapReturnsNull() {
    just1.concatMap(new Function<Integer, Observable<Object>>() {
        @Override
        public Observable<Object> apply(Integer v) {
            return null;
        }
    }).blockingSubscribe();
}
Code example source: ReactiveX/RxJava
// Uses the overload that takes an explicit prefetch (16) on a single-item source.
@Test
public void concatMapJustSource() {
    Observable.just(0)
    .concatMap(new Function<Object, ObservableSource<Integer>>() {
        @Override
        public ObservableSource<Integer> apply(Object v) throws Exception {
            return Observable.just(1);
        }
    }, 16)
    .test()
    .assertResult(1);
}
Code example source: ReactiveX/RxJava
@Test
public void dispose() {
    TestHelper.checkDisposed(Observable.<Integer>just(1).hide()
    .concatMap(new Function<Integer, ObservableSource<Integer>>() {
        @Override
        public ObservableSource<Integer> apply(Integer v) throws Exception {
            return Observable.error(new TestException());
        }
    }));
}
Code example source: ReactiveX/RxJava
// The upstream map throws before concatMap's mapper can run; the TestException reaches the observer.
@Test
public void fusedPollThrows() {
    Observable.just(1)
    .map(new Function<Integer, Integer>() {
        @Override
        public Integer apply(Integer v) throws Exception {
            throw new TestException();
        }
    })
    .concatMap(new Function<Integer, ObservableSource<Integer>>() {
        @Override
        public ObservableSource<Integer> apply(Integer v) throws Exception {
            return Observable.range(v, 2);
        }
    })
    .test()
    .assertFailure(TestException.class);
}
Code example source: ReactiveX/RxJava
// An error signaled by an inner ObservableSource is passed on to the observer.
@Test
public void innerError() {
    Observable.<Integer>just(1).hide()
    .concatMap(new Function<Integer, ObservableSource<Integer>>() {
        @Override
        public ObservableSource<Integer> apply(Integer v) throws Exception {
            return Observable.error(new TestException());
        }
    })
    .test()
    .assertFailure(TestException.class);
}
Code example source: ReactiveX/RxJava
// An exception thrown by the mapper itself is passed on to the observer.
@Test
public void mapperThrows() {
    Observable.just(1).hide()
    .concatMap(new Function<Integer, ObservableSource<Integer>>() {
        @Override
        public ObservableSource<Integer> apply(Integer v) throws Exception {
            throw new TestException();
        }
    })
    .test()
    .assertFailure(TestException.class);
}
Code example source: ReactiveX/RxJava
/**
 * Returns an Observable that concatenates each item emitted by the source ObservableSource with the values in an
 * Iterable corresponding to that item that is generated by a selector.
 * <p>
 * <img width="640" height="275" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/concatMapIterable.o.png" alt="">
 *
 * <dl>
 *  <dt><b>Scheduler:</b></dt>
 *  <dd>{@code concatMapIterable} does not operate by default on a particular {@link Scheduler}.</dd>
 * </dl>
 *
 * @param <U>
 *            the type of item emitted by the resulting ObservableSource
 * @param mapper
 *            a function that, when given an item emitted by the source ObservableSource, returns an
 *            Iterable sequence of values
 * @param prefetch
 *            the number of elements to prefetch from the current Observable
 * @return an Observable that emits the results of concatenating the items emitted by the source ObservableSource with
 *         the values in the Iterables corresponding to those items, as generated by {@code mapper}
 * @see <a href="http://reactivex.io/documentation/operators/flatmap.html">ReactiveX operators documentation: FlatMap</a>
 */
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <U> Observable<U> concatMapIterable(final Function<? super T, ? extends Iterable<? extends U>> mapper, int prefetch) {
    ObjectHelper.requireNonNull(mapper, "mapper is null");
    ObjectHelper.verifyPositive(prefetch, "prefetch");
    return concatMap(ObservableInternalHelper.flatMapIntoIterable(mapper), prefetch);
}
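For intuition about what concatMapIterable produces, here is a small JDK-only sketch. It is an illustrative model, not the library's code: the class ConcatMapIterableModel and its methods are hypothetical names, a plain Iterator stands in for the source Observable, and the prefetch parameter is not modeled. Each source item is mapped to an Iterable, and the inner Iterables are drained lazily, one after another, in source order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Function;

// Hypothetical model class; it only illustrates the output order of concatMapIterable.
class ConcatMapIterableModel {

    // Returns an iterator that walks the Iterable produced for each source item,
    // finishing one inner Iterable before asking the source for its next item.
    static <T, U> Iterator<U> concatMapIterable(Iterator<T> source, Function<T, Iterable<U>> mapper) {
        return new Iterator<U>() {
            private Iterator<U> current = Collections.emptyIterator();

            @Override
            public boolean hasNext() {
                // Skip over exhausted or empty inner Iterables.
                while (!current.hasNext() && source.hasNext()) {
                    current = mapper.apply(source.next()).iterator();
                }
                return current.hasNext();
            }

            @Override
            public U next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                return current.next();
            }
        };
    }

    // Maps each v to the pair (v, v * 10) and collects the flattened result.
    static List<Integer> demo() {
        Iterator<Integer> it = concatMapIterable(
                Arrays.asList(1, 2, 3).iterator(),
                v -> Arrays.asList(v, v * 10));
        List<Integer> out = new ArrayList<>();
        while (it.hasNext()) {
            out.add(it.next());
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [1, 10, 2, 20, 3, 30]
    }
}
```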
Code example source: ReactiveX/RxJava
.concatMap(new Function<byte[], Observable<byte[]>>() {
    @Override
    public Observable<byte[]> apply(byte[] v) throws Exception {
Code example source: ReactiveX/RxJava
.concatMap(func).subscribe(new DefaultObserver<Integer>() {
    @Override
    public void onNext(Integer t) {
Code example source: ReactiveX/RxJava
// Each v maps to range(v, 2), i.e. (v, v + 1); the inner sequences are emitted in source order.
@Test
public void fusionWithConcatMap() {
    TestObserver<Integer> to = new TestObserver<Integer>();
    Observable.fromIterable(Arrays.asList(1, 2, 3, 4)).concatMap(
    new Function<Integer, ObservableSource<Integer>>() {
        @Override
        public ObservableSource<Integer> apply(Integer v) {
            return Observable.range(v, 2);
        }
    }).subscribe(to);
    to.assertValues(1, 2, 2, 3, 3, 4, 4, 5);
    to.assertNoErrors();
    to.assertComplete();
}
Code example source: ReactiveX/RxJava
// Asserts that concatMap on an empty source returns the same instance as Observable.empty().
@Test
public void concatMapErrorEmptySource() {
    assertSame(Observable.empty(), Observable.<Object>empty()
    .concatMap(new Function<Object, ObservableSource<Integer>>() {
        @Override
        public ObservableSource<Integer> apply(Object v) throws Exception {
            return Observable.just(1);
        }
    }, 16));
}
Code example source: ReactiveX/RxJava
/** Issue #2844: wrong target of request. */
@Test(timeout = 3000)
public void testRepeatRetarget() {
    final List<Integer> concatBase = new ArrayList<Integer>();
    TestObserver<Integer> to = new TestObserver<Integer>();
    Observable.just(1, 2)
    .repeat(5)
    .concatMap(new Function<Integer, Observable<Integer>>() {
        @Override
        public Observable<Integer> apply(Integer x) {
            System.out.println("testRepeatRetarget -> " + x);
            concatBase.add(x);
            return Observable.<Integer>empty()
                    .delay(200, TimeUnit.MILLISECONDS);
        }
    })
    .subscribe(to);
    to.awaitTerminalEvent();
    to.assertNoErrors();
    to.assertNoValues();
    assertEquals(Arrays.asList(1, 2, 1, 2, 1, 2, 1, 2, 1, 2), concatBase);
}
Code example source: ReactiveX/RxJava
// Inner sources that complete normally must not be disposed, so the counter stays at 0.
@Test
public void noCancelPrevious() {
    final AtomicInteger counter = new AtomicInteger();
    Observable.range(1, 5)
    .concatMap(new Function<Integer, ObservableSource<Integer>>() {
        @Override
        public ObservableSource<Integer> apply(Integer v) throws Exception {
            return Observable.just(v).doOnDispose(new Action() {
                @Override
                public void run() throws Exception {
                    counter.getAndIncrement();
                }
            });
        }
    })
    .test()
    .assertResult(1, 2, 3, 4, 5);
    assertEquals(0, counter.get());
}
Code example source: ReactiveX/RxJava
// After the sequence completes, the Disposable handed to the observer reports itself as disposed.
@SuppressWarnings("unchecked")
@Test
public void concatReportsDisposedOnComplete() {
    final Disposable[] disposable = { null };
    Observable.fromArray(Observable.just(1), Observable.just(2))
    .hide()
    .concatMap(Functions.<Observable<Integer>>identity())
    .subscribe(new Observer<Integer>() {
        @Override
        public void onSubscribe(Disposable d) {
            disposable[0] = d;
        }
        @Override
        public void onNext(Integer t) {
        }
        @Override
        public void onError(Throwable e) {
        }
        @Override
        public void onComplete() {
        }
    });
    assertTrue(disposable[0].isDisposed());
}
Code example source: ReactiveX/RxJava
// An error from the source Observable is passed on to the observer.
@Test
public void mainError() {
    Observable.<Integer>error(new TestException())
    .concatMap(new Function<Integer, ObservableSource<Integer>>() {
        @Override
        public ObservableSource<Integer> apply(Integer v) throws Exception {
            return Observable.range(v, 2);
        }
    })
    .test()
    .assertFailure(TestException.class);
}
Code example source: ReactiveX/RxJava
@Test//(timeout = 100000)
public void concatMapRangeAsyncLoopIssue2876() {
    final long durationSeconds = 2;
    final long startTime = System.currentTimeMillis();
    for (int i = 0;; i++) {
        // only run this for at most durationSeconds (2) seconds
        if (System.currentTimeMillis() - startTime > TimeUnit.SECONDS.toMillis(durationSeconds)) {
            return;
        }
        if (i % 1000 == 0) {
            System.out.println("concatMapRangeAsyncLoop > " + i);
        }
        TestObserver<Integer> to = new TestObserver<Integer>();
        Observable.range(0, 1000)
        .concatMap(new Function<Integer, Observable<Integer>>() {
            @Override
            public Observable<Integer> apply(Integer t) {
                return Observable.fromIterable(Arrays.asList(t));
            }
        })
        .observeOn(Schedulers.computation()).subscribe(to);
        to.awaitTerminalEvent(2500, TimeUnit.MILLISECONDS);
        to.assertTerminated();
        to.assertNoErrors();
        assertEquals(1000, to.valueCount());
        assertEquals((Integer)999, to.values().get(999));
    }
}
Code example source: ReactiveX/RxJava
// The observer pushes a new value into the subject from inside onNext;
// the test expects exactly 2 and 3 and no terminal event.
@Test
public void outputFusedOneSignal() {
    final BehaviorSubject<Integer> bs = BehaviorSubject.createDefault(1);
    bs.observeOn(ImmediateThinScheduler.INSTANCE)
    .concatMap(new Function<Integer, ObservableSource<Integer>>() {
        @Override
        public ObservableSource<Integer> apply(Integer v)
                throws Exception {
            return Observable.just(v + 1);
        }
    })
    .subscribeWith(new TestObserver<Integer>() {
        @Override
        public void onNext(Integer t) {
            super.onNext(t);
            if (t == 2) {
                bs.onNext(2);
            }
        }
    })
    .assertValuesOnly(2, 3);
}
Code example source: ReactiveX/RxJava
// After an inner source fails, the Disposable handed to the observer reports itself as disposed.
@Test
@SuppressWarnings("unchecked")
public void concatReportsDisposedOnError() {
    final Disposable[] disposable = { null };
    Observable.fromArray(Observable.just(1), Observable.<Integer>error(new TestException()))
    .hide()
    .concatMap(Functions.<Observable<Integer>>identity())
    .subscribe(new Observer<Integer>() {
        @Override
        public void onSubscribe(Disposable d) {
            disposable[0] = d;
        }
        @Override
        public void onNext(Integer t) {
        }
        @Override
        public void onError(Throwable e) {
        }
        @Override
        public void onComplete() {
        }
    });
    assertTrue(disposable[0].isDisposed());
}