io.reactivex.Flowable.concatEager(): usage and code examples

x33g5p2x, reposted on 2022-01-19 under Other

This article collects code examples for the Java method io.reactivex.Flowable.concatEager() and shows how it is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, and were extracted from selected projects. Details of Flowable.concatEager() are as follows:
Package: io.reactivex
Class: Flowable
Method: concatEager

About Flowable.concatEager

Concatenates a sequence of Publishers eagerly into a single stream of values.

Eager concatenation means that once a subscriber subscribes, this operator subscribes to all of the source Publishers. The operator buffers the values emitted by these Publishers and then drains them in order, each one after the previous one completes.

Backpressure: Backpressure is honored towards the downstream, and the inner Publishers are expected to support backpressure. If they do not, the operator signals MissingBackpressureException.

Scheduler: This method does not operate by default on a particular Scheduler.
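
The "start everything at once, emit in source order" behavior described above can be illustrated without RxJava. The following is a minimal sketch using only the JDK, assuming each source can be modeled as a Supplier that returns its complete list of values. The class EagerConcatSketch and its concatEager and pause helpers are hypothetical names chosen for illustration; this is an analogy, not RxJava's implementation, which additionally handles backpressure, prefetch, and error signals.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

// Plain-JDK analogy for eager concatenation (hypothetical, not RxJava code).
final class EagerConcatSketch {

    // Starts every source immediately, then collects results in source order.
    static <T> List<T> concatEager(List<Supplier<List<T>>> sources) {
        ExecutorService pool = Executors.newCachedThreadPool();
        try {
            // Eager step: all sources begin running right away.
            List<CompletableFuture<List<T>>> running = new ArrayList<>();
            for (Supplier<List<T>> source : sources) {
                running.add(CompletableFuture.supplyAsync(source, pool));
            }
            // Drain step: wait for each source in order, so output order
            // follows source order even if a later source finishes first.
            List<T> out = new ArrayList<>();
            for (CompletableFuture<List<T>> future : running) {
                out.addAll(future.join());
            }
            return out;
        } finally {
            pool.shutdown();
        }
    }

    // Hypothetical helper used only by the demo to simulate a slow source.
    static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        Supplier<List<Integer>> slow = () -> { pause(50); return Arrays.asList(1, 2); };
        Supplier<List<Integer>> fast = () -> Arrays.asList(3, 4);
        // The fast source finishes first, but its values still come second.
        System.out.println(concatEager(Arrays.asList(slow, fast))); // prints [1, 2, 3, 4]
    }
}
```

In this sketch the second source's values sit in its completed future until the first source has been drained, which loosely mirrors the buffering the operator description mentions.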

Code examples

Code example source: ReactiveX/RxJava

    @SuppressWarnings("unchecked")
    @Test
    public void badCapacityHint() throws Exception {
        Flowable<Integer> source = Flowable.just(1);
        try {
            // Arguments are (sources, maxConcurrency, prefetch); prefetch must be positive.
            Flowable.concatEager(Arrays.asList(source, source, source), 1, -99);
        } catch (IllegalArgumentException ex) {
            assertEquals("prefetch > 0 required but it was -99", ex.getMessage());
        }
    }

Code example source: ReactiveX/RxJava

    @Test
    public void concatEagerZero() {
        // With no sources, the result completes without emitting any value.
        Flowable.concatEager(Collections.<Flowable<Integer>>emptyList())
            .test()
            .assertResult();
    }

Code example source: ReactiveX/RxJava (an identical copy appears in redisson/redisson)

    /**
     * Concatenates a sequence of Publishers eagerly into a single stream of values.
     * <p>
     * Eager concatenation means that once a subscriber subscribes, this operator subscribes to all of the
     * source Publishers. The operator buffers the values emitted by these Publishers and then drains them
     * in order, each one after the previous one completes.
     * <dl>
     *  <dt><b>Backpressure:</b></dt>
     *  <dd>Backpressure is honored towards the downstream and the inner Publishers are
     *  expected to support backpressure. Violating this assumption, the operator will
     *  signal {@code MissingBackpressureException}.</dd>
     *  <dt><b>Scheduler:</b></dt>
     *  <dd>This method does not operate by default on a particular {@link Scheduler}.</dd>
     * </dl>
     * @param <T> the value type
     * @param sources a sequence of Publishers that need to be eagerly concatenated
     * @return the new Publisher instance with the specified concatenation behavior
     * @since 2.0
     */
    @CheckReturnValue
    @BackpressureSupport(BackpressureKind.FULL)
    @SchedulerSupport(SchedulerSupport.NONE)
    public static <T> Flowable<T> concatEager(Iterable<? extends Publisher<? extends T>> sources) {
        return concatEager(sources, bufferSize(), bufferSize());
    }

Code example source: ReactiveX/RxJava (an identical copy appears in redisson/redisson)

    /**
     * Concatenates a Publisher sequence of Publishers eagerly into a single stream of values.
     * <p>
     * Eager concatenation means that once a subscriber subscribes, this operator subscribes to all of the
     * emitted source Publishers as they are observed. The operator buffers the values emitted by these
     * Publishers and then drains them in order, each one after the previous one completes.
     * <dl>
     *  <dt><b>Backpressure:</b></dt>
     *  <dd>Backpressure is honored towards the downstream and both the outer and inner Publishers are
     *  expected to support backpressure. Violating this assumption, the operator will
     *  signal {@code MissingBackpressureException}.</dd>
     *  <dt><b>Scheduler:</b></dt>
     *  <dd>This method does not operate by default on a particular {@link Scheduler}.</dd>
     * </dl>
     * @param <T> the value type
     * @param sources a sequence of Publishers that need to be eagerly concatenated
     * @return the new Publisher instance with the specified concatenation behavior
     * @since 2.0
     */
    @CheckReturnValue
    @BackpressureSupport(BackpressureKind.FULL)
    @SchedulerSupport(SchedulerSupport.NONE)
    public static <T> Flowable<T> concatEager(Publisher<? extends Publisher<? extends T>> sources) {
        return concatEager(sources, bufferSize(), bufferSize());
    }

Code example source: ReactiveX/RxJava

    @SuppressWarnings("unchecked")
    @Test
    public void concatEagerOne() {
        Flowable.concatEager(Arrays.asList(Flowable.just(1)))
            .test()
            .assertResult(1);
    }

Code example source: ReactiveX/RxJava

    @SuppressWarnings("unchecked")
    @Override
    public Publisher<Long> createPublisher(long elements) {
        // Split the requested element count across two sources (Iterable overload).
        return Flowable.concatEager(Arrays.asList(
                Flowable.fromIterable(iterate(elements / 2)),
                Flowable.fromIterable(iterate(elements - elements / 2))
        ));
    }

Code example source: ReactiveX/RxJava

    @SuppressWarnings("unchecked")
    @Test
    public void concatEagerTwo() {
        Flowable.concatEager(Arrays.asList(Flowable.just(1), Flowable.just(2)))
            .test()
            .assertResult(1, 2);
    }

Code example source: ReactiveX/RxJava

    @SuppressWarnings("unchecked")
    @Test
    public void concatEagerIterable() {
        Flowable.concatEager(Arrays.asList(Flowable.just(1), Flowable.just(2)))
            .test()
            .assertResult(1, 2);
    }

Code example source: ReactiveX/RxJava

    @Override
    public Publisher<Long> createPublisher(long elements) {
        // Same split as the Iterable variant, but the sources arrive via an outer Publisher.
        return Flowable.concatEager(Flowable.just(
                Flowable.fromIterable(iterate(elements / 2)),
                Flowable.fromIterable(iterate(elements - elements / 2))
        ));
    }

Code example source: ReactiveX/RxJava

    @SuppressWarnings("unchecked")
    @Test
    public void capacityHint() {
        Flowable<Integer> source = Flowable.just(1);
        TestSubscriber<Integer> ts = TestSubscriber.create();
        // Arguments are (sources, maxConcurrency, prefetch).
        Flowable.concatEager(Arrays.asList(source, source, source), 1, 1).subscribe(ts);
        ts.assertValues(1, 1, 1);
        ts.assertNoErrors();
        ts.assertComplete();
    }

Code example source: ReactiveX/RxJava

    @Test
    public void ObservableCapacityHint() {
        Flowable<Integer> source = Flowable.just(1);
        TestSubscriber<Integer> ts = TestSubscriber.create();
        Flowable.concatEager(Flowable.just(source, source, source), 1, 1).subscribe(ts);
        ts.assertValues(1, 1, 1);
        ts.assertNoErrors();
        ts.assertComplete();
    }

Code example source: ReactiveX/RxJava

    @Test
    public void Flowable() {
        Flowable<Integer> source = Flowable.just(1);
        TestSubscriber<Integer> ts = TestSubscriber.create();
        Flowable.concatEager(Flowable.just(source, source, source)).subscribe(ts);
        ts.assertValues(1, 1, 1);
        ts.assertNoErrors();
        ts.assertComplete();
    }

Code example source: ReactiveX/RxJava

    @Test
    public void flowable() {
        Flowable<Integer> source = Flowable.just(1);
        TestSubscriber<Integer> ts = TestSubscriber.create();
        Flowable.concatEager(Flowable.just(source, source, source)).subscribe(ts);
        ts.assertValues(1, 1, 1);
        ts.assertNoErrors();
        ts.assertComplete();
    }

Code example source: ReactiveX/RxJava

    @Test
    public void flowableCapacityHint() {
        Flowable<Integer> source = Flowable.just(1);
        TestSubscriber<Integer> ts = TestSubscriber.create();
        Flowable.concatEager(Flowable.just(source, source, source), 1, 1).subscribe(ts);
        ts.assertValues(1, 1, 1);
        ts.assertNoErrors();
        ts.assertComplete();
    }

Code example source: com.github.davidmoten/rxjava2-extras (excerpt; the receiver of the call is not shown in the source)

    .concatEager(subject.serialize() //
            .toFlowable(BackpressureStrategy.BUFFER), maxConcurrency, 128) //
    .doOnRequest(request);
