io.reactivex.Observable.concatEager(): usage and code examples

Reposted by x33g5p2x on 2022-01-25, filed under Other

This article collects code examples for the Java method io.reactivex.Observable.concatEager() and shows how it is used in practice. The examples were extracted from selected projects found on GitHub, Stack Overflow, Maven, and similar platforms. Details of Observable.concatEager():
Package: io.reactivex
Class: Observable
Method: concatEager

About Observable.concatEager

Concatenates an ObservableSource sequence of ObservableSources eagerly into a single stream of values.

Eager concatenation means that once a subscriber subscribes, this operator subscribes to all of the emitted source ObservableSources as they are observed. The operator buffers the values emitted by these ObservableSources and then drains them in order, each one after the previous one completes.

Scheduler: This method does not operate by default on a particular Scheduler.
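
The buffering behavior described above can be made visible with hot sources. The following is a minimal sketch, assuming RxJava 2 is on the classpath; the class name `ConcatEagerOrderDemo` and the item labels are made up for illustration. It feeds two `PublishSubject`s in interleaved order and compares `concatEager` with plain `concat`.

```java
import io.reactivex.Observable;
import io.reactivex.subjects.PublishSubject;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical demo class, not part of RxJava.
public class ConcatEagerOrderDemo {

    /** Feeds two hot sources in interleaved order and records what the observer receives. */
    static List<String> run(boolean eager) {
        PublishSubject<String> first = PublishSubject.create();
        PublishSubject<String> second = PublishSubject.create();
        List<Observable<String>> sources = Arrays.<Observable<String>>asList(first, second);
        List<String> received = new ArrayList<>();

        Observable<String> combined;
        if (eager) {
            combined = Observable.concatEager(sources); // subscribes to both sources up front
        } else {
            combined = Observable.concat(sources);      // subscribes to 'second' only after 'first' completes
        }
        combined.subscribe(received::add);

        second.onNext("b1");  // eager: buffered; lazy: nobody is listening yet, so it is lost
        first.onNext("a1");   // delivered immediately in both cases
        second.onNext("b2");
        first.onNext("a2");
        first.onComplete();   // eager: the buffered b1 and b2 are drained now
        second.onNext("b3");
        second.onComplete();
        return received;
    }

    public static List<String> eager() {
        return run(true);
    }

    public static List<String> lazy() {
        return run(false);
    }

    public static void main(String[] args) {
        System.out.println("concatEager: " + eager()); // [a1, a2, b1, b2, b3]
        System.out.println("concat:      " + lazy());  // [a1, a2, b3]
    }
}
```

With `concatEager`, the items of the second source are kept in a buffer and released in order once the first source completes. With `concat`, a hot source that emits before its turn simply has no subscriber yet.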

Code examples

Example source: ReactiveX/RxJava

    @SuppressWarnings("unchecked")
    @Test
    public void badCapacityHint() throws Exception {
        Observable<Integer> source = Observable.just(1);
        try {
            // Arguments: sources, maxConcurrency = 1, prefetch = -99 (invalid)
            Observable.concatEager(Arrays.asList(source, source, source), 1, -99);
        } catch (IllegalArgumentException ex) {
            assertEquals("prefetch > 0 required but it was -99", ex.getMessage());
        }
    }
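
The three-argument overload used in this test takes `maxConcurrency` and `prefetch` in addition to the sources. The sketch below assumes RxJava 2 and uses a hypothetical class named `ConcatEagerLimitsDemo`. It illustrates that `maxConcurrency` caps how many sources are subscribed at once, and that a non-positive `prefetch` is rejected with the message asserted in the test above.

```java
import io.reactivex.Observable;
import io.reactivex.subjects.PublishSubject;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical demo class, not part of RxJava.
public class ConcatEagerLimitsDemo {

    /**
     * Subscribes to three sources with maxConcurrency = 2 and reports
     * [a subscribed, b subscribed, c subscribed, c subscribed after a completes].
     */
    public static List<Boolean> subscriptionStates() {
        PublishSubject<Integer> a = PublishSubject.create();
        PublishSubject<Integer> b = PublishSubject.create();
        PublishSubject<Integer> c = PublishSubject.create();
        List<Observable<Integer>> sources = Arrays.<Observable<Integer>>asList(a, b, c);

        // At most 2 sources are active at a time; 16 is an arbitrary buffer size hint per source.
        Observable.concatEager(sources, 2, 16).subscribe();

        List<Boolean> states = new ArrayList<>();
        states.add(a.hasObservers());
        states.add(b.hasObservers());
        states.add(c.hasObservers()); // still waiting for a free slot
        a.onComplete();               // frees a slot
        states.add(c.hasObservers());
        return states;
    }

    /** Returns the exception message produced by an invalid prefetch value, or null if none was thrown. */
    public static String invalidPrefetchMessage() {
        try {
            Observable.concatEager(Collections.singletonList(Observable.just(1)), 1, -99);
            return null;
        } catch (IllegalArgumentException ex) {
            return ex.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(subscriptionStates());
        System.out.println(invalidPrefetchMessage());
    }
}
```

Note that the original test would also pass if no exception were thrown at all; a stricter variant would fail explicitly after the `concatEager` call inside the `try` block.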

Example source: kaushikgopal/RxJava-Android-Samples

    @OnClick(R.id.btn_pseudoCache_concatEager)
    public void onConcatEagerBtnClicked() {
        infoText.setText(R.string.msg_pseudoCache_demoInfo_concatEager);
        wireupDemo();

        // Order matters: cached disk data is delivered first, fresh network data second
        List<Observable<Contributor>> observables = new ArrayList<>(2);
        observables.add(getSlowCachedDiskData());
        observables.add(getFreshNetworkData());

        Observable.concatEager(observables)
            .subscribeOn(Schedulers.io()) // we want to add a list item at time of subscription
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(
                new DisposableObserver<Contributor>() {
                    @Override
                    public void onComplete() {
                        Timber.d("done loading all data");
                    }

                    @Override
                    public void onError(Throwable e) {
                        Timber.e(e, "arr something went wrong");
                    }

                    @Override
                    public void onNext(Contributor contributor) {
                        contributionMap.put(contributor.login, contributor.contributions);
                        adapterDetail.clear();
                        adapterDetail.addAll(mapAsList(contributionMap));
                    }
                });
    }
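
The pseudo-cache pattern above relies on ordering: stale disk data arrives first and is then overwritten by fresh network data, even when the network answers sooner. Here is a plain-JVM sketch of that idea, assuming RxJava 2. The `Contributor` class, the sample values, and the 200 ms delay are invented for illustration and do not come from the sample project.

```java
import io.reactivex.Observable;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; the real sample runs inside an Android fragment.
public class PseudoCacheDemo {

    /** Minimal stand-in for the Contributor type used in the sample. */
    static final class Contributor {
        final String login;
        final int contributions;

        Contributor(String login, int contributions) {
            this.login = login;
            this.contributions = contributions;
        }
    }

    public static Map<String, Integer> load() {
        // Slow, stale disk cache: emits after 200 ms on the computation scheduler.
        Observable<Contributor> disk =
                Observable.just(new Contributor("alice", 10)).delay(200, TimeUnit.MILLISECONDS);

        // Fast, fresh network result: ready immediately, but buffered until the disk source completes.
        Observable<Contributor> network =
                Observable.just(new Contributor("alice", 12), new Contributor("bob", 3));

        Map<String, Integer> contributionMap = new LinkedHashMap<>();
        Observable.concatEager(Arrays.asList(disk, network))
                .blockingForEach(c -> contributionMap.put(c.login, c.contributions));
        return contributionMap;
    }

    public static void main(String[] args) {
        System.out.println(load()); // {alice=12, bob=3}
    }
}
```

Because the network items are always delivered after the disk items, the fresher value for a given login ends up in the map.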

Example source: ReactiveX/RxJava

    @SuppressWarnings("unchecked")
    @Test
    public void concatEagerIterable() {
        Observable.concatEager(Arrays.asList(Observable.just(1), Observable.just(2)))
            .test()
            .assertResult(1, 2);
    }

Example source: ReactiveX/RxJava

    /**
     * Concatenates an ObservableSource sequence of ObservableSources eagerly into a single stream of values.
     * <p>
     * Eager concatenation means that once a subscriber subscribes, this operator subscribes to all of the
     * emitted source ObservableSources as they are observed. The operator buffers the values emitted by these
     * ObservableSources and then drains them in order, each one after the previous one completes.
     * <p>
     * <img width="640" height="380" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/concatEager.png" alt="">
     * <dl>
     *  <dt><b>Scheduler:</b></dt>
     *  <dd>This method does not operate by default on a particular {@link Scheduler}.</dd>
     * </dl>
     * @param <T> the value type
     * @param sources a sequence of ObservableSources that need to be eagerly concatenated
     * @return the new ObservableSource instance with the specified concatenation behavior
     * @since 2.0
     */
    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.NONE)
    public static <T> Observable<T> concatEager(ObservableSource<? extends ObservableSource<? extends T>> sources) {
        return concatEager(sources, bufferSize(), bufferSize());
    }

Example source: ReactiveX/RxJava

    /**
     * Concatenates a sequence of ObservableSources eagerly into a single stream of values.
     * <p>
     * Eager concatenation means that once a subscriber subscribes, this operator subscribes to all of the
     * source ObservableSources. The operator buffers the values emitted by these ObservableSources and then drains them
     * in order, each one after the previous one completes.
     * <p>
     * <img width="640" height="380" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/concatEager.png" alt="">
     * <dl>
     *  <dt><b>Scheduler:</b></dt>
     *  <dd>This method does not operate by default on a particular {@link Scheduler}.</dd>
     * </dl>
     * @param <T> the value type
     * @param sources a sequence of ObservableSources that need to be eagerly concatenated
     * @return the new ObservableSource instance with the specified concatenation behavior
     * @since 2.0
     */
    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.NONE)
    public static <T> Observable<T> concatEager(Iterable<? extends ObservableSource<? extends T>> sources) {
        return concatEager(sources, bufferSize(), bufferSize());
    }

Example source: ReactiveX/RxJava

    @Test
    public void Observable() {
        Observable<Integer> source = Observable.just(1);
        TestObserver<Integer> to = TestObserver.create();

        // The sources are themselves delivered through an Observable
        Observable.concatEager(Observable.just(source, source, source)).subscribe(to);

        to.assertValues(1, 1, 1);
        to.assertNoErrors();
        to.assertComplete();
    }

Example source: ReactiveX/RxJava

    @SuppressWarnings("unchecked")
    @Test
    public void capacityHint() {
        Observable<Integer> source = Observable.just(1);
        TestObserver<Integer> to = TestObserver.create();

        // Arguments: sources, maxConcurrency = 1, prefetch = 1
        Observable.concatEager(Arrays.asList(source, source, source), 1, 1).subscribe(to);

        to.assertValues(1, 1, 1);
        to.assertNoErrors();
        to.assertComplete();
    }

Example source: ReactiveX/RxJava

    @Test
    public void ObservableCapacityHint() {
        Observable<Integer> source = Observable.just(1);
        TestObserver<Integer> to = TestObserver.create();

        // Same as capacityHint(), but the sources come from an Observable instead of a List
        Observable.concatEager(Observable.just(source, source, source), 1, 1).subscribe(to);

        to.assertValues(1, 1, 1);
        to.assertNoErrors();
        to.assertComplete();
    }

Example source: imZeJun/RxSample

    private void refreshArticleUseContactEager() {
        List<Observable<List<NewsResultEntity>>> observables = new ArrayList<>();
        // Each source gets its own subscribeOn so that cache and network requests run in parallel
        observables.add(getCacheArticle(2500).subscribeOn(Schedulers.io()));
        observables.add(getNetworkArticle(2000).subscribeOn(Schedulers.io()));
        Observable<List<NewsResultEntity>> contactObservable = Observable.concatEager(observables);
        DisposableObserver<List<NewsResultEntity>> disposableObserver = getArticleObserver();
        contactObservable.observeOn(AndroidSchedulers.mainThread()).subscribe(disposableObserver);
    }
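
In this example each source carries its own `subscribeOn(Schedulers.io())`. That matters when the sources do blocking work: `concatEager` subscribes to them one after another on the subscribing thread, so a blocking source without its own scheduler would hold up the subscription to the next one. The sketch below, assuming RxJava 2 and a hypothetical class named `PerSourceSchedulerDemo`, uses a latch to show that both blocking sources are in flight at the same time while the output order stays fixed.

```java
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not taken from the sample project.
public class PerSourceSchedulerDemo {

    public static List<String> load() {
        CountDownLatch networkStarted = new CountDownLatch(1);

        // Blocking "cache" read: waits (up to 2 s) until the network source has started.
        // This can only succeed quickly if both sources run concurrently.
        Observable<String> cache = Observable.fromCallable(() -> {
            boolean overlapped = networkStarted.await(2, TimeUnit.SECONDS);
            return "cache (network already running: " + overlapped + ")";
        }).subscribeOn(Schedulers.io());

        // Blocking "network" call: signals that it has started, then returns.
        Observable<String> network = Observable.fromCallable(() -> {
            networkStarted.countDown();
            return "network";
        }).subscribeOn(Schedulers.io());

        // The cache result is always delivered first, regardless of which call finishes first.
        return Observable.concatEager(Arrays.asList(cache, network))
                .toList()
                .blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(load());
    }
}
```

If the per-source `subscribeOn` calls were removed and a single `subscribeOn` were applied to the result of `concatEager` instead, the cache callable in this sketch would block until its timeout before the network callable was even subscribed.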
