io.reactivex.Flowable.mergeArrayDelayError()方法的使用及代码示例

x33g5p2x  于2022-01-19 转载在 其他  
字(5.2k)|赞(0)|评价(0)|浏览(240)

本文整理了Java中io.reactivex.Flowable.mergeArrayDelayError()方法的一些代码示例,展示了Flowable.mergeArrayDelayError()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Flowable.mergeArrayDelayError()方法的具体详情如下:
包路径:io.reactivex.Flowable
类名称:Flowable
方法名:mergeArrayDelayError

Flowable.mergeArrayDelayError介绍

[英]Flattens an array of Publishers into one Publisher, in a way that allows a Subscriber to receive all successfully emitted items from each of the source Publishers without being interrupted by an error notification from one of them, while limiting the number of concurrent subscriptions to these Publishers.

This behaves like #merge(Publisher) except that if any of the merged Publishers notify of an error via Subscriber#onError, mergeDelayError will refrain from propagating that error notification until all of the merged Publishers have finished emitting items.

Even if multiple merged Publishers send onError notifications, mergeDelayError will only invoke the onError method of its Subscribers once. Backpressure: The operator honors backpressure from downstream. All source Publishers are expected to honor backpressure; if violated, the operator may signal MissingBackpressureException. Scheduler: mergeArrayDelayError does not operate by default on a particular Scheduler.
[中]将发布服务器阵列扁平化为一个发布服务器,这样一来,订阅者就可以从每个源发布服务器接收所有成功发出的项目,而不会被其中一个发布服务器的错误通知中断,同时限制对这些发布服务器的并发订阅数量。
其行为类似于#merge(Publisher),不同的是,如果任何合并的发布服务器通过订阅者#onError通知错误,则mergeDelayError将在所有合并的发布服务器完成发送项目之前避免传播该错误通知。
即使多个合并发布服务器发送OneError通知,mergeDelayError也只会调用其订阅服务器的OneError方法一次。背压:操作员接受来自下游的背压。所有出版者都应该尊重背压;如果违反,操作员可能发出信号缺失背压异常。调度程序:默认情况下,mergeArrayDelayError不会在特定调度程序上运行。

代码示例

代码示例来源:origin: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void mergeDelayErrorArrayNull() {
  Flowable.mergeArrayDelayError(128, 128, (Publisher<Object>[])null);
}

代码示例来源:origin: ReactiveX/RxJava

@SuppressWarnings("unchecked")
@Test(expected = NullPointerException.class)
public void mergeDelayErrorArrayOneIsNull() {
  Flowable.mergeArrayDelayError(128, 128, just1, null).blockingLast();
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void array() {
  for (int i = 1; i < 100; i++) {
    @SuppressWarnings("unchecked")
    Flowable<Integer>[] sources = new Flowable[i];
    Arrays.fill(sources, Flowable.just(1));
    Integer[] expected = new Integer[i];
    for (int j = 0; j < i; j++) {
      expected[j] = 1;
    }
    Flowable.mergeArrayDelayError(sources)
    .test()
    .assertResult(expected);
  }
}

代码示例来源:origin: ReactiveX/RxJava

@SuppressWarnings("unchecked")
@Test
public void mergeArrayDelayError() {
  Flowable.mergeArrayDelayError(Flowable.just(1), Flowable.just(2))
  .test()
  .assertResult(1, 2);
}

代码示例来源:origin: akarnokd/RxJava2Extensions

/**
 * Merge the values in arbitrary order from a sequence of Perhaps sources,
 * delaying errors till all sources terminate.
 * @param <T> the common base value type
 * @param sources the sequence of sources
 * @param maxConcurrency the maximum number of active subscriptions
 * @return the new Flowable instance
 */
public static <T> Flowable<T> mergeArrayDelayError(int maxConcurrency, Perhaps<? extends T>... sources) {
  return Flowable.mergeArrayDelayError(maxConcurrency, 1, sources);
}

代码示例来源:origin: akarnokd/RxJava2Extensions

/**
 * Merge the values in arbitrary order from a sequence of Solo sources,
 * delaying errors till all sources terminate.
 * @param <T> the common base value type
 * @param sources the sequence of sources
 * @param maxConcurrency the maximum number of active subscriptions
 * @return the new Flowable instance
 */
public static <T> Flowable<T> mergeArrayDelayError(int maxConcurrency, Solo<? extends T>... sources) {
  return Flowable.mergeArrayDelayError(maxConcurrency, 1, sources);
}

代码示例来源:origin: akarnokd/RxJava2Extensions

/**
 * Merge the values in arbitrary order from a sequence of Perhaps sources,
 * delaying errors till all sources terminate.
 * @param <T> the common base value type
 * @param sources the sequence of sources
 * @return the new Flowable instance
 */
public static <T> Flowable<T> mergeArrayDelayError(Perhaps<? extends T>... sources) {
  return Flowable.mergeArrayDelayError(sources);
}

代码示例来源:origin: com.github.akarnokd/rxjava2-extensions

/**
 * Merge the values in arbitrary order from a sequence of Solo sources,
 * delaying errors till all sources terminate.
 * @param <T> the common base value type
 * @param sources the sequence of sources
 * @param maxConcurrency the maximum number of active subscriptions
 * @return the new Flowable instance
 */
public static <T> Flowable<T> mergeArrayDelayError(int maxConcurrency, Solo<? extends T>... sources) {
  return Flowable.mergeArrayDelayError(maxConcurrency, 1, sources);
}

代码示例来源:origin: com.github.akarnokd/rxjava2-extensions

/**
 * Merge the values in arbitrary order from a sequence of Perhaps sources,
 * delaying errors till all sources terminate.
 * @param <T> the common base value type
 * @param sources the sequence of sources
 * @return the new Flowable instance
 */
public static <T> Flowable<T> mergeArrayDelayError(Perhaps<? extends T>... sources) {
  return Flowable.mergeArrayDelayError(sources);
}

代码示例来源:origin: com.github.akarnokd/rxjava2-extensions

/**
 * Merge the values in arbitrary order from a sequence of Perhaps sources,
 * delaying errors till all sources terminate.
 * @param <T> the common base value type
 * @param sources the sequence of sources
 * @param maxConcurrency the maximum number of active subscriptions
 * @return the new Flowable instance
 */
public static <T> Flowable<T> mergeArrayDelayError(int maxConcurrency, Perhaps<? extends T>... sources) {
  return Flowable.mergeArrayDelayError(maxConcurrency, 1, sources);
}

相关文章

Flowable类方法