io.reactivex.Flowable.switchIfEmpty()方法的使用及代码示例

x33g5p2x  于2022-01-19 转载在 其他  
字(9.4k)|赞(0)|评价(0)|浏览(212)

本文整理了Java中io.reactivex.Flowable.switchIfEmpty()方法的一些代码示例,展示了Flowable.switchIfEmpty()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Flowable.switchIfEmpty()方法的具体详情如下:
包路径:io.reactivex.Flowable
类名称:Flowable
方法名:switchIfEmpty

Flowable.switchIfEmpty介绍

[英]Returns a Flowable that emits the items emitted by the source Publisher or the items of an alternate Publisher if the source Publisher is empty.

Backpressure: If the source Publisher is empty, the alternate Publisher is expected to honor backpressure. If the source Publisher is non-empty, it is expected to honor backpressure as instead. In either case, if violated, a MissingBackpressureException may get signaled somewhere downstream. Scheduler: switchIfEmpty does not operate by default on a particular Scheduler.
[中]返回一个可流动项,该可流动项发出源发布服务器发出的项目,或者如果源发布服务器为空,则发出备用发布服务器的项目。
背压:如果源发布服务器为空,则备用发布服务器应接受背压。如果源发布服务器为非空,则应将背压作为默认值。在任何一种情况下,如果违反,可能会在下游某处发出缺失背压异常*的信号。Scheduler:switchIfEmpty默认情况下不会在特定计划程序上运行。

代码示例

