io.reactivex.Flowable.concatMapMaybe()方法的使用及代码示例

x33g5p2x  于2022-01-19 转载在 其他  
字(7.8k)|赞(0)|评价(0)|浏览(140)

本文整理了Java中io.reactivex.Flowable.concatMapMaybe()方法的一些代码示例,展示了Flowable.concatMapMaybe()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Flowable.concatMapMaybe()方法的具体详情如下:
包路径:io.reactivex.Flowable
类名称:Flowable
方法名:concatMapMaybe

Flowable.concatMapMaybe介绍

[英]Maps the upstream items into MaybeSources and subscribes to them one after the other succeeds or completes, emits their success value if available or terminates immediately if either this Flowable or the current inner MaybeSource fail.

Backpressure: The operator expects the upstream to support backpressure and honors the backpressure from downstream. If this Flowable violates the rule, the operator will signal a MissingBackpressureException. Scheduler: concatMapMaybe does not operate by default on a particular Scheduler.
[中]将上游项目映射到MaybeSource中,并在其他项目成功或完成后一个接一个地订阅它们,如果可用,则发出它们的成功值,或者如果此可流动或当前内部MaybeSource失败,则立即终止。
背压:操作员希望上游支持背压,并尊重下游的背压。如果该流体违反规则,操作员将发出缺少背压异常的信号。调度器:默认情况下,concatMapMaybe不会在特定的调度器上运行。

代码示例

代码示例来源:origin: ReactiveX/RxJava

@Override
  public Publisher<Integer> createPublisher(long elements) {
    return
        Flowable.range(0, (int)elements)
        .concatMapMaybe(new Function<Integer, Maybe<Integer>>() {
          @Override
          public Maybe<Integer> apply(Integer v) throws Exception {
            return Maybe.just(v);
          }
        })
      ;
  }
}

代码示例来源:origin: ReactiveX/RxJava

/**
 * Maps the upstream items into {@link MaybeSource}s and subscribes to them one after the
 * other succeeds or completes, emits their success value if available or terminates immediately if
 * either this {@code Flowable} or the current inner {@code MaybeSource} fail.
 * <p>
 * <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/concatMap.png" alt="">
 * <dl>
 *  <dt><b>Backpressure:</b></dt>
 *  <dd>The operator expects the upstream to support backpressure and honors
 *  the backpressure from downstream. If this {@code Flowable} violates the rule, the operator will
 *  signal a {@code MissingBackpressureException}.</dd>
 *  <dt><b>Scheduler:</b></dt>
 *  <dd>{@code concatMapMaybe} does not operate by default on a particular {@link Scheduler}.</dd>
 * </dl>
 * <p>History: 2.1.11 - experimental
 * @param <R> the result type of the inner {@code MaybeSource}s
 * @param mapper the function called with the upstream item and should return
 *               a {@code MaybeSource} to become the next source to
 *               be subscribed to
 * @return a new Flowable instance
 * @see #concatMapMaybeDelayError(Function)
 * @see #concatMapMaybe(Function, int)
 * @since 2.2
 */
@CheckReturnValue
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Flowable<R> concatMapMaybe(Function<? super T, ? extends MaybeSource<? extends R>> mapper) {
  return concatMapMaybe(mapper, 2);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void simple() {
  Flowable.range(1, 5)
  .concatMapMaybe(new Function<Integer, MaybeSource<Integer>>() {
    @Override
    public MaybeSource<Integer> apply(Integer v)
        throws Exception {
      return Maybe.just(v);
    }
  })
  .test()
  .assertResult(1, 2, 3, 4, 5);
}

代码示例来源:origin: redisson/redisson

/**
 * Maps the upstream items into {@link MaybeSource}s and subscribes to them one after the
 * other succeeds or completes, emits their success value if available or terminates immediately if
 * either this {@code Flowable} or the current inner {@code MaybeSource} fail.
 * <p>
 * <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/concatMap.png" alt="">
 * <dl>
 *  <dt><b>Backpressure:</b></dt>
 *  <dd>The operator expects the upstream to support backpressure and honors
 *  the backpressure from downstream. If this {@code Flowable} violates the rule, the operator will
 *  signal a {@code MissingBackpressureException}.</dd>
 *  <dt><b>Scheduler:</b></dt>
 *  <dd>{@code concatMapMaybe} does not operate by default on a particular {@link Scheduler}.</dd>
 * </dl>
 * @param <R> the result type of the inner {@code MaybeSource}s
 * @param mapper the function called with the upstream item and should return
 *               a {@code MaybeSource} to become the next source to
 *               be subscribed to
 * @return a new Flowable instance
 * @since 2.1.11 - experimental
 * @see #concatMapMaybeDelayError(Function)
 * @see #concatMapMaybe(Function, int)
 */
@CheckReturnValue
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
@Experimental
public final <R> Flowable<R> concatMapMaybe(Function<? super T, ? extends MaybeSource<? extends R>> mapper) {
  return concatMapMaybe(mapper, 2);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void empty() {
  Flowable.range(1, 10)
  .concatMapMaybe(new Function<Integer, MaybeSource<Integer>>() {
    @Override
    public MaybeSource<Integer> apply(Integer v)
        throws Exception {
      return Maybe.empty();
    }
  })
  .test()
  .assertResult();
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void mixed() {
  Flowable.range(1, 10)
  .concatMapMaybe(new Function<Integer, MaybeSource<Integer>>() {
    @Override
    public MaybeSource<Integer> apply(Integer v)
        throws Exception {
      if (v % 2 == 0) {
        return Maybe.just(v);
      }
      return Maybe.empty();
    }
  })
  .test()
  .assertResult(2, 4, 6, 8, 10);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void limit() {
  Flowable.range(1, 5)
  .concatMapMaybe(new Function<Integer, MaybeSource<Integer>>() {
    @Override
    public MaybeSource<Integer> apply(Integer v)
        throws Exception {
      return Maybe.just(v);
    }
  })
  .limit(3)
  .test()
  .assertResult(1, 2, 3);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void simpleLong() {
  Flowable.range(1, 1024)
  .concatMapMaybe(new Function<Integer, MaybeSource<Integer>>() {
    @Override
    public MaybeSource<Integer> apply(Integer v)
        throws Exception {
      return Maybe.just(v);
    }
  }, 32)
  .test()
  .assertValueCount(1024)
  .assertNoErrors()
  .assertComplete();
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void mainError() {
  Flowable.error(new TestException())
  .concatMapMaybe(Functions.justFunction(Maybe.just(1)))
  .test()
  .assertFailure(TestException.class);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void innerError() {
  Flowable.just(1)
  .concatMapMaybe(Functions.justFunction(Maybe.error(new TestException())))
  .test()
  .assertFailure(TestException.class);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void cancel() {
  Flowable.range(1, 5)
  .concatMapMaybe(new Function<Integer, MaybeSource<Integer>>() {
    @Override
    public MaybeSource<Integer> apply(Integer v)
        throws Exception {
      return Maybe.just(v);
    }
  })
  .test(3)
  .assertValues(1, 2, 3)
  .assertNoErrors()
  .assertNotComplete()
  .cancel();
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void mixedLong() {
  Flowable.range(1, 1024)
  .concatMapMaybe(new Function<Integer, MaybeSource<Integer>>() {
    @Override
    public MaybeSource<Integer> apply(Integer v)
        throws Exception {
      if (v % 2 == 0) {
        return Maybe.just(v).subscribeOn(Schedulers.computation());
      }
      return Maybe.<Integer>empty().subscribeOn(Schedulers.computation());
    }
  })
  .test()
  .awaitDone(5, TimeUnit.SECONDS)
  .assertValueCount(512)
  .assertNoErrors()
  .assertComplete()
  .assertOf(new Consumer<TestSubscriber<Integer>>() {
    @Override
    public void accept(TestSubscriber<Integer> ts) throws Exception {
      for (int i = 0; i < 512; i ++) {
        ts.assertValueAt(i, (i + 1) * 2);
      }
    }
  });
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void innerSuccessDisposeRace() {
  for (int i = 0; i < TestHelper.RACE_LONG_LOOPS; i++) {
    final MaybeSubject<Integer> ms = MaybeSubject.create();
    final TestSubscriber<Integer> ts = Flowable.just(1)
        .hide()
        .concatMapMaybe(Functions.justFunction(ms))
        .test();
    Runnable r1 = new Runnable() {
      @Override
      public void run() {
        ms.onSuccess(1);
      }
    };
    Runnable r2 = new Runnable() {
      @Override
      public void run() {
        ts.dispose();
      }
    };
    TestHelper.race(r1, r2);
    ts.assertNoErrors();
  }
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void backpressure() {
  TestSubscriber<Integer> ts = Flowable.range(1, 1024)
  .concatMapMaybe(new Function<Integer, MaybeSource<Integer>>() {
    @Override
    public MaybeSource<Integer> apply(Integer v)
        throws Exception {
      return Maybe.just(v);
    }
  }, 32)
  .test(0);
  for (int i = 1; i <= 1024; i++) {
    ts.assertValueCount(i - 1)
    .assertNoErrors()
    .assertNotComplete()
    .requestMore(1)
    .assertValueCount(i)
    .assertNoErrors();
  }
  ts.assertComplete();
}

相关文章

Flowable类方法