Returns a Flowable that emits the items emitted by the source Publisher, or the items of an alternate Publisher if the source Publisher is empty.
Backpressure: If the source Publisher is empty, the alternate Publisher is expected to honor backpressure. If the source Publisher is non-empty, it is expected to honor backpressure as well. In either case, if the rule is violated, a MissingBackpressureException may get signaled somewhere downstream.
Scheduler: switchIfEmpty does not operate by default on a particular Scheduler.
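The switching rule described above can be modeled without RxJava. The following is a minimal pull-based sketch using only java.util; the class SwitchIfEmptySketch and its switchIfEmpty helper are hypothetical names chosen for illustration and are not part of the RxJava API. It ignores backpressure and asynchrony and only shows that the alternate is consulted when, and only when, the source yields nothing.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;

/** Pull-based model of switchIfEmpty; hypothetical helper, not RxJava code. */
final class SwitchIfEmptySketch {

    /**
     * Returns the source's items if it has any; otherwise returns the items of
     * the alternate. The supplier is invoked only when the source is empty,
     * mirroring how the alternate Publisher is subscribed to only in that case.
     */
    static <T> List<T> switchIfEmpty(Iterable<T> source, Supplier<? extends Iterable<T>> alternate) {
        List<T> out = new ArrayList<>();
        Iterator<T> it = source.iterator();
        if (!it.hasNext()) {
            // The source finished without producing anything: switch over.
            it = alternate.get().iterator();
        }
        while (it.hasNext()) {
            out.add(it.next());
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> empty = Collections.emptyList();
        System.out.println(switchIfEmpty(empty, () -> Arrays.asList(1, 2, 3)));            // [1, 2, 3]
        System.out.println(switchIfEmpty(Arrays.asList(4), () -> Arrays.asList(1, 2, 3))); // [4]
    }
}
```

With the real operator the same two cases would read Flowable.<Integer>empty().switchIfEmpty(Flowable.just(1, 2, 3)) and Flowable.just(4).switchIfEmpty(Flowable.just(1, 2, 3)).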
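Code example source: ReactiveX/RxJava
@Test(expected = NullPointerException.class)
public void switchIfEmptyNull() {
    // Body reconstructed from the test name: a null alternate is rejected.
    Flowable.just(1).switchIfEmpty(null);
}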
Code example source: ReactiveX/RxJava
public Publisher<Integer> createPublisher(long elements) {
    // The source is empty, so every element comes from the alternate range.
    return Flowable.<Integer>empty().switchIfEmpty(Flowable.range(1, (int) elements));
}
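Code example source: ReactiveX/RxJava
public void testSwitchWhenEmpty() throws Exception {
    // Empty source: the single value 42 must come from the alternate.
    final Flowable<Integer> flowable = Flowable.<Integer>empty()
            .switchIfEmpty(Flowable.fromIterable(Arrays.asList(42)));
    assertEquals(42, flowable.blockingSingle().intValue());
}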
Code example source: ReactiveX/RxJava
public void testSwitchWhenNotEmpty() throws Exception {
    final AtomicBoolean subscribed = new AtomicBoolean(false);
    final Flowable<Integer> flowable = Flowable.just(4)
            .switchIfEmpty(Flowable.just(2)
            .doOnSubscribe(new Consumer<Subscription>() {
                public void accept(Subscription s) {
                    subscribed.set(true); // must not run: the source is non-empty
                }
            }));
    assertEquals(4, flowable.blockingSingle().intValue());
    assertFalse(subscribed.get());
}
Code example source: ReactiveX/RxJava
/**
 * Returns a Flowable that emits the items emitted by the source Publisher or a specified default item
 * if the source Publisher is empty.
 * <p>
 * <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/defaultIfEmpty.png" alt="">
 * <dl>
 *  <dt><b>Backpressure:</b></dt>
 *  <dd>If the source {@code Publisher} is empty, this operator is guaranteed to honor backpressure from downstream.
 *  If the source {@code Publisher} is non-empty, it is expected to honor backpressure as well; if the rule is violated,
 *  a {@code MissingBackpressureException} <em>may</em> get signaled somewhere downstream.
 *  </dd>
 *  <dt><b>Scheduler:</b></dt>
 *  <dd>{@code defaultIfEmpty} does not operate by default on a particular {@link Scheduler}.</dd>
 * </dl>
 * @param defaultItem
 *            the item to emit if the source Publisher emits no items
 * @return a Flowable that emits either the specified default item if the source Publisher emits no
 *         items, or the items emitted by the source Publisher
 * @see <a href="http://reactivex.io/documentation/operators/defaultifempty.html">ReactiveX operators documentation: DefaultIfEmpty</a>
 */
public final Flowable<T> defaultIfEmpty(T defaultItem) {
    ObjectHelper.requireNonNull(defaultItem, "item is null");
    return switchIfEmpty(just(defaultItem));
}
Code example source: ReactiveX/RxJava
public void testSwitchWithProducer() throws Exception {
    final AtomicBoolean emitted = new AtomicBoolean(false);
    Flowable<Long> withProducer = Flowable.unsafeCreate(new Publisher<Long>() {
        public void subscribe(final Subscriber<? super Long> subscriber) {
            subscriber.onSubscribe(new Subscription() {
                public void request(long n) {
                    // Emit exactly once, on the first positive request.
                    if (n > 0 && emitted.compareAndSet(false, true)) {
                        subscriber.onNext(42L);
                        subscriber.onComplete();
                    }
                }
                public void cancel() {
                }
            });
        }
    });
    final Flowable<Long> flowable = Flowable.<Long>empty().switchIfEmpty(withProducer);
    assertEquals(42, flowable.blockingSingle().intValue());
}
Code example source: micronaut-projects/micronaut-core
).switchIfEmpty(Flowable.error(new FunctionNotFoundException(functionName)));
return serviceInstanceLocator.map(instance -> {
Optional<String> uri = instance.getMetadata().get(LocalFunctionRegistry.FUNCTION_PREFIX + functionName, String.class);
Code example source: ReactiveX/RxJava
public void testBackpressureNoRequest() {
    // Initial request of 0: nothing may be delivered from the alternate.
    TestSubscriber<Integer> ts = new TestSubscriber<Integer>(0L);
    Flowable.<Integer>empty().switchIfEmpty(Flowable.just(1, 2, 3)).subscribe(ts);
    ts.assertNoValues();
}
Code example source: ReactiveX/RxJava
public void testSwitchShouldNotTriggerUnsubscribe() {
final BooleanSubscription bs = new BooleanSubscription();
Flowable.unsafeCreate(new Publisher<Long>() {
public void subscribe(final Subscriber<? super Long> subscriber) {
Code example source: micronaut-projects/micronaut-core
routePublisher = routePublisher.switchIfEmpty(Flowable.create((emitter) -> {
HttpRequest<?> httpRequest = requestReference.get();
MutableHttpResponse<?> response;
Code example source: ReactiveX/RxJava
public void testSwitchRequestAlternativeObservableWithBackpressure() {
    // Initial request of 1: only the first item of the alternate is delivered.
    TestSubscriber<Integer> ts = new TestSubscriber<Integer>(1L);
    Flowable.<Integer>empty().switchIfEmpty(Flowable.just(1, 2, 3)).subscribe(ts);
    assertEquals(Arrays.asList(1), ts.values());
}
Code example source: ReactiveX/RxJava
public void testBackpressureOnFirstObservable() {
    // Initial request of 0 on a non-empty source: no items and no switch.
    TestSubscriber<Integer> ts = new TestSubscriber<Integer>(0L);
    Flowable.just(1, 2, 3).switchIfEmpty(Flowable.just(4, 5, 6)).subscribe(ts);
    ts.assertNoValues();
}
Code example source: micronaut-projects/micronaut-core
setBodyContent(response, bodyContent)
return bodyToResponse.switchIfEmpty(Flowable.just(response));
Code example source: ReactiveX/RxJava
.switchIfEmpty(Flowable.fromIterable(Arrays.asList(1L, 2L, 3L)))
Code example source: ReactiveX/RxJava
.lift(new FlowableOperator<Long, Long>() {
Code example source: fengzhizi715/RxCache
public <T> Publisher<Record<T>> execute(RxCache rxCache, String key, Flowable<T> source, Type type) {
    Flowable<Record<T>> cache = rxCache.<T>load2Flowable(key, type);
    Flowable<Record<T>> remote = source
            .map(new Function<T, Record<T>>() {
                public Record<T> apply(@NonNull T t) throws Exception {
                    rxCache.save(key, t);
                    return new Record<>(Source.CLOUD, key, t);
                }
            });
    // Cache first: fall back to the remote source only if the cache emits nothing.
    return cache.switchIfEmpty(remote);
}
Code example source: fengzhizi715/RxCache
public <T> Publisher<Record<T>> execute(RxCache rxCache, String key, Flowable<T> source, Type type) {
    Flowable<Record<T>> cache = rxCache.<T>load2Flowable(key, type);
    Flowable<Record<T>> remote = source
            .map(new Function<T, Record<T>>() {
                public Record<T> apply(@NonNull T t) throws Exception {
                    rxCache.save(key, t);
                    return new Record<>(Source.CLOUD, key, t);
                }
            });
    // Remote first: fall back to the cache only if the remote source emits nothing.
    return remote.switchIfEmpty(cache);
}
Code example source: fengzhizi715/RxCache
public <T> Publisher<Record<T>> execute(RxCache rxCache, String key, Flowable<T> source, Type type) {
    // Keep only cached records that are still fresh; timestamp is defined outside this snippet.
    Flowable<Record<T>> cache = rxCache.<T>load2Flowable(key, type)
            .filter(new Predicate<Record<T>>() {
                public boolean test(Record<T> record) throws Exception {
                    return System.currentTimeMillis() - record.getCreateTime() <= timestamp;
                }
            });
    Flowable<Record<T>> remote = source
            .map(new Function<T, Record<T>>() {
                public Record<T> apply(@NonNull T t) throws Exception {
                    rxCache.save(key, t);
                    return new Record<>(Source.CLOUD, key, t);
                }
            });
    // An expired record is filtered out, so the cache looks empty and the remote source is used.
    return cache.switchIfEmpty(remote);
}
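The cache-first strategy with expiry shown above combines a filter with a fallback: a stale record is filtered out, which makes the cache look empty and triggers the switch. A rough synchronous analogue using only java.util is sketched below. CacheFirstSketch, Entry, put and load are hypothetical names for illustration and do not reflect the RxCache API; Optional.filter stands in for Flowable.filter and Optional.orElseGet stands in for switchIfEmpty.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Supplier;

/** Hypothetical cache-first lookup with expiry; a model, not the RxCache API. */
final class CacheFirstSketch {

    /** A cached value plus the time it was stored. */
    static final class Entry {
        final String value;
        final long createTime;

        Entry(String value, long createTime) {
            this.value = value;
            this.createTime = createTime;
        }
    }

    private final Map<String, Entry> cache = new HashMap<>();
    private final long maxAgeMillis;

    CacheFirstSketch(long maxAgeMillis) {
        this.maxAgeMillis = maxAgeMillis;
    }

    void put(String key, String value, long now) {
        cache.put(key, new Entry(value, now));
    }

    /**
     * Returns the cached value if it is present and fresh; otherwise calls the
     * remote supplier, stores its result, and returns it.
     */
    String load(String key, long now, Supplier<String> remote) {
        return Optional.ofNullable(cache.get(key))
                .filter(e -> now - e.createTime <= maxAgeMillis) // drop stale entries
                .map(e -> e.value)
                .orElseGet(() -> {                               // "switch" to the remote
                    String fresh = remote.get();
                    put(key, fresh, now);
                    return fresh;
                });
    }

    public static void main(String[] args) {
        CacheFirstSketch c = new CacheFirstSketch(1000);
        c.put("k", "cached", 0);
        System.out.println(c.load("k", 500, () -> "remote"));  // cached
        System.out.println(c.load("k", 5000, () -> "remote")); // remote
    }
}
```

Passing the current time as a parameter, rather than calling System.currentTimeMillis() inside load, is a choice made here only to keep the sketch deterministic.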