rx.Observable.doOnSubscribe()方法的使用及代码示例

x33g5p2x  于2022-01-25 转载在 其他  
字(11.7k)|赞(0)|评价(0)|浏览(149)

本文整理了Java中rx.Observable.doOnSubscribe()方法的一些代码示例,展示了Observable.doOnSubscribe()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Observable.doOnSubscribe()方法的具体详情如下:
包路径:rx.Observable
类名称:Observable
方法名:doOnSubscribe

Observable.doOnSubscribe介绍

[英]Modifies the source Observable so that it invokes the given action when it is subscribed from its subscribers. Each subscription will result in an invocation of the given action except when the source Observable is reference counted, in which case the source Observable will invoke the given action for the first subscription.

Scheduler: doOnSubscribe does not operate by default on a particular Scheduler.
[中]修改源Observable,以便从订阅方订阅时调用给定操作。每个订阅都会导致调用给定的操作,除非源可观察对象被引用计数,在这种情况下,源可观察对象将为第一个订阅调用给定的操作。
调度器:默认情况下,doOnSubscribe不会在特定的调度器上运行。

代码示例

代码示例来源:origin: ReactiveX/RxNetty

@Override
public Observable<Void> close(boolean flush) {
  if (flush) {
    return releaseObservable.doOnSubscribe(new Action0() {
      @Override
      public void call() {
        unpooledDelegate.flush();
      }
    });
  } else {
    return releaseObservable;
  }
}

代码示例来源:origin: ReactiveX/RxNetty

public DefaultChannelOperations(final Channel nettyChannel, ConnectionEventListener eventListener,
                EventPublisher eventPublisher) {
  this.nettyChannel = nettyChannel;
  this.eventListener = eventListener;
  this.eventPublisher = eventPublisher;
  closeObservable = Observable.create(new OnSubscribeForClose(nettyChannel));
  flushAndCloseObservable = closeObservable.doOnSubscribe(new Action0() {
    @Override
    public void call() {
      flush();
    }
  });
}

代码示例来源:origin: PipelineAI/pipeline

@Override
final protected Observable<R> getExecutionObservable() {
  return Observable.defer(new Func0<Observable<R>>() {
    @Override
    public Observable<R> call() {
      try {
        return Observable.just(run());
      } catch (Throwable ex) {
        return Observable.error(ex);
      }
    }
  }).doOnSubscribe(new Action0() {
    @Override
    public void call() {
      // Save thread on which we get subscribed so that we can interrupt it later if needed
      executionThread.set(Thread.currentThread());
    }
  });
}

代码示例来源:origin: ReactiveX/RxNetty

@Override
public Observable<Void> discard(final PooledConnection<?, ?> connection) {
  return connection.discard().doOnSubscribe(new Action0() {
    @Override
    public void call() {
      EventPublisher eventPublisher = connection.unsafeNettyChannel().attr(EVENT_PUBLISHER).get();
      if (eventPublisher.publishingEnabled()) {
        ClientEventListener eventListener = connection.unsafeNettyChannel()
                                 .attr(CLIENT_EVENT_LISTENER).get();
        eventListener.onPooledConnectionEviction();
      }
      limitDeterminationStrategy.releasePermit();/*Since, an idle connection took a permit*/
    }
  });
}

代码示例来源:origin: Rukey7/MvpApp

@Override
public void getData(boolean isRefresh) {
  // 因为网易这个原接口参数一大堆,我只传了部分参数,返回的数据会出现图片重复的情况,请不要在意这个问题- -
  Observable.from(mPhotoList)
      .doOnSubscribe(new Action0() {
        @Override
        public void call() {
          mView.showLoading();
        }
      })
      .compose(mTransformer)
      .subscribe(new Subscriber<List<BeautyPhotoInfo>>() {
        @Override
        public void onCompleted() {
        }
        @Override
        public void onError(Throwable e) {
          Logger.e(e.toString());
        }
        @Override
        public void onNext(List<BeautyPhotoInfo> photoList) {
          mView.loadData(photoList);
        }
      });
}

代码示例来源:origin: PipelineAI/pipeline

protected HystrixCachedObservable(final Observable<R> originalObservable) {
  ReplaySubject<R> replaySubject = ReplaySubject.create();
  this.originalSubscription = originalObservable
      .subscribe(replaySubject);
  this.cachedObservable = replaySubject
      .doOnUnsubscribe(new Action0() {
        @Override
        public void call() {
          outstandingSubscriptions--;
          if (outstandingSubscriptions == 0) {
            originalSubscription.unsubscribe();
          }
        }
      })
      .doOnSubscribe(new Action0() {
        @Override
        public void call() {
          outstandingSubscriptions++;
        }
      });
}

代码示例来源:origin: HotBitmapGG/bilibili-android-client

@Override
public void loadData() {
  RetrofitHelper.getBiliGoAPI()
      .getNewBangumiSerialList()
      .compose(this.bindToLifecycle())
      .doOnSubscribe(this::showProgressBar)
      .map(NewBangumiSerialInfo::getList)
      .subscribeOn(Schedulers.io())
      .observeOn(AndroidSchedulers.mainThread())
      .subscribe(listBeans -> {
        newBangumiSerials.addAll(listBeans);
        finishTask();
      }, throwable -> hideProgressBar());
}

代码示例来源:origin: HotBitmapGG/bilibili-android-client

@Override
public void loadData() {
  RetrofitHelper.getBangumiAPI()
      .getSeasonNewBangumiList()
      .compose(bindToLifecycle())
      .doOnSubscribe(this::showProgressBar)
      .subscribeOn(Schedulers.io())
      .observeOn(AndroidSchedulers.mainThread())
      .subscribe(seasonNewBangumiInfo -> {
        results.addAll(seasonNewBangumiInfo.getResult().subList(0,50));
        finishTask();
      }, throwable -> hideProgressBar());
}

代码示例来源:origin: HotBitmapGG/bilibili-android-client

public void getUserInfo() {
  RetrofitHelper.getAccountAPI()
      .getUserInfoById(mid)
      .compose(this.bindToLifecycle())
      .doOnSubscribe(this::showProgressBar)
      .subscribeOn(Schedulers.io())
      .observeOn(AndroidSchedulers.mainThread())
      .subscribe(userInfo -> {
        mUserDetailsInfo = userInfo;
        finishTask();
      }, throwable -> hideProgressBar());
}

代码示例来源:origin: HotBitmapGG/bilibili-android-client

@Override
public void loadData() {
 RetrofitHelper.getBangumiAPI()
   .getBangumiIndex()
   .compose(this.bindToLifecycle())
   .doOnSubscribe(this::showProgressBar)
   .subscribeOn(Schedulers.io())
   .delay(2000, TimeUnit.MILLISECONDS)
   .observeOn(AndroidSchedulers.mainThread())
   .subscribe(bangumiIndexInfo -> {
    categorys.addAll(bangumiIndexInfo.getResult().getCategory());
    finishTask();
   }, throwable -> hideProgressBar());
}

代码示例来源:origin: PipelineAI/pipeline

/**
 * @deprecated Not for public use.  Please use {@link #getInstance()}.  This facilitates better stream-sharing
 * @param intervalInMilliseconds milliseconds between data emissions
 */
@Deprecated //deprecated in 1.5.4.
public HystrixUtilizationStream(final int intervalInMilliseconds) {
  this.intervalInMilliseconds = intervalInMilliseconds;
  this.allUtilizationStream = Observable.interval(intervalInMilliseconds, TimeUnit.MILLISECONDS)
      .map(getAllUtilization)
      .doOnSubscribe(new Action0() {
        @Override
        public void call() {
          isSourceCurrentlySubscribed.set(true);
        }
      })
      .doOnUnsubscribe(new Action0() {
        @Override
        public void call() {
          isSourceCurrentlySubscribed.set(false);
        }
      })
      .share()
      .onBackpressureDrop();
}

代码示例来源:origin: PipelineAI/pipeline

/**
 * @deprecated Not for public use.  Please use {@link #getInstance()}.  This facilitates better stream-sharing
 * @param intervalInMilliseconds milliseconds between data emissions
 */
@Deprecated //deprecated in 1.5.4.
public HystrixConfigurationStream(final int intervalInMilliseconds) {
  this.intervalInMilliseconds = intervalInMilliseconds;
  this.allConfigurationStream = Observable.interval(intervalInMilliseconds, TimeUnit.MILLISECONDS)
      .map(getAllConfig)
      .doOnSubscribe(new Action0() {
        @Override
        public void call() {
          isSourceCurrentlySubscribed.set(true);
        }
      })
      .doOnUnsubscribe(new Action0() {
        @Override
        public void call() {
          isSourceCurrentlySubscribed.set(false);
        }
      })
      .share()
      .onBackpressureDrop();
}

代码示例来源:origin: HotBitmapGG/bilibili-android-client

@Override
public void loadData() {
  RetrofitHelper.getBangumiAPI()
      .getBangumiSchedules()
      .compose(bindToLifecycle())
      .doOnSubscribe(this::showProgressBar)
      .delay(2000, TimeUnit.MILLISECONDS)
      .subscribeOn(Schedulers.io())
      .observeOn(AndroidSchedulers.mainThread())
      .subscribe(bangumiSchedule -> {
        bangumiSchedules.addAll(bangumiSchedule.getResult());
        finishTask();
      }, throwable -> {
        hideProgressBar();
        ToastUtil.ShortToast("加载失败啦,请重新加载~");
      });
}

代码示例来源:origin: HotBitmapGG/bilibili-android-client

@Override
public void loadData() {
  RetrofitHelper.getBiliAPI()
      .getSpInfo(spid, title)
      .compose(this.bindToLifecycle())
      .doOnSubscribe(this::showProgressBar)
      .flatMap(new Func1<SpecialTopic, Observable<SpecialTopicIResult>>() {
        @Override
        public Observable<SpecialTopicIResult> call(SpecialTopic specialTopic) {
          mSpecialTopic = specialTopic;
          return RetrofitHelper.getBiliAPI()
              .getSpItemList(spid, season_id, 1);
        }
      })
      .compose(bindToLifecycle())
      .subscribeOn(Schedulers.io())
      .observeOn(AndroidSchedulers.mainThread())
      .subscribe(specialTopicIResult -> {
        spList.addAll(specialTopicIResult.list);
        finishTask();
      }, throwable -> {
        hideProgressBar();
      });
}

代码示例来源:origin: Rukey7/MvpApp

@Override
public void getData(boolean isRefresh) {
  RetrofitService.getPhotoList()
      .doOnSubscribe(new Action0() {
        @Override
        public void call() {
          mView.showLoading();
        }
      })
      .compose(mView.<List<PhotoInfo>>bindToLife())
      .subscribe(new Subscriber<List<PhotoInfo>>() {
        @Override
        public void onCompleted() {
          mView.hideLoading();
        }
        @Override
        public void onError(Throwable e) {
          Logger.e(e.toString());
          mView.showNetError();
        }
        @Override
        public void onNext(List<PhotoInfo> photoList) {
          mView.loadData(photoList);
          mNextSetId = photoList.get(photoList.size() - 1).getSetid();
        }
      });
}

代码示例来源:origin: HotBitmapGG/bilibili-android-client

@Override
public void loadData() {
  RetrofitHelper.getVipAPI()
      .getVipGame()
      .compose(bindToLifecycle())
      .doOnSubscribe(this::showProgressBar)
      .delay(2000, TimeUnit.MILLISECONDS)
      .flatMap(new Func1<VipGameInfo, Observable<String>>() {
        @Override
        public Observable<String> call(VipGameInfo vipGameInfo) {
          mVipGameInfoData = vipGameInfo.getData();
          return Observable.just(readAssetsJson());
        }
      })
      .compose(this.bindToLifecycle())
      .subscribeOn(Schedulers.io())
      .observeOn(AndroidSchedulers.mainThread())
      .subscribe(s -> {
        GameCenterInfo gameCenterInfo = new Gson().fromJson(s, GameCenterInfo.class);
        items.addAll(gameCenterInfo.getItems());
        finishTask();
      }, throwable -> hideProgressBar());
}

代码示例来源:origin: PipelineAI/pipeline

private HystrixDashboardStream(int delayInMs) {
  this.delayInMs = delayInMs;
  this.singleSource = Observable.interval(delayInMs, TimeUnit.MILLISECONDS)
      .map(new Func1<Long, DashboardData>() {
        @Override
        public DashboardData call(Long timestamp) {
          return new DashboardData(
              HystrixCommandMetrics.getInstances(),
              HystrixThreadPoolMetrics.getInstances(),
              HystrixCollapserMetrics.getInstances()
          );
        }
      })
      .doOnSubscribe(new Action0() {
        @Override
        public void call() {
          isSourceCurrentlySubscribed.set(true);
        }
      })
      .doOnUnsubscribe(new Action0() {
        @Override
        public void call() {
          isSourceCurrentlySubscribed.set(false);
        }
      })
      .share()
      .onBackpressureDrop();
}

代码示例来源:origin: PipelineAI/pipeline

protected BucketedRollingCounterStream(HystrixEventStream<Event> stream, final int numBuckets, int bucketSizeInMs,
                    final Func2<Bucket, Event, Bucket> appendRawEventToBucket,
                    final Func2<Output, Bucket, Output> reduceBucket) {
  super(stream, numBuckets, bucketSizeInMs, appendRawEventToBucket);
  Func1<Observable<Bucket>, Observable<Output>> reduceWindowToSummary = new Func1<Observable<Bucket>, Observable<Output>>() {
    @Override
    public Observable<Output> call(Observable<Bucket> window) {
      return window.scan(getEmptyOutputValue(), reduceBucket).skip(numBuckets);
    }
  };
  this.sourceStream = bucketedStream      //stream broken up into buckets
      .window(numBuckets, 1)          //emit overlapping windows of buckets
      .flatMap(reduceWindowToSummary) //convert a window of bucket-summaries into a single summary
      .doOnSubscribe(new Action0() {
        @Override
        public void call() {
          isSourceCurrentlySubscribed.set(true);
        }
      })
      .doOnUnsubscribe(new Action0() {
        @Override
        public void call() {
          isSourceCurrentlySubscribed.set(false);
        }
      })
      .share()                        //multiple subscribers should get same data
      .onBackpressureDrop();          //if there are slow consumers, data should not buffer
}

代码示例来源:origin: apache/usergrid

/**
   * Time the obserable with the specified timer
   */
  public static <T> Observable<T> time( final Observable<T> observable, final Timer timer ) {
    final ObservableTimer proxy = new ObservableTimer( timer );

    //attach to the observable
    return observable.doOnSubscribe( () -> proxy.start() ).doOnCompleted( () -> proxy.stop() );
  }
}

代码示例来源:origin: PipelineAI/pipeline

protected BucketedCumulativeCounterStream(HystrixEventStream<Event> stream, int numBuckets, int bucketSizeInMs,
                     Func2<Bucket, Event, Bucket> reduceCommandCompletion,
                     Func2<Output, Bucket, Output> reduceBucket) {
  super(stream, numBuckets, bucketSizeInMs, reduceCommandCompletion);
  this.sourceStream = bucketedStream
      .scan(getEmptyOutputValue(), reduceBucket)
      .skip(numBuckets)
      .doOnSubscribe(new Action0() {
        @Override
        public void call() {
          isSourceCurrentlySubscribed.set(true);
        }
      })
      .doOnUnsubscribe(new Action0() {
        @Override
        public void call() {
          isSourceCurrentlySubscribed.set(false);
        }
      })
      .share()                        //multiple subscribers should get same data
      .onBackpressureDrop();          //if there are slow consumers, data should not buffer
}

相关文章

Observable类方法