Usage and code examples for the io.reactivex.Flowable.concatMapSingle() method

x33g5p2x, reposted on 2022-01-19 under Other

This article collects code examples for the Java method io.reactivex.Flowable.concatMapSingle() and shows how Flowable.concatMapSingle() is used in practice. The examples were extracted from selected projects found mainly on GitHub, Stack Overflow, and Maven. Details of the Flowable.concatMapSingle() method:
Package: io.reactivex
Class name: Flowable
Method name: concatMapSingle

About Flowable.concatMapSingle

Maps the upstream items into SingleSources and subscribes to them one at a time, each only after the previous one has succeeded. It emits their success values, or terminates immediately if either this Flowable or the current inner SingleSource fails.

Backpressure: The operator expects the upstream to support backpressure and honors the backpressure from downstream. If this Flowable violates the rule, the operator signals a MissingBackpressureException.
Scheduler: concatMapSingle does not operate by default on a particular Scheduler.
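To make the ordering guarantee concrete, here is a minimal sketch, assuming RxJava 2.2.x is on the classpath. The class name ConcatMapSingleOrderDemo and the delays are made up for illustration. Each item is mapped to a Single that succeeds after a delay proportional to its value, so the inner sources would finish out of order if they ran concurrently.

```java
import io.reactivex.Flowable;
import io.reactivex.Single;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; assumes RxJava 2.2.x (io.reactivex) on the classpath.
final class ConcatMapSingleOrderDemo {

    // Thread-safe log of inner-source lifecycle events.
    static final List<String> LOG = Collections.synchronizedList(new ArrayList<String>());

    static List<Integer> run() {
        LOG.clear();
        return Flowable.just(3, 1, 2)
                .concatMapSingle(v ->
                        // Larger values take longer to succeed.
                        Single.just(v)
                                .delay(v * 10L, TimeUnit.MILLISECONDS)
                                .doOnSubscribe(d -> LOG.add("subscribe " + v))
                                .doOnSuccess(x -> LOG.add("success " + v)))
                .toList()
                .blockingGet();
    }

    public static void main(String[] args) {
        // Upstream order is preserved even though 3 is the slowest item.
        System.out.println(run()); // [3, 1, 2]
        // Each inner Single is subscribed only after the previous one succeeded.
        System.out.println(LOG);
    }
}
```

An operator that subscribes to all inner sources eagerly, such as flatMapSingle, gives no such ordering guarantee.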

Code examples

Code example source: ReactiveX/RxJava

    @Override
    public Publisher<Integer> createPublisher(long elements) {
        // Wrap every upstream item in a Single that succeeds immediately.
        return Flowable.range(0, (int) elements)
                .concatMapSingle(new Function<Integer, Single<Integer>>() {
                    @Override
                    public Single<Integer> apply(Integer v) throws Exception {
                        return Single.just(v);
                    }
                });
    }

Code example source: ReactiveX/RxJava

    /**
     * Maps the upstream items into {@link SingleSource}s and subscribes to them one after the
     * other succeeds, emits their success values or terminates immediately if
     * either this {@code Flowable} or the current inner {@code SingleSource} fail.
     * <p>
     * <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/concatMap.png" alt="">
     * <dl>
     *  <dt><b>Backpressure:</b></dt>
     *  <dd>The operator expects the upstream to support backpressure and honors
     *  the backpressure from downstream. If this {@code Flowable} violates the rule, the operator will
     *  signal a {@code MissingBackpressureException}.</dd>
     *  <dt><b>Scheduler:</b></dt>
     *  <dd>{@code concatMapSingle} does not operate by default on a particular {@link Scheduler}.</dd>
     * </dl>
     * <p>History: 2.1.11 - experimental
     * @param <R> the result type of the inner {@code SingleSource}s
     * @param mapper the function called with the upstream item and should return
     *               a {@code SingleSource} to become the next source to
     *               be subscribed to
     * @return a new Flowable instance
     * @see #concatMapSingleDelayError(Function)
     * @see #concatMapSingle(Function, int)
     * @since 2.2
     */
    @CheckReturnValue
    @BackpressureSupport(BackpressureKind.FULL)
    @SchedulerSupport(SchedulerSupport.NONE)
    public final <R> Flowable<R> concatMapSingle(Function<? super T, ? extends SingleSource<? extends R>> mapper) {
        // Delegates to the two-argument overload with a prefetch of 2.
        return concatMapSingle(mapper, 2);
    }
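As the method body above shows, the single-argument overload delegates to concatMapSingle(mapper, 2), where the second argument is the prefetch amount requested from upstream. The following sketch, assuming RxJava 2.2.x, records upstream requests with doOnRequest to make that visible. PrefetchDemo and firstRequest are hypothetical names, and only the first request is inspected, since later replenishment amounts are an implementation detail.

```java
import io.reactivex.Flowable;
import io.reactivex.Single;

import java.util.ArrayList;
import java.util.List;

// Hypothetical demo class; assumes RxJava 2.2.x (io.reactivex) on the classpath.
final class PrefetchDemo {

    // Returns the size of the first request the operator sends upstream.
    // A prefetch of 0 or less selects the single-argument overload.
    static long firstRequest(int prefetch) {
        List<Long> requests = new ArrayList<>();
        Flowable<Integer> source = Flowable.range(1, 100)
                .doOnRequest(n -> requests.add(n)); // record every upstream request

        Flowable<Integer> mapped;
        if (prefetch > 0) {
            mapped = source.concatMapSingle(v -> Single.just(v), prefetch);
        } else {
            mapped = source.concatMapSingle(v -> Single.just(v));
        }

        // Everything here is synchronous, so the stream has finished when test() returns.
        mapped.test().assertValueCount(100).assertComplete();
        return requests.get(0);
    }

    public static void main(String[] args) {
        System.out.println(firstRequest(0));  // 2, the default prefetch
        System.out.println(firstRequest(32)); // 32
    }
}
```

A larger prefetch lets the operator buffer more upstream items while an inner Single is still running. It does not change the one-at-a-time subscription to the inner sources.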

Code example source: redisson/redisson

    /**
     * Maps the upstream items into {@link SingleSource}s and subscribes to them one after the
     * other succeeds, emits their success values or terminates immediately if
     * either this {@code Flowable} or the current inner {@code SingleSource} fail.
     * <p>
     * <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/concatMap.png" alt="">
     * <dl>
     *  <dt><b>Backpressure:</b></dt>
     *  <dd>The operator expects the upstream to support backpressure and honors
     *  the backpressure from downstream. If this {@code Flowable} violates the rule, the operator will
     *  signal a {@code MissingBackpressureException}.</dd>
     *  <dt><b>Scheduler:</b></dt>
     *  <dd>{@code concatMapSingle} does not operate by default on a particular {@link Scheduler}.</dd>
     * </dl>
     * @param <R> the result type of the inner {@code SingleSource}s
     * @param mapper the function called with the upstream item and should return
     *               a {@code SingleSource} to become the next source to
     *               be subscribed to
     * @return a new Flowable instance
     * @since 2.1.11 - experimental
     * @see #concatMapSingleDelayError(Function)
     * @see #concatMapSingle(Function, int)
     */
    @CheckReturnValue
    @BackpressureSupport(BackpressureKind.FULL)
    @SchedulerSupport(SchedulerSupport.NONE)
    @Experimental
    public final <R> Flowable<R> concatMapSingle(Function<? super T, ? extends SingleSource<? extends R>> mapper) {
        return concatMapSingle(mapper, 2);
    }

Code example source: ReactiveX/RxJava

    @Test
    public void simple() {
        Flowable.range(1, 5)
                .concatMapSingle(new Function<Integer, SingleSource<Integer>>() {
                    @Override
                    public SingleSource<Integer> apply(Integer v) throws Exception {
                        return Single.just(v);
                    }
                })
                .test()
                // All five values arrive in upstream order, followed by completion.
                .assertResult(1, 2, 3, 4, 5);
    }

Code example source: ReactiveX/RxJava

    @Test
    public void limit() {
        Flowable.range(1, 5)
                .concatMapSingle(new Function<Integer, SingleSource<Integer>>() {
                    @Override
                    public SingleSource<Integer> apply(Integer v) throws Exception {
                        return Single.just(v);
                    }
                })
                // Only the first three mapped values are let through.
                .limit(3)
                .test()
                .assertResult(1, 2, 3);
    }

Code example source: ReactiveX/RxJava

    @Test
    public void simpleLong() {
        Flowable.range(1, 1024)
                // The second argument (32) is the prefetch amount.
                .concatMapSingle(new Function<Integer, SingleSource<Integer>>() {
                    @Override
                    public SingleSource<Integer> apply(Integer v) throws Exception {
                        return Single.just(v);
                    }
                }, 32)
                .test()
                .assertValueCount(1024)
                .assertNoErrors()
                .assertComplete();
    }

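Code example source: ReactiveX/RxJava

    @Test
    public void mainError() {
        // An error from the upstream Flowable is passed straight to the subscriber.
        // Functions.justFunction and TestException are helpers from RxJava's own code base.
        Flowable.error(new TestException())
                .concatMapSingle(Functions.justFunction(Single.just(1)))
                .test()
                .assertFailure(TestException.class);
    }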

Code example source: ReactiveX/RxJava

    @Test
    public void innerError() {
        // An error from the inner Single terminates the whole sequence.
        Flowable.just(1)
                .concatMapSingle(Functions.justFunction(Single.error(new TestException())))
                .test()
                .assertFailure(TestException.class);
    }
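The two error tests above use sources that fail right away. The sketch below, assuming RxJava 2.2.x, shows an inner failure in the middle of a sequence and contrasts it with concatMapSingleDelayError, the variant referenced by the @see tag in the Javadoc. InnerErrorDemo and lookup are hypothetical names invented for this illustration.

```java
import io.reactivex.Flowable;
import io.reactivex.Single;
import io.reactivex.subscribers.TestSubscriber;

// Hypothetical demo class; assumes RxJava 2.2.x (io.reactivex) on the classpath.
final class InnerErrorDemo {

    // Hypothetical lookup that fails for the value 3 and succeeds otherwise.
    static Single<Integer> lookup(int v) {
        if (v == 3) {
            return Single.error(new IllegalStateException("no result for " + v));
        }
        return Single.just(v * 10);
    }

    // Stops at the first inner failure: items 4 and 5 are never mapped.
    static TestSubscriber<Integer> failFast() {
        return Flowable.range(1, 5)
                .concatMapSingle(InnerErrorDemo::lookup)
                .test();
    }

    // Keeps going after the failure and signals the error at the very end.
    static TestSubscriber<Integer> delayed() {
        return Flowable.range(1, 5)
                .concatMapSingleDelayError(InnerErrorDemo::lookup)
                .test();
    }

    public static void main(String[] args) {
        failFast().assertFailure(IllegalStateException.class, 10, 20);
        delayed().assertFailure(IllegalStateException.class, 10, 20, 40, 50);
        System.out.println("both assertions passed");
    }
}
```

Which variant fits depends on whether results that come after a failed item are still useful to the subscriber.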

Code example source: ReactiveX/RxJava

    @Test
    public void cancel() {
        Flowable.range(1, 5)
                .concatMapSingle(new Function<Integer, SingleSource<Integer>>() {
                    @Override
                    public SingleSource<Integer> apply(Integer v) throws Exception {
                        return Single.just(v);
                    }
                })
                // Request only three items, then cancel before completion.
                .test(3)
                .assertValues(1, 2, 3)
                .assertNoErrors()
                .assertNotComplete()
                .cancel();
    }

Code example source: ReactiveX/RxJava

    @Test
    public void innerSuccessDisposeRace() {
        for (int i = 0; i < TestHelper.RACE_LONG_LOOPS; i++) {
            final SingleSubject<Integer> ss = SingleSubject.create();
            final TestSubscriber<Integer> ts = Flowable.just(1)
                    .hide()
                    .concatMapSingle(Functions.justFunction(ss))
                    .test();
            // Race the inner success against disposal of the subscriber.
            Runnable r1 = new Runnable() {
                @Override
                public void run() {
                    ss.onSuccess(1);
                }
            };
            Runnable r2 = new Runnable() {
                @Override
                public void run() {
                    ts.dispose();
                }
            };
            TestHelper.race(r1, r2);
            ts.assertNoErrors();
        }
    }

Code example source: ReactiveX/RxJava

    @Test
    public void backpressure() {
        TestSubscriber<Integer> ts = Flowable.range(1, 1024)
                .concatMapSingle(new Function<Integer, SingleSource<Integer>>() {
                    @Override
                    public SingleSource<Integer> apply(Integer v) throws Exception {
                        return Single.just(v);
                    }
                }, 32)
                // Start with no downstream demand.
                .test(0);
        // Request one item at a time and check that exactly one more value arrives.
        for (int i = 1; i <= 1024; i++) {
            ts.assertValueCount(i - 1)
                    .assertNoErrors()
                    .assertNotComplete()
                    .requestMore(1)
                    .assertValueCount(i)
                    .assertNoErrors();
        }
        ts.assertComplete();
    }

Code example source: xiancloud/xian

    .concatMapSingle(action -> action.execute(this, request.getArgMap(), transaction.getConnection(), request.getContext().getMsgId()))

Code example source: info.xiancloud/xian-daocore

    .concatMapSingle(action -> action.execute(this, request.getArgMap(), transaction.getConnection(), request.getContext().getMsgId()))
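The fragment above is shown without its surrounding pipeline, so its exact context is unknown. A self-contained sketch of the general pattern it suggests, running a list of actions strictly one after another, could look like the following, assuming RxJava 2.2.x. The Action interface, named, and runAll are hypothetical stand-ins and are not taken from the xian project.

```java
import io.reactivex.Flowable;
import io.reactivex.Single;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical demo class; assumes RxJava 2.2.x (io.reactivex) on the classpath.
final class SequentialActionsDemo {

    // Hypothetical stand-in for a project-specific action type.
    interface Action {
        Single<String> execute(List<String> sharedLog);
    }

    // Builds an action whose side effect runs only when its Single is subscribed.
    static Action named(String name) {
        return log -> Single.fromCallable(() -> {
            log.add(name);
            return name + " done";
        });
    }

    // Executes the actions in list order, each one after the previous has succeeded.
    static List<String> runAll(List<Action> actions, List<String> sharedLog) {
        return Flowable.fromIterable(actions)
                .concatMapSingle(action -> action.execute(sharedLog))
                .toList()
                .blockingGet();
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        List<Action> actions = Arrays.asList(named("begin"), named("update"), named("commit"));
        System.out.println(runAll(actions, log)); // [begin done, update done, commit done]
        System.out.println(log);                  // [begin, update, commit]
    }
}
```

Because each inner Single is subscribed only after the previous one has succeeded, steps that share a resource never overlap.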
