
x33g5p2x, reposted on 2022-01-19 under Other



[English] Flattens an array of Publishers into one Publisher, in a way that allows a Subscriber to receive all successfully emitted items from each of the source Publishers without being interrupted by an error notification from one of them, while limiting the number of concurrent subscriptions to these Publishers.

This behaves like #merge(Publisher) except that if any of the merged Publishers notify of an error via Subscriber#onError, mergeDelayError will refrain from propagating that error notification until all of the merged Publishers have finished emitting items.

Even if multiple merged Publishers send onError notifications, mergeDelayError will only invoke the onError method of its Subscribers once.

Backpressure: The operator honors backpressure from downstream. All source Publishers are expected to honor backpressure; if violated, the operator may signal MissingBackpressureException.

Scheduler: mergeArrayDelayError does not operate by default on a particular Scheduler.
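To make the delayed-error behavior concrete, here is a minimal, synchronous sketch that uses only the JDK. It is not RxJava's implementation: the names DelayErrorSketch, Source, Result, and mergeDelayError are hypothetical, sources are drained one after another rather than concurrently, and backpressure is not modeled. When several sources fail, the sketch attaches the individual failures as suppressed exceptions on a single wrapper, which is this sketch's own way of reporting exactly one terminal error.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, simplified model of delay-error merging (not RxJava code).
final class DelayErrorSketch {

    /** Stand-in for a source: pushes items downstream and may fail with an exception. */
    interface Source<T> {
        void emitTo(Consumer<T> downstream) throws Exception;
    }

    /** Outcome of a merge: every item received, plus at most one terminal error. */
    static final class Result<T> {
        final List<T> items = new ArrayList<>();
        Exception error; // null means the merge completed normally
    }

    static <T> Result<T> mergeDelayError(List<Source<T>> sources) {
        Result<T> result = new Result<>();
        List<Exception> errors = new ArrayList<>();
        for (Source<T> source : sources) {
            try {
                source.emitTo(result.items::add);
            } catch (Exception e) {
                // Hold the error back and keep draining the remaining sources.
                errors.add(e);
            }
        }
        if (errors.size() == 1) {
            result.error = errors.get(0);
        } else if (errors.size() > 1) {
            // Report one terminal error that carries all individual failures.
            Exception combined = new Exception(errors.size() + " sources failed");
            for (Exception e : errors) {
                combined.addSuppressed(e);
            }
            result.error = combined;
        }
        return result;
    }

    public static void main(String[] args) {
        Source<Integer> failing = downstream -> {
            downstream.accept(1);
            throw new IllegalStateException("boom");
        };
        Source<Integer> healthy = downstream -> {
            downstream.accept(2);
            downstream.accept(3);
        };
        Result<Integer> result = mergeDelayError(Arrays.asList(failing, healthy));
        System.out.println(result.items); // prints [1, 2, 3]
        System.out.println(result.error); // prints java.lang.IllegalStateException: boom
    }
}
```

Running main shows the item from the failing source and both items from the healthy source arriving before the single error is reported, which is the ordering the description above promises.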


Code example source: origin: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void mergeDelayErrorArrayNull() {
  // A null sources array must be rejected with a NullPointerException
  Flowable.mergeArrayDelayError(128, 128, (Publisher<Object>[])null);
}

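Code example source: origin: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void mergeDelayErrorArrayOneIsNull() {
  // just1 is declared elsewhere in the test class (not shown here);
  // a null element among the sources must also fail with a NullPointerException
  Flowable.mergeArrayDelayError(128, 128, just1, null).blockingLast();
}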

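Code example source: origin: ReactiveX/RxJava

public void array() {
  for (int i = 1; i < 100; i++) {
    Flowable<Integer>[] sources = new Flowable[i];
    Arrays.fill(sources, Flowable.just(1));
    Integer[] expected = new Integer[i];
    for (int j = 0; j < i; j++) {
      expected[j] = 1;
    }
    // Merging i single-item sources should yield i items
    Flowable.mergeArrayDelayError(sources)
    .test()
    .assertResult(expected);
  }
}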

Code example source: origin: ReactiveX/RxJava

public void mergeArrayDelayError() {
  Flowable.mergeArrayDelayError(Flowable.just(1), Flowable.just(2))
  .test() // subscribe with a TestSubscriber so the result can be asserted
  .assertResult(1, 2);
}

Code example source: origin: akarnokd/RxJava2Extensions

/**
 * Merge the values in arbitrary order from a sequence of Perhaps sources,
 * delaying errors till all sources terminate.
 * @param <T> the common base value type
 * @param sources the sequence of sources
 * @param maxConcurrency the maximum number of active subscriptions
 * @return the new Flowable instance
 */
public static <T> Flowable<T> mergeArrayDelayError(int maxConcurrency, Perhaps<? extends T>... sources) {
  // The second argument is bufferSize; 1 suffices because a Perhaps emits at most one item
  return Flowable.mergeArrayDelayError(maxConcurrency, 1, sources);
}

Code example source: origin: akarnokd/RxJava2Extensions

/**
 * Merge the values in arbitrary order from a sequence of Solo sources,
 * delaying errors till all sources terminate.
 * @param <T> the common base value type
 * @param sources the sequence of sources
 * @param maxConcurrency the maximum number of active subscriptions
 * @return the new Flowable instance
 */
public static <T> Flowable<T> mergeArrayDelayError(int maxConcurrency, Solo<? extends T>... sources) {
  // The second argument is bufferSize; 1 suffices because a Solo emits exactly one item
  return Flowable.mergeArrayDelayError(maxConcurrency, 1, sources);
}

Code example source: origin: akarnokd/RxJava2Extensions

/**
 * Merge the values in arbitrary order from a sequence of Perhaps sources,
 * delaying errors till all sources terminate.
 * @param <T> the common base value type
 * @param sources the sequence of sources
 * @return the new Flowable instance
 */
public static <T> Flowable<T> mergeArrayDelayError(Perhaps<? extends T>... sources) {
  return Flowable.mergeArrayDelayError(sources);
}


