io.reactivex.Flowable.mergeArray()方法的使用及代码示例

x33g5p2x  于2022-01-19 转载在 其他  
字(7.3k)|赞(0)|评价(0)|浏览(150)

本文整理了Java中io.reactivex.Flowable.mergeArray()方法的一些代码示例,展示了Flowable.mergeArray()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Flowable.mergeArray()方法的具体详情如下:
包路径:io.reactivex.Flowable
类名称:Flowable
方法名:mergeArray

Flowable.mergeArray介绍

[英]Flattens an Iterable of Publishers into one Publisher, without any transformation, while limiting the number of concurrent subscriptions to these Publishers.

You can combine the items emitted by multiple Publishers so that they appear as a single Publisher, by using the merge method. Backpressure: The operator honors backpressure from downstream. The source Publishers are expected to honor backpressure; if violated, the operator may signal MissingBackpressureException. Scheduler: mergeArray does not operate by default on a particular Scheduler. Error handling: If any of the source Publishers signal a Throwable via onError, the resulting Flowable terminates with that Throwable and all other source Publishers are canceled. If more than one Publisher signals an error, the resulting Flowable may terminate with the first one's error or, depending on the concurrency of the sources, may terminate with a CompositeException containing two or more of the various error signals. Throwables that didn't make into the composite will be sent (individually) to the global error handler via RxJavaPlugins#onError(Throwable) method as UndeliverableException errors. Similarly, Throwables signaled by source(s) after the returned Flowable has been canceled or terminated with a (composite) error will be sent to the same global error handler. Use #mergeArrayDelayError(int,int,Publisher[]) to merge sources and terminate only when all source Publishers have completed or failed with an error.
[中]将多个发布服务器扁平化为一个发布服务器,无需任何转换,同时限制对这些发布服务器的并发订阅数量。
通过使用合并方法,可以合并多个发布服务器发出的项目,使它们显示为单个发布服务器。背压:操作员接受来自下游的背压。希望源出版商尊重背压;如果违反,操作员可能发出信号缺失背压异常。调度程序:默认情况下,mergeArray不会在特定调度程序上运行。错误处理:如果任何源发布服务器通过OneError发出可丢弃的信号,则生成的Flowable将终止该可丢弃,并且所有其他源发布服务器将被取消。如果一个以上的发布者发出错误信号,则生成的Flowable可能会以第一个发布者的错误终止,或者根据源的并发性,可能会以包含两个或多个不同错误信号的CompositeException终止。未进入组合的一次性文件将作为无法交付的异常错误通过RxJavaPlugins#onError(一次性)方法(单独)发送到全局错误处理程序。类似地,在返回的Flowable被取消或终止(复合)错误后,源发出的丢弃信号将被发送到同一全局错误处理程序。使用#mergeArrayDelayError(int,int,Publisher[])合并源,并仅在所有源发布服务器都已完成或失败且出现错误时终止。

代码示例

代码示例来源:origin: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void mergeArrayNull() {
  Flowable.mergeArray(128, 128, (Publisher<Object>[])null);
}

代码示例来源:origin: ReactiveX/RxJava

@SuppressWarnings("unchecked")
@Test(expected = NullPointerException.class)
public void mergeArrayOneIsNull() {
  Flowable.mergeArray(128, 128, just1, null).blockingLast();
}

代码示例来源:origin: ReactiveX/RxJava

@SuppressWarnings("unchecked")
  @Override
  public Publisher<Integer> apply(Flowable<Integer> v) throws Exception {
    return Flowable.mergeArray(
        v.filter(new Predicate<Integer>() {
          @Override
          public boolean test(Integer w) throws Exception {
            return w % 2 == 0;
          }
        }),
        v.filter(new Predicate<Integer>() {
          @Override
          public boolean test(Integer w) throws Exception {
            return w % 2 != 0;
          }
        }));
  }
})

代码示例来源:origin: ReactiveX/RxJava

@SuppressWarnings("unchecked")
  @Override
  public Publisher<Integer> apply(Flowable<Integer> v) throws Exception {
    return Flowable.mergeArray(
        v.filter(new Predicate<Integer>() {
          @Override
          public boolean test(Integer w) throws Exception {
            return w % 2 == 0;
          }
        }),
        v.filter(new Predicate<Integer>() {
          @Override
          public boolean test(Integer w) throws Exception {
            return w % 2 != 0;
          }
        }));
  }
})

代码示例来源:origin: ReactiveX/RxJava

@SuppressWarnings("unchecked")
  @Override
  public Publisher<Integer> apply(Flowable<Integer> v) throws Exception {
    return Flowable.mergeArray(
        v.filter(new Predicate<Integer>() {
          @Override
          public boolean test(Integer w) throws Exception {
            return w % 2 == 0;
          }
        }),
        v.filter(new Predicate<Integer>() {
          @Override
          public boolean test(Integer w) throws Exception {
            return w % 2 != 0;
          }
        }));
  }
})

代码示例来源:origin: ReactiveX/RxJava

@Test
public void array() {
  for (int i = 1; i < 100; i++) {
    @SuppressWarnings("unchecked")
    Flowable<Integer>[] sources = new Flowable[i];
    Arrays.fill(sources, Flowable.just(1));
    Integer[] expected = new Integer[i];
    for (int j = 0; j < i; j++) {
      expected[j] = 1;
    }
    Flowable.mergeArray(sources)
    .test()
    .assertResult(expected);
  }
}

代码示例来源:origin: ReactiveX/RxJava

@SuppressWarnings("unchecked")
@Test
public void mergeArray() {
  Flowable.mergeArray(Flowable.just(1), Flowable.just(2))
  .test()
  .assertResult(1, 2);
}

代码示例来源:origin: ReactiveX/RxJava

@SuppressWarnings("unchecked")
@Test
public void mergeArrayMaxConcurrent() {
  TestSubscriber<Integer> ts = TestSubscriber.create();
  PublishProcessor<Integer> pp1 = PublishProcessor.create();
  PublishProcessor<Integer> pp2 = PublishProcessor.create();
  Flowable.mergeArray(1, 128, new Flowable[] { pp1, pp2 }).subscribe(ts);
  assertTrue("ps1 has no subscribers?!", pp1.hasSubscribers());
  assertFalse("ps2 has subscribers?!", pp2.hasSubscribers());
  pp1.onNext(1);
  pp1.onComplete();
  assertFalse("ps1 has subscribers?!", pp1.hasSubscribers());
  assertTrue("ps2 has no subscribers?!", pp2.hasSubscribers());
  pp2.onNext(2);
  pp2.onComplete();
  ts.assertValues(1, 2);
  ts.assertNoErrors();
  ts.assertComplete();
}

代码示例来源:origin: akarnokd/RxJava2Extensions

/**
 * Merge the values in arbitrary order from a sequence of Perhaps sources.
 * @param <T> the common base value type
 * @param sources the sequence of sources
 * @param maxConcurrency the maximum number of active subscriptions
 * @return the new Flowable instance
 */
public static <T> Flowable<T> mergeArray(int maxConcurrency, Perhaps<? extends T>... sources) {
  return Flowable.mergeArray(maxConcurrency, 1, sources);
}

代码示例来源:origin: akarnokd/RxJava2Extensions

/**
 * Merge the values in arbitrary order from a sequence of Solo sources.
 * @param <T> the common base value type
 * @param sources the sequence of sources
 * @param maxConcurrency the maximum number of active subscriptions
 * @return the new Flowable instance
 */
public static <T> Flowable<T> mergeArray(int maxConcurrency, Solo<? extends T>... sources) {
  return Flowable.mergeArray(maxConcurrency, 1, sources);
}

代码示例来源:origin: com.github.akarnokd/rxjava2-extensions

/**
 * Merge the values in arbitrary order from a sequence of Solo sources.
 * @param <T> the common base value type
 * @param sources the sequence of sources
 * @param maxConcurrency the maximum number of active subscriptions
 * @return the new Flowable instance
 */
public static <T> Flowable<T> mergeArray(int maxConcurrency, Solo<? extends T>... sources) {
  return Flowable.mergeArray(maxConcurrency, 1, sources);
}

代码示例来源:origin: com.github.akarnokd/rxjava2-extensions

/**
 * Merge the values in arbitrary order from a sequence of Perhaps sources.
 * @param <T> the common base value type
 * @param sources the sequence of sources
 * @return the new Flowable instance
 */
public static <T> Flowable<T> mergeArray(Perhaps<? extends T>... sources) {
  return Flowable.mergeArray(sources);
}

代码示例来源:origin: akarnokd/RxJava2Extensions

/**
 * Merge the values in arbitrary order from a sequence of Perhaps sources.
 * @param <T> the common base value type
 * @param sources the sequence of sources
 * @return the new Flowable instance
 */
public static <T> Flowable<T> mergeArray(Perhaps<? extends T>... sources) {
  return Flowable.mergeArray(sources);
}

代码示例来源:origin: com.github.akarnokd/rxjava2-extensions

/**
 * Merge the values in arbitrary order from a sequence of Perhaps sources.
 * @param <T> the common base value type
 * @param sources the sequence of sources
 * @param maxConcurrency the maximum number of active subscriptions
 * @return the new Flowable instance
 */
public static <T> Flowable<T> mergeArray(int maxConcurrency, Perhaps<? extends T>... sources) {
  return Flowable.mergeArray(maxConcurrency, 1, sources);
}

相关文章

Flowable类方法