代码示例来源:origin: ReactiveX/RxJava

  1. @Test(expected = NullPointerException.class)
  2. public void switchIfEmptyNull() {
  3. just1.switchIfEmpty(null);
  4. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Override
  2. public Publisher<Integer> createPublisher(long elements) {
  3. return
  4. Flowable.<Integer>empty().switchIfEmpty(Flowable.range(1, (int)elements))
  5. ;
  6. }
  7. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void testSwitchWhenEmpty() throws Exception {
  3. final Flowable<Integer> flowable = Flowable.<Integer>empty()
  4. .switchIfEmpty(Flowable.fromIterable(Arrays.asList(42)));
  5. assertEquals(42, flowable.blockingSingle().intValue());
  6. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void testSwitchWhenNotEmpty() throws Exception {
  3. final AtomicBoolean subscribed = new AtomicBoolean(false);
  4. final Flowable<Integer> flowable = Flowable.just(4)
  5. .switchIfEmpty(Flowable.just(2)
  6. .doOnSubscribe(new Consumer<Subscription>() {
  7. @Override
  8. public void accept(Subscription s) {
  9. subscribed.set(true);
  10. }
  11. }));
  12. assertEquals(4, flowable.blockingSingle().intValue());
  13. assertFalse(subscribed.get());
  14. }

代码示例来源:origin: ReactiveX/RxJava

  1. /**
  2. * Returns a Flowable that emits the items emitted by the source Publisher or a specified default item
  3. * if the source Publisher is empty.
  4. * <p>
  5. * <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/defaultIfEmpty.png" alt="">
  6. * <dl>
  7. * <dt><b>Backpressure:</b></dt>
  8. * <dd>If the source {@code Publisher} is empty, this operator is guaranteed to honor backpressure from downstream.
  9. * If the source {@code Publisher} is non-empty, it is expected to honor backpressure as well; if the rule is violated,
  10. * a {@code MissingBackpressureException} <em>may</em> get signaled somewhere downstream.
  11. * </dd>
  12. * <dt><b>Scheduler:</b></dt>
  13. * <dd>{@code defaultIfEmpty} does not operate by default on a particular {@link Scheduler}.</dd>
  14. * </dl>
  15. *
  16. * @param defaultItem
  17. * the item to emit if the source Publisher emits no items
  18. * @return a Flowable that emits either the specified default item if the source Publisher emits no
  19. * items, or the items emitted by the source Publisher
  20. * @see <a href="http://reactivex.io/documentation/operators/defaultifempty.html">ReactiveX operators documentation: DefaultIfEmpty</a>
  21. */
  22. @CheckReturnValue
  23. @NonNull
  24. @BackpressureSupport(BackpressureKind.FULL)
  25. @SchedulerSupport(SchedulerSupport.NONE)
  26. public final Flowable<T> defaultIfEmpty(T defaultItem) {
  27. ObjectHelper.requireNonNull(defaultItem, "item is null");
  28. return switchIfEmpty(just(defaultItem));
  29. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void testSwitchWithProducer() throws Exception {
  3. final AtomicBoolean emitted = new AtomicBoolean(false);
  4. Flowable<Long> withProducer = Flowable.unsafeCreate(new Publisher<Long>() {
  5. @Override
  6. public void subscribe(final Subscriber<? super Long> subscriber) {
  7. subscriber.onSubscribe(new Subscription() {
  8. @Override
  9. public void request(long n) {
  10. if (n > 0 && emitted.compareAndSet(false, true)) {
  11. emitted.set(true);
  12. subscriber.onNext(42L);
  13. subscriber.onComplete();
  14. }
  15. }
  16. @Override
  17. public void cancel() {
  18. }
  19. });
  20. }
  21. });
  22. final Flowable<Long> flowable = Flowable.<Long>empty().switchIfEmpty(withProducer);
  23. assertEquals(42, flowable.blockingSingle().intValue());
  24. }

代码示例来源:origin: redisson/redisson

  1. /**
  2. * Returns a Flowable that emits the items emitted by the source Publisher or a specified default item
  3. * if the source Publisher is empty.
  4. * <p>
  5. * <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/defaultIfEmpty.png" alt="">
  6. * <dl>
  7. * <dt><b>Backpressure:</b></dt>
  8. * <dd>If the source {@code Publisher} is empty, this operator is guaranteed to honor backpressure from downstream.
  9. * If the source {@code Publisher} is non-empty, it is expected to honor backpressure as well; if the rule is violated,
  10. * a {@code MissingBackpressureException} <em>may</em> get signaled somewhere downstream.
  11. * </dd>
  12. * <dt><b>Scheduler:</b></dt>
  13. * <dd>{@code defaultIfEmpty} does not operate by default on a particular {@link Scheduler}.</dd>
  14. * </dl>
  15. *
  16. * @param defaultItem
  17. * the item to emit if the source Publisher emits no items
  18. * @return a Flowable that emits either the specified default item if the source Publisher emits no
  19. * items, or the items emitted by the source Publisher
  20. * @see <a href="http://reactivex.io/documentation/operators/defaultifempty.html">ReactiveX operators documentation: DefaultIfEmpty</a>
  21. */
  22. @CheckReturnValue
  23. @BackpressureSupport(BackpressureKind.FULL)
  24. @SchedulerSupport(SchedulerSupport.NONE)
  25. public final Flowable<T> defaultIfEmpty(T defaultItem) {
  26. ObjectHelper.requireNonNull(defaultItem, "item is null");
  27. return switchIfEmpty(just(defaultItem));
  28. }

代码示例来源:origin: micronaut-projects/micronaut-core

  1. ).switchIfEmpty(Flowable.error(new FunctionNotFoundException(functionName)));
  2. return serviceInstanceLocator.map(instance -> {
  3. Optional<String> uri = instance.getMetadata().get(LocalFunctionRegistry.FUNCTION_PREFIX + functionName, String.class);

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void testBackpressureNoRequest() {
  3. TestSubscriber<Integer> ts = new TestSubscriber<Integer>(0L);
  4. Flowable.<Integer>empty().switchIfEmpty(Flowable.just(1, 2, 3)).subscribe(ts);
  5. ts.assertNoValues();
  6. ts.assertNoErrors();
  7. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void testSwitchShouldNotTriggerUnsubscribe() {
  3. final BooleanSubscription bs = new BooleanSubscription();
  4. Flowable.unsafeCreate(new Publisher<Long>() {
  5. @Override
  6. public void subscribe(final Subscriber<? super Long> subscriber) {
  7. subscriber.onSubscribe(bs);
  8. subscriber.onComplete();
  9. }
  10. }).switchIfEmpty(Flowable.<Long>never()).subscribe();
  11. assertFalse(bs.isCancelled());
  12. }

代码示例来源:origin: micronaut-projects/micronaut-core

  1. routePublisher = routePublisher.switchIfEmpty(Flowable.create((emitter) -> {
  2. HttpRequest<?> httpRequest = requestReference.get();
  3. MutableHttpResponse<?> response;

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void testSwitchRequestAlternativeObservableWithBackpressure() {
  3. TestSubscriber<Integer> ts = new TestSubscriber<Integer>(1L);
  4. Flowable.<Integer>empty().switchIfEmpty(Flowable.just(1, 2, 3)).subscribe(ts);
  5. assertEquals(Arrays.asList(1), ts.values());
  6. ts.assertNoErrors();
  7. ts.request(1);
  8. ts.assertValueCount(2);
  9. ts.request(1);
  10. ts.assertValueCount(3);
  11. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void testBackpressureOnFirstObservable() {
  3. TestSubscriber<Integer> ts = new TestSubscriber<Integer>(0L);
  4. Flowable.just(1, 2, 3).switchIfEmpty(Flowable.just(4, 5, 6)).subscribe(ts);
  5. ts.assertNotComplete();
  6. ts.assertNoErrors();
  7. ts.assertNoValues();
  8. }

代码示例来源:origin: micronaut-projects/micronaut-core

  1. setBodyContent(response, bodyContent)
  2. );
  3. return bodyToResponse.switchIfEmpty(Flowable.just(response));

代码示例来源:origin: ReactiveX/RxJava

  1. .switchIfEmpty(Flowable.fromIterable(Arrays.asList(1L, 2L, 3L)))
  2. .subscribeOn(Schedulers.computation())
  3. .subscribe(ts);

代码示例来源:origin: ReactiveX/RxJava

  1. .switchIfEmpty(withProducer)
  2. .lift(new FlowableOperator<Long, Long>() {
  3. @Override

代码示例来源:origin: fengzhizi715/RxCache

  1. @Override
  2. public <T> Publisher<Record<T>> execute(RxCache rxCache, String key, Flowable<T> source, Type type) {
  3. Flowable<Record<T>> cache = rxCache.<T>load2Flowable(key, type);
  4. Flowable<Record<T>> remote = source
  5. .map(new Function<T, Record<T>>() {
  6. @Override
  7. public Record<T> apply(@NonNull T t) throws Exception {
  8. rxCache.save(key, t);
  9. return new Record<>(Source.CLOUD, key, t);
  10. }
  11. });
  12. return cache.switchIfEmpty(remote);
  13. }

代码示例来源:origin: fengzhizi715/RxCache

  1. @Override
  2. public <T> Publisher<Record<T>> execute(RxCache rxCache, String key, Flowable<T> source, Type type) {
  3. Flowable<Record<T>> cache = rxCache.<T>load2Flowable(key, type);
  4. Flowable<Record<T>> remote = source
  5. .map(new Function<T, Record<T>>() {
  6. @Override
  7. public Record<T> apply(@NonNull T t) throws Exception {
  8. rxCache.save(key, t);
  9. return new Record<>(Source.CLOUD, key, t);
  10. }
  11. });
  12. return remote.switchIfEmpty(cache);
  13. }

代码示例来源:origin: fengzhizi715/RxCache

  1. @Override
  2. public <T> Publisher<Record<T>> execute(RxCache rxCache, String key, Flowable<T> source, Type type) {
  3. Flowable<Record<T>> cache = rxCache.<T>load2Flowable(key, type)
  4. .filter(new Predicate<Record<T>>() {
  5. @Override
  6. public boolean test(Record<T> record) throws Exception {
  7. return System.currentTimeMillis() - record.getCreateTime() <= timestamp;
  8. }
  9. });
  10. Flowable<Record<T>> remote = source
  11. .map(new Function<T, Record<T>>() {
  12. @Override
  13. public Record<T> apply(@NonNull T t) throws Exception {
  14. rxCache.save(key, t);
  15. return new Record<>(Source.CLOUD, key, t);
  16. }
  17. });
  18. return cache.switchIfEmpty(remote);
  19. }

相关文章

Flowable类方法