rx.Observable.compose()方法的使用及代码示例

x33g5p2x  于2022-01-25 转载在 其他  
字(7.7k)|赞(0)|评价(0)|浏览(261)

本文整理了Java中rx.Observable.compose()方法的一些代码示例,展示了Observable.compose()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Observable.compose()方法的具体详情如下:
包路径:rx.Observable
类名称:Observable
方法名:compose

Observable.compose介绍

[英]Transform an Observable by applying a particular Transformer function to it.

This method operates on the Observable itself whereas #lift operates on the Observable's Subscribers or Observers.

If the operator you are creating is designed to act on the individual items emitted by a source Observable, use #lift. If your operator is designed to transform the source Observable as a whole (for instance, by applying a particular set of existing RxJava operators to it) use compose. Scheduler: compose does not operate by default on a particular Scheduler.
[中]通过对一个可观察对象应用特定的变换函数来对其进行变换。
这种方法对可观察对象本身起作用,而#lift对可观察对象的订户或观察者起作用。
如果你正在创建的操作符被设计为作用于可观察到的源发出的单个项目,请使用#lift。如果您的操作符被设计为将源可观测作为一个整体进行转换(例如,通过对其应用一组特定的现有RxJava操作符),请使用compose。调度器:默认情况下,compose不会在特定的调度器上运行。

代码示例

代码示例来源:origin: jaydenxiao2016/AndroidFire

@Override
public Observable<String> swapDb(final ArrayList<NewsChannelTable> newsChannelTableList, int fromPosition, int toPosition) {
  return Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {
      ACache.get(AppApplication.getAppContext()).put(AppConstant.CHANNEL_MINE,newsChannelTableList);
      subscriber.onNext("");
      subscriber.onCompleted();
    }
  }).compose(RxSchedulers.<String>io_main());
}

代码示例来源:origin: HotBitmapGG/bilibili-android-client

private void getTags() {
  RetrofitHelper.getSearchAPI()
      .getHotSearchTags()
      .compose(bindToLifecycle())
      .map(HotSearchTag::getList)
      .subscribeOn(Schedulers.io())
      .observeOn(AndroidSchedulers.mainThread())
      .subscribe(listBeans -> {
        hotSearchTags.addAll(listBeans);
        initTagLayout();
      }, throwable -> {
      });
}

代码示例来源:origin: HotBitmapGG/bilibili-android-client

private void setUpSplash() {
  Observable.timer(2000, TimeUnit.MILLISECONDS)
      .compose(bindToLifecycle())
      .observeOn(AndroidSchedulers.mainThread())
      .subscribe(aLong -> finishTask());
}

代码示例来源:origin: HotBitmapGG/bilibili-android-client

private void initRxBus() {
  RxBus.getInstance().toObserverable(Integer.class)
      .compose(bindToLifecycle())
      .observeOn(AndroidSchedulers.mainThread())
      .subscribe(this::switchPager);
}

代码示例来源:origin: HotBitmapGG/bilibili-android-client

@Override
public void loadData() {
  RetrofitHelper.getLiveAPI()
      .getLiveAppIndex()
      .compose(bindToLifecycle())
      .subscribeOn(Schedulers.io())
      .observeOn(AndroidSchedulers.mainThread())
      .subscribe(liveAppIndexInfo -> {
        mLiveAppIndexAdapter.setLiveInfo(liveAppIndexInfo);
        finishTask();
      }, throwable -> {
      });
}

代码示例来源:origin: HotBitmapGG/bilibili-android-client

@Override
protected void loadData() {
  RetrofitHelper.getBiliAppAPI()
      .getVideoDetails(av)
      .compose(this.bindToLifecycle())
      .subscribeOn(Schedulers.io())
      .observeOn(AndroidSchedulers.mainThread())
      .subscribe(videoDetails -> {
        mVideoDetailsInfo = videoDetails.getData();
        finishTask();
      }, throwable -> {
      });
}

代码示例来源:origin: HotBitmapGG/bilibili-android-client

@Override
public void loadData() {
  RetrofitHelper.getBiliGoAPI()
      .getNewBangumiSerialList()
      .compose(this.bindToLifecycle())
      .doOnSubscribe(this::showProgressBar)
      .map(NewBangumiSerialInfo::getList)
      .subscribeOn(Schedulers.io())
      .observeOn(AndroidSchedulers.mainThread())
      .subscribe(listBeans -> {
        newBangumiSerials.addAll(listBeans);
        finishTask();
      }, throwable -> hideProgressBar());
}

代码示例来源:origin: HotBitmapGG/bilibili-android-client

@Override
protected void loadData() {
  RetrofitHelper.getLiveAPI()
      .getLiveAppIndex()
      .compose(bindToLifecycle())
      .subscribeOn(Schedulers.io())
      .observeOn(AndroidSchedulers.mainThread())
      .subscribe(liveAppIndexInfo -> {
        mLiveAppIndexAdapter.setLiveInfo(liveAppIndexInfo);
        finishTask();
      }, throwable -> initEmptyView());
}

代码示例来源:origin: HotBitmapGG/bilibili-android-client

public void getUserInfo() {
  RetrofitHelper.getAccountAPI()
      .getUserInfoById(mid)
      .compose(this.bindToLifecycle())
      .doOnSubscribe(this::showProgressBar)
      .subscribeOn(Schedulers.io())
      .observeOn(AndroidSchedulers.mainThread())
      .subscribe(userInfo -> {
        mUserDetailsInfo = userInfo;
        finishTask();
      }, throwable -> hideProgressBar());
}

代码示例来源:origin: HotBitmapGG/bilibili-android-client

@Override
public void loadData() {
  RetrofitHelper.getBangumiAPI()
      .getSeasonNewBangumiList()
      .compose(bindToLifecycle())
      .doOnSubscribe(this::showProgressBar)
      .subscribeOn(Schedulers.io())
      .observeOn(AndroidSchedulers.mainThread())
      .subscribe(seasonNewBangumiInfo -> {
        results.addAll(seasonNewBangumiInfo.getResult().subList(0,50));
        finishTask();
      }, throwable -> hideProgressBar());
}

代码示例来源:origin: HotBitmapGG/bilibili-android-client

@Override
protected void loadData() {
  RetrofitHelper.getBiliAPI()
      .getAttentionDynamic()
      .compose(bindToLifecycle())
      .map(attentionDynamicInfo -> attentionDynamicInfo.getData().getFeeds())
      .subscribeOn(Schedulers.io())
      .observeOn(AndroidSchedulers.mainThread())
      .subscribe(feedsBeans -> {
        dynamics.addAll(feedsBeans);
        finishTask();
      }, throwable -> initEmptyView());
}

代码示例来源:origin: HotBitmapGG/bilibili-android-client

@Override
public void loadData() {
 RetrofitHelper.getBangumiAPI()
   .getBangumiIndex()
   .compose(this.bindToLifecycle())
   .doOnSubscribe(this::showProgressBar)
   .subscribeOn(Schedulers.io())
   .delay(2000, TimeUnit.MILLISECONDS)
   .observeOn(AndroidSchedulers.mainThread())
   .subscribe(bangumiIndexInfo -> {
    categorys.addAll(bangumiIndexInfo.getResult().getCategory());
    finishTask();
   }, throwable -> hideProgressBar());
}

代码示例来源:origin: HotBitmapGG/bilibili-android-client

private void setBanner() {
  Observable.from(banners)
      .compose(bindToLifecycle())
      .forEach(topBean -> bannerEntities.add(new BannerEntity(
          topBean.getUri(), topBean.getTitle(), topBean.getImage())));
}

代码示例来源:origin: HotBitmapGG/bilibili-android-client

private void setBanner() {
  Observable.from(banners)
      .compose(bindToLifecycle())
      .forEach(topBean -> bannerEntities.add(new BannerEntity(topBean.getUri(),
          topBean.getTitle(), topBean.getImage())));
}

代码示例来源:origin: HotBitmapGG/bilibili-android-client

/**
 * 设置轮播banners
 */
private void convertBanner() {
  Observable.from(recommendBanners)
      .compose(bindToLifecycle())
      .forEach(dataBean -> banners.add(new BannerEntity(dataBean.getValue(),
          dataBean.getTitle(), dataBean.getImage())));
}

代码示例来源:origin: HotBitmapGG/bilibili-android-client

@Override
public void finishTask() {
  Observable.from(results)
      .compose(bindToLifecycle())
      .forEach(resultBean -> mSectionedRecyclerViewAdapter.addSection(
          new SeasonNewBangumiSection(SeasonNewBangumiActivity.this,
              resultBean.getSeason(), resultBean.getYear(), resultBean.getList())));
  mSectionedRecyclerViewAdapter.notifyDataSetChanged();
  hideProgressBar();
}

代码示例来源:origin: ReactiveX/RxNetty

@Test
public void testCollectOverEmptyObservable() throws Exception {
  TestSubscriber<ByteBuf> t = new TestSubscriber<>();
  Observable.<ByteBuf>empty()
    .compose(CollectBytes.all())
    .subscribe(t);
  t.assertNoErrors();
  t.assertCompleted();
  t.assertValue(Unpooled.buffer());
}

代码示例来源:origin: ReactiveX/RxNetty

@Test
public void testCollectSingleEvent() throws Exception {
  TestSubscriber<ByteBuf> t = new TestSubscriber<>();
  Observable.just(getByteBuf("test"))
      .compose(CollectBytes.all())
      .subscribe(t);
  t.assertNoErrors();
  t.assertCompleted();
  t.assertValues(getByteBuf("test"));
}

代码示例来源:origin: ReactiveX/RxNetty

@Test
public void testReturnSingleEventWithMoreBytesThanMax() throws Exception {
  TestSubscriber<ByteBuf> t = new TestSubscriber<>();
  toByteBufObservable("test")
      .compose(CollectBytes.upTo(0))
      .subscribe(t);
  t.assertError(TooMuchDataException.class);
  t.assertNotCompleted();
  t.assertNoValues();
}

代码示例来源:origin: ReactiveX/RxNetty

@Test
public void testReturnMultipleEvents() throws Exception {
  TestSubscriber<ByteBuf> t = new TestSubscriber<>();
  toByteBufObservable("1", "2")
      .compose(CollectBytes.upTo(5))
      .subscribe(t);
  t.assertNoErrors();
  t.assertCompleted();
  t.assertValues(getByteBufs("12"));
}

相关文章

Observable类方法