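This article collects code examples for the Java method io.reactivex.Flowable.switchMap() and shows how Flowable.switchMap() is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, and were extracted from selected open-source projects, so they should be useful as reference material. Details of Flowable.switchMap() are as follows: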
Package path: io.reactivex.Flowable
Class name: Flowable
Method name: switchMap
Returns a new Publisher by applying a function that you supply to each item emitted by the source Publisher, where that function returns a Publisher, and then emitting the items emitted by the most recently emitted of these Publishers.
The resulting Publisher completes if both the upstream Publisher and the last inner Publisher, if any, complete. If the upstream Publisher signals an onError, the inner Publisher is canceled and the error delivered in-sequence.
Backpressure: The operator honors backpressure from downstream. The outer Publisher is consumed in an unbounded manner (i.e., without backpressure) and the inner Publishers are expected to honor backpressure but it is not enforced; the operator won't signal a MissingBackpressureException, but the violation may lead to OutOfMemoryError due to internal buffer bloat.
Scheduler: switchMap does not operate by default on a particular Scheduler.
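The switching and completion rules described above can be sketched with three PublishProcessor instances, which make the order of events explicit. This is a minimal illustration, assuming RxJava 2.x (the io.reactivex package) is on the classpath; the class name SwitchMapDemo and the names outer, innerA, and innerB are made up for this example.

```java
import io.reactivex.Flowable;
import io.reactivex.processors.PublishProcessor;
import io.reactivex.subscribers.TestSubscriber;
import java.util.List;

// Hypothetical demo class, not part of RxJava.
public class SwitchMapDemo {

    // Returns the items that reached the downstream subscriber, in order.
    static List<String> run() {
        PublishProcessor<String> outer = PublishProcessor.create();
        PublishProcessor<String> innerA = PublishProcessor.create();
        PublishProcessor<String> innerB = PublishProcessor.create();

        // Map the key "A" to innerA and any other key to innerB.
        Flowable<String> switched =
                outer.switchMap(key -> "A".equals(key) ? innerA : innerB);
        TestSubscriber<String> ts = switched.test();

        outer.onNext("A");     // subscribes to innerA
        innerA.onNext("a1");   // relayed downstream
        outer.onNext("B");     // cancels innerA, subscribes to innerB
        innerA.onNext("a2");   // dropped: innerA has no subscriber anymore
        innerB.onNext("b1");   // relayed downstream
        outer.onComplete();    // result stays open: innerB is still active
        innerB.onNext("b2");   // relayed downstream
        innerB.onComplete();   // upstream and last inner are done: completes

        ts.assertResult("a1", "b1", "b2");
        return ts.values();
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [a1, b1, b2]
    }
}
```

Processors are used here instead of timers so that the result does not depend on scheduling.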
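The statement that switchMap honors downstream backpressure can be illustrated with a subscriber that requests items in two steps. This is a sketch assuming RxJava 2.x; the class name SwitchMapBackpressureDemo is hypothetical, and the inner Flowable.range source is chosen because it respects requests.

```java
import io.reactivex.Flowable;
import io.reactivex.subscribers.TestSubscriber;
import java.util.List;

// Hypothetical demo class, not part of RxJava.
public class SwitchMapBackpressureDemo {

    // Returns everything the downstream received after both requests.
    static List<Integer> run() {
        TestSubscriber<Integer> ts = Flowable.just("key").hide()
                .switchMap(k -> Flowable.range(1, 5))
                .test(2);                 // initial downstream request: 2 items

        // Only the requested two items have been delivered so far.
        ts.assertValues(1, 2).assertNotComplete();

        ts.requestMore(3);                // ask for the remaining three items
        ts.assertResult(1, 2, 3, 4, 5);   // all items delivered, then onComplete
        return ts.values();
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [1, 2, 3, 4, 5]
    }
}
```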
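Code example source: ReactiveX/RxJava
// Excerpt: the enclosing call and the `processor` variable are defined outside this fragment.
    @Override
    public Flowable<Object> apply(Flowable<Object> completions) throws Exception {
        // Every completion signal switches to the shared processor.
        return completions.switchMap(new Function<Object, Flowable<Object>>() {
            @Override
            public Flowable<Object> apply(Object ignore) throws Exception {
                return processor;
            }
        });
    }
})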
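Code example source: ReactiveX/RxJava
// Excerpt: the enclosing call and the `processor` variable are defined outside this fragment.
    @Override
    public Flowable<Object> apply(Flowable<Throwable> errors) throws Exception {
        // Every error signal switches to the shared processor.
        return errors.switchMap(new Function<Throwable, Flowable<Object>>() {
            @Override
            public Flowable<Object> apply(Throwable ignore) throws Exception {
                return processor;
            }
        });
    }
})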
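Code example source: ReactiveX/RxJava
// A null mapper is rejected with NullPointerException (`just1` is a field of the test class).
@Test(expected = NullPointerException.class)
public void switchMapNull() {
    just1.switchMap(null);
}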
Code example source: ReactiveX/RxJava
// A mapper that returns null causes a NullPointerException once the flow is consumed.
@Test(expected = NullPointerException.class)
public void switchMapFunctionReturnsNull() {
    just1.switchMap(new Function<Integer, Publisher<Object>>() {
        @Override
        public Publisher<Object> apply(Integer v) {
            return null;
        }
    }).blockingSubscribe();
}
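Code example source: ReactiveX/RxJava
// Excerpt: the enclosing call that receives the trailing arguments starts outside this fragment.
    @Override
    public Object apply(Flowable<Integer> f) throws Exception {
        return f.switchMap(Functions.justFunction(Flowable.just(1)));
    }
}, false, 1, 1, 1);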
Code example source: ReactiveX/RxJava
// switchMap on Flowable.empty() returns the same empty() instance.
@Test
public void switchMapErrorEmptySource() {
    assertSame(Flowable.empty(), Flowable.<Object>empty()
        .switchMap(new Function<Object, Publisher<Integer>>() {
            @Override
            public Publisher<Integer> apply(Object v) throws Exception {
                return Flowable.just(1);
            }
        }, 16));
}
Code example source: ReactiveX/RxJava
// A single-item source mapped to just(1) yields exactly one item, 1.
@Test
public void switchMapJustSource() {
    Flowable.just(0)
        .switchMap(new Function<Object, Publisher<Integer>>() {
            @Override
            public Publisher<Integer> apply(Object v) throws Exception {
                return Flowable.just(1);
            }
        }, 16)
        .test()
        .assertResult(1);
}
Code example source: ReactiveX/RxJava
// `iterate` is a helper defined outside this excerpt.
@Override
public Publisher<Long> createPublisher(long elements) {
    return Flowable.just(1).switchMap(Functions.justFunction(
            Flowable.fromIterable(iterate(elements))));
}
Code example source: ReactiveX/RxJava
// An exception thrown by the mapper is delivered downstream as onError.
@Test
public void mapperThrows() {
    Flowable.just(1).hide()
        .switchMap(new Function<Integer, Flowable<Object>>() {
            @Override
            public Flowable<Object> apply(Integer v) throws Exception {
                throw new TestException();
            }
        })
        .test()
        .assertFailure(TestException.class);
}
Code example source: ReactiveX/RxJava
// Inner values are mapped after observeOn(Schedulers.single()), so the test thread's name must never be emitted.
@Test
public void fusedBoundary() {
    String thread = Thread.currentThread().getName();
    Flowable.range(1, 10000)
        .switchMap(new Function<Integer, Flowable<? extends Object>>() {
            @Override
            public Flowable<? extends Object> apply(Integer v)
                    throws Exception {
                return Flowable.just(2).hide()
                    .observeOn(Schedulers.single())
                    .map(new Function<Integer, Object>() {
                        @Override
                        public Object apply(Integer w) throws Exception {
                            return Thread.currentThread().getName();
                        }
                    });
            }
        })
        .test()
        .awaitDone(5, TimeUnit.SECONDS)
        .assertNever(thread)
        .assertNoErrors()
        .assertComplete();
}
Code example source: ReactiveX/RxJava
// Every inner Flowable is empty, so the result completes without emitting items.
@Test
public void emptyInner() {
    Flowable.range(1, 5)
        .switchMap(Functions.justFunction(Flowable.empty()))
        .test()
        .assertResult();
}
Code example source: ReactiveX/RxJava
// Each of the five outer items maps to just(1), giving five 1s.
@Test
public void justInner() {
    Flowable.range(1, 5)
        .switchMap(Functions.justFunction(Flowable.just(1)))
        .test()
        .assertResult(1, 1, 1, 1, 1);
}
Code example source: ReactiveX/RxJava
// The inner source ignores backpressure and pushes 10 items into a buffer of 8;
// the test expects item 0 followed by MissingBackpressureException.
@Test
public void innerOverflow() {
    Flowable.just(1).hide()
        .switchMap(Functions.justFunction(new Flowable<Integer>() {
            @Override
            protected void subscribeActual(Subscriber<? super Integer> s) {
                s.onSubscribe(new BooleanSubscription());
                for (int i = 0; i < 10; i++) {
                    s.onNext(i);
                }
            }
        }), 8)
        .test(1L)
        .assertFailure(MissingBackpressureException.class, 0);
}
Code example source: ReactiveX/RxJava
// A fromCallable source supplies one computed item, which is mapped to just(1).
@Test
public void scalarXMap() {
    Flowable.fromCallable(Functions.justCallable(1))
        .switchMap(Functions.justFunction(Flowable.just(1)))
        .test()
        .assertResult(1);
}
Code example source: ReactiveX/RxJava
// The inner processor fails from inside the downstream onNext: the item arrives first, then the error.
@Test
public void innerErrorsReentrant() {
    final PublishProcessor<Integer> pp = PublishProcessor.create();
    TestSubscriber<Integer> ts = new TestSubscriber<Integer>() {
        @Override
        public void onNext(Integer t) {
            super.onNext(t);
            pp.onError(new TestException());
        }
    };
    Flowable.just(1).hide()
        .switchMap(Functions.justFunction(pp))
        .subscribe(ts);
    pp.onNext(1);
    ts.assertFailure(TestException.class, 1);
}
Code example source: ReactiveX/RxJava
// The inner processor completes from inside the downstream onNext: the item arrives first, then completion.
@Test
public void innerCompletesReentrant() {
    final PublishProcessor<Integer> pp = PublishProcessor.create();
    TestSubscriber<Integer> ts = new TestSubscriber<Integer>() {
        @Override
        public void onNext(Integer t) {
            super.onNext(t);
            pp.onComplete();
        }
    };
    Flowable.just(1).hide()
        .switchMap(Functions.justFunction(pp))
        .subscribe(ts);
    pp.onNext(1);
    ts.assertResult(1);
}
Code example source: ReactiveX/RxJava
// Cancelling downstream also cancels the subscription to the active inner source.
@Test
public void switchMapInnerCancelled() {
    PublishProcessor<Integer> pp = PublishProcessor.create();
    TestSubscriber<Integer> ts = Flowable.just(1)
        .switchMap(Functions.justFunction(pp))
        .test();
    assertTrue(pp.hasSubscribers());
    ts.cancel();
    assertFalse(pp.hasSubscribers());
}
Code example source: ReactiveX/RxJava
// Races a downstream cancel against an inner onNext; the loop makes no assertions beyond running without error.
@Test
public void drainCancelRace() {
    for (int i = 0; i < TestHelper.RACE_DEFAULT_LOOPS; i++) {
        final TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
        final PublishProcessor<Integer> pp = PublishProcessor.create();
        Flowable.just(1).hide()
            .switchMap(Functions.justFunction(pp))
            .subscribe(ts);
        Runnable r1 = new Runnable() {
            @Override
            public void run() {
                ts.cancel();
            }
        };
        Runnable r2 = new Runnable() {
            @Override
            public void run() {
                pp.onNext(1);
            }
        };
        TestHelper.race(r1, r2);
    }
}
Code example source: ReactiveX/RxJava
// The inner map() throws; the failure surfaces downstream as TestException.
@Test
public void fusedInnerCrash() {
    Flowable.just(1).hide()
        .switchMap(Functions.justFunction(Flowable.just(1)
            .map(new Function<Integer, Object>() {
                @Override
                public Object apply(Integer v) throws Exception {
                    throw new TestException();
                }
            })
            .compose(TestHelper.<Object>flowableStripBoundary())
        ))
        .test()
        .assertFailure(TestException.class);
}
Code example source: ReactiveX/RxJava
// The inner source keeps signalling after its terminal event: only the first error
// reaches downstream, and the second is reported to the plugin error handler as undeliverable.
@Test
public void badInnerSource() {
    List<Throwable> errors = TestHelper.trackPluginErrors();
    try {
        Flowable.just(1).hide()
            .switchMap(Functions.justFunction(new Flowable<Integer>() {
                @Override
                protected void subscribeActual(Subscriber<? super Integer> subscriber) {
                    subscriber.onSubscribe(new BooleanSubscription());
                    subscriber.onError(new TestException());
                    subscriber.onComplete();
                    subscriber.onError(new TestException());
                    subscriber.onComplete();
                }
            }))
            .test()
            .assertFailure(TestException.class);
        TestHelper.assertUndeliverable(errors, 0, TestException.class);
    } finally {
        RxJavaPlugins.reset();
    }
}