io.reactivex.Observable.concatEager(): usage and code examples

x33g5p2x, reposted on 2022-01-25 under Other

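This article collects code examples for the Java method io.reactivex.Observable.concatEager() and shows how Observable.concatEager() is used in practice. The examples come mainly from GitHub, Stack Overflow, Maven and similar platforms and were extracted from selected projects, so they should serve as a useful reference. The details of Observable.concatEager() are as follows:
Package path: io.reactivex.Observable
Class name: Observable
Method name: concatEager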

About Observable.concatEager

Concatenates an ObservableSource sequence of ObservableSources eagerly into a single stream of values.

Eager concatenation means that once a subscriber subscribes, this operator subscribes to all of the emitted source ObservableSources as they are observed. The operator buffers the values emitted by these ObservableSources and then drains them in order, each one after the previous one completes.

Scheduler: This method does not operate by default on a particular Scheduler.
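
To make the description above concrete, here is a minimal sketch that runs the same scenario through Observable.concat and Observable.concatEager. It assumes RxJava 2 on the classpath; the class name EagerSubscriptionDemo and the values "A1" and "B1" are made up for illustration. Hand-driven PublishSubject sources are used so that the ordering does not depend on timing.

```java
import io.reactivex.Observable;
import io.reactivex.subjects.PublishSubject;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical demo class, not part of RxJava.
class EagerSubscriptionDemo {

    // Runs the same scenario with concatEager (eager == true) or concat (eager == false).
    static List<String> run(boolean eager) {
        // Hot sources driven by hand, so the event order is deterministic.
        PublishSubject<String> first = PublishSubject.create();
        PublishSubject<String> second = PublishSubject.create();
        List<Observable<String>> sources = Arrays.asList(first, second);

        Observable<String> combined;
        if (eager) {
            combined = Observable.concatEager(sources);
        } else {
            combined = Observable.concat(sources);
        }

        List<String> log = new ArrayList<>();
        combined.subscribe(v -> log.add(v));

        // concatEager has already subscribed to every source; concat only to the first.
        log.add("second subscribed: " + second.hasObservers());

        second.onNext("B1");   // arrives while the first source is still active
        first.onNext("A1");
        first.onComplete();    // the operator now moves on to the second source
        second.onComplete();
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run(true));   // [second subscribed: true, A1, B1]
        System.out.println(run(false));  // [second subscribed: false, A1]
    }
}
```

With concatEager, "B1" is buffered until the first source completes and is then replayed in order. With plain concat the second subject has no observer yet when "B1" is emitted, so that value is lost.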

Code examples

Code example source: ReactiveX/RxJava

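@SuppressWarnings("unchecked")
@Test
public void badCapacityHint() throws Exception {
  Observable<Integer> source = Observable.just(1);
  try {
    // maxConcurrency = 1 is valid; prefetch = -99 must be rejected
    Observable.concatEager(Arrays.asList(source, source, source), 1, -99);
    // without this, the test would pass silently if no exception were thrown
    fail("Should have thrown IllegalArgumentException");
  } catch (IllegalArgumentException ex) {
    assertEquals("prefetch > 0 required but it was -99", ex.getMessage());
  }
}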
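
The test above exercises only the prefetch argument. As a small sketch, assuming RxJava 2 and a hypothetical helper class ArgumentCheckDemo, the same check can be probed for both int arguments of the three-argument overload. Only the exception type is inspected here, since the source confirms the message text for prefetch alone.

```java
import io.reactivex.Observable;

import java.util.Arrays;

// Hypothetical demo class, not part of RxJava.
class ArgumentCheckDemo {

    // Returns true if concatEager refuses the given maxConcurrency/prefetch pair.
    static boolean rejects(int maxConcurrency, int prefetch) {
        try {
            // The check happens when the operator is assembled; no subscription is needed.
            Observable.concatEager(Arrays.asList(Observable.just(1)), maxConcurrency, prefetch);
            return false;
        } catch (IllegalArgumentException ex) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(rejects(1, -99)); // true: prefetch must be positive
        System.out.println(rejects(0, 1));   // true: maxConcurrency must be positive
        System.out.println(rejects(1, 1));   // false: both arguments are valid
    }
}
```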

Code example source: kaushikgopal/RxJava-Android-Samples

@OnClick(R.id.btn_pseudoCache_concatEager)
public void onConcatEagerBtnClicked() {
  infoText.setText(R.string.msg_pseudoCache_demoInfo_concatEager);
  wireupDemo();
  List<Observable<Contributor>> observables = new ArrayList<>(2);
  observables.add(getSlowCachedDiskData());
  observables.add(getFreshNetworkData());
  Observable.concatEager(observables)
      .subscribeOn(Schedulers.io()) // we want to add a list item at time of subscription
      .observeOn(AndroidSchedulers.mainThread())
      .subscribe(
          new DisposableObserver<Contributor>() {
            @Override
            public void onComplete() {
              Timber.d("done loading all data");
            }

            @Override
            public void onError(Throwable e) {
              Timber.e(e, "arr something went wrong");
            }

            @Override
            public void onNext(Contributor contributor) {
              contributionMap.put(contributor.login, contributor.contributions);
              adapterDetail.clear();
              adapterDetail.addAll(mapAsList(contributionMap));
            }
          });
}
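
The cache-then-network pattern above needs an Android project to run. The following plain-JVM sketch shows the same idea with virtual time, assuming RxJava 2; the class CacheThenNetworkDemo, the values "cached" and "fresh", and the 300 ms and 100 ms delays are invented stand-ins for getSlowCachedDiskData() and getFreshNetworkData(). A TestScheduler is used so the timings are exact rather than wall-clock dependent.

```java
import io.reactivex.Observable;
import io.reactivex.schedulers.TestScheduler;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of RxJava or the sample app.
class CacheThenNetworkDemo {

    // Returns each value together with the virtual time at which it reached the subscriber.
    static List<String> run(boolean eager) {
        TestScheduler scheduler = new TestScheduler();

        // Assumed timings: the disk cache answers after 300 ms, the network after 100 ms.
        Observable<String> cache =
                Observable.just("cached").delay(300, TimeUnit.MILLISECONDS, scheduler);
        Observable<String> network =
                Observable.just("fresh").delay(100, TimeUnit.MILLISECONDS, scheduler);
        List<Observable<String>> sources = Arrays.asList(cache, network);

        Observable<String> combined;
        if (eager) {
            combined = Observable.concatEager(sources);
        } else {
            combined = Observable.concat(sources);
        }

        List<String> log = new ArrayList<>();
        combined.subscribe(
                v -> log.add(v + " @" + scheduler.now(TimeUnit.MILLISECONDS) + "ms"));

        // Move virtual time far enough forward for both sources to finish.
        scheduler.advanceTimeBy(1, TimeUnit.SECONDS);
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run(true));   // [cached @300ms, fresh @300ms]
        System.out.println(run(false));  // [cached @300ms, fresh @400ms]
    }
}
```

In the eager run the network result is ready at 100 ms but is held back until the cache has completed, so list order is preserved and the total time is that of the slower source. In the sequential run the network request only starts once the cache has finished.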

Code example source: ReactiveX/RxJava

@SuppressWarnings("unchecked")
@Test
public void concatEagerIterable() {
  Observable.concatEager(Arrays.asList(Observable.just(1), Observable.just(2)))
  .test()
  .assertResult(1, 2);
}

Code example source: ReactiveX/RxJava

/**
 * Concatenates an ObservableSource sequence of ObservableSources eagerly into a single stream of values.
 * <p>
 * Eager concatenation means that once a subscriber subscribes, this operator subscribes to all of the
 * emitted source ObservableSources as they are observed. The operator buffers the values emitted by these
 * ObservableSources and then drains them in order, each one after the previous one completes.
 * <p>
 * <img width="640" height="380" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/concatEager.png" alt="">
 * <dl>
 *  <dt><b>Scheduler:</b></dt>
 *  <dd>This method does not operate by default on a particular {@link Scheduler}.</dd>
 * </dl>
 * @param <T> the value type
 * @param sources a sequence of ObservableSources that need to be eagerly concatenated
 * @return the new ObservableSource instance with the specified concatenation behavior
 * @since 2.0
 */
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> concatEager(ObservableSource<? extends ObservableSource<? extends T>> sources) {
  return concatEager(sources, bufferSize(), bufferSize());
}

Code example source: ReactiveX/RxJava

/**
 * Concatenates a sequence of ObservableSources eagerly into a single stream of values.
 * <p>
 * Eager concatenation means that once a subscriber subscribes, this operator subscribes to all of the
 * source ObservableSources. The operator buffers the values emitted by these ObservableSources and then drains them
 * in order, each one after the previous one completes.
 * <p>
 * <img width="640" height="380" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/concatEager.png" alt="">
 * <dl>
 *  <dt><b>Scheduler:</b></dt>
 *  <dd>This method does not operate by default on a particular {@link Scheduler}.</dd>
 * </dl>
 * @param <T> the value type
 * @param sources a sequence of ObservableSources that need to be eagerly concatenated
 * @return the new ObservableSource instance with the specified concatenation behavior
 * @since 2.0
 */
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> concatEager(Iterable<? extends ObservableSource<? extends T>> sources) {
  return concatEager(sources, bufferSize(), bufferSize());
}
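
Both overloads shown above simply forward to the three-argument form with bufferSize() for each int parameter. The sketch below, assuming RxJava 2 and a hypothetical class DefaultArgsDemo, spells that delegation out by calling the one-argument and the three-argument overload side by side. In RxJava 2, Observable.bufferSize() returns 128 unless the rx2.buffer-size system property overrides it, so the example reads the value at runtime instead of hard-coding it.

```java
import io.reactivex.Observable;

import java.util.Arrays;
import java.util.List;

// Hypothetical demo class, not part of RxJava.
class DefaultArgsDemo {

    static List<Observable<Integer>> sources() {
        return Arrays.asList(Observable.just(1), Observable.just(2, 3));
    }

    // Uses the convenience overload, which fills in bufferSize() for both limits.
    static List<Integer> viaDefaults() {
        return Observable.concatEager(sources()).toList().blockingGet();
    }

    // Passes the same limits explicitly: maxConcurrency first, then prefetch.
    static List<Integer> viaExplicit() {
        int n = Observable.bufferSize();
        return Observable.concatEager(sources(), n, n).toList().blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(viaDefaults()); // [1, 2, 3]
        System.out.println(viaExplicit()); // [1, 2, 3]
    }
}
```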

Code example source: ReactiveX/RxJava

@Test
public void Observable() {
  Observable<Integer> source = Observable.just(1);
  TestObserver<Integer> to = TestObserver.create();
  Observable.concatEager(Observable.just(source, source, source)).subscribe(to);
  to.assertValues(1, 1, 1);
  to.assertNoErrors();
  to.assertComplete();
}

Code example source: ReactiveX/RxJava

@SuppressWarnings("unchecked")
@Test
public void capacityHint() {
  Observable<Integer> source = Observable.just(1);
  TestObserver<Integer> to = TestObserver.create();
  Observable.concatEager(Arrays.asList(source, source, source), 1, 1).subscribe(to);
  to.assertValues(1, 1, 1);
  to.assertNoErrors();
  to.assertComplete();
}
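
In the call above, the first int argument is maxConcurrency and the second is prefetch. As a rough illustration of what maxConcurrency does, the sketch below (assuming RxJava 2; the class MaxConcurrencyDemo and the subject names are hypothetical) passes three hand-driven sources with a limit of 2 and checks which of them have been subscribed to.

```java
import io.reactivex.Observable;
import io.reactivex.subjects.PublishSubject;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical demo class, not part of RxJava.
class MaxConcurrencyDemo {

    static List<Boolean> run() {
        PublishSubject<Integer> a = PublishSubject.create();
        PublishSubject<Integer> b = PublishSubject.create();
        PublishSubject<Integer> c = PublishSubject.create();
        List<Observable<Integer>> sources = Arrays.asList(a, b, c);

        // maxConcurrency = 2: at most two inner sources are subscribed at once; prefetch = 1.
        Observable.concatEager(sources, 2, 1).subscribe();

        List<Boolean> seen = new ArrayList<>();
        seen.add(a.hasObservers()); // true: within the limit
        seen.add(b.hasObservers()); // true: within the limit
        seen.add(c.hasObservers()); // false: waits for a free slot

        a.onComplete();             // frees one slot
        seen.add(c.hasObservers()); // true: c is picked up now
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [true, true, false, true]
    }
}
```

With maxConcurrency set to 1, as in the test above, only one source is active at a time, which makes the subscription pattern resemble that of a plain sequential concat.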

Code example source: ReactiveX/RxJava

@Test
public void ObservableCapacityHint() {
  Observable<Integer> source = Observable.just(1);
  TestObserver<Integer> to = TestObserver.create();
  Observable.concatEager(Observable.just(source, source, source), 1, 1).subscribe(to);
  to.assertValues(1, 1, 1);
  to.assertNoErrors();
  to.assertComplete();
}

Code example source: imZeJun/RxSample

private void refreshArticleUseContactEager() {
  List<Observable<List<NewsResultEntity>>> observables = new ArrayList<>();
  // each source gets its own subscribeOn, so cache and network can run concurrently
  observables.add(getCacheArticle(2500).subscribeOn(Schedulers.io()));
  observables.add(getNetworkArticle(2000).subscribeOn(Schedulers.io()));
  // results are still delivered in list order: cache first, then network
  Observable<List<NewsResultEntity>> contactObservable = Observable.concatEager(observables);
  DisposableObserver<List<NewsResultEntity>> disposableObserver = getArticleObserver();
  contactObservable.observeOn(AndroidSchedulers.mainThread()).subscribe(disposableObserver);
}
