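This article collects code examples for the Java method rx.Observable.doOnSubscribe() and shows how Observable.doOnSubscribe() is used in practice. The examples were extracted from selected projects on platforms such as GitHub, Stack Overflow, and Maven, so they should be useful as references. Details of the Observable.doOnSubscribe() method are as follows: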
Package path: rx.Observable
Class name: Observable
Method name: doOnSubscribe
Modifies the source Observable so that it invokes the given action when it is subscribed from its subscribers. Each subscription will result in an invocation of the given action except when the source Observable is reference counted, in which case the source Observable will invoke the given action for the first subscription.
Scheduler: doOnSubscribe does not operate by default on a particular Scheduler.
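The contract described above can be illustrated without any library. The following is a minimal sketch in plain Java, assuming a hypothetical MiniObservable interface that stands in for rx.Observable; it is a simplified model and not RxJava's implementation. It shows the action running once per subscription, before the source emits anything.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Simplified, JDK-only model of the doOnSubscribe contract.
// MiniObservable is a hypothetical stand-in, NOT the real rx.Observable.
class DoOnSubscribeSketch {

    /** A cold source: every call to subscribe() replays the items to the consumer. */
    interface MiniObservable<T> {
        void subscribe(Consumer<T> onNext);

        /** Returns a new source that runs the action first, then subscribes to this one. */
        default MiniObservable<T> doOnSubscribe(Runnable action) {
            return onNext -> {
                action.run();            // side effect, once per subscription
                this.subscribe(onNext);  // then the upstream source is subscribed
            };
        }
    }

    /** Subscribes twice and records the order of events. */
    static List<String> run() {
        List<String> log = new ArrayList<>();
        MiniObservable<Integer> source = onNext -> {
            onNext.accept(1);
            onNext.accept(2);
        };
        MiniObservable<Integer> traced = source.doOnSubscribe(() -> log.add("subscribed"));

        traced.subscribe(v -> log.add("first:" + v));
        traced.subscribe(v -> log.add("second:" + v));
        return log;
    }

    public static void main(String[] args) {
        // prints [subscribed, first:1, first:2, subscribed, second:1, second:2]
        System.out.println(run());
    }
}
```

Because the source in this sketch is cold, each subscriber triggers its own "subscribed" entry; the reference-counted case mentioned in the description behaves differently.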
Code example source: ReactiveX/RxNetty
@Override
public Observable<Void> close(boolean flush) {
    if (flush) {
        return releaseObservable.doOnSubscribe(new Action0() {
            @Override
            public void call() {
                unpooledDelegate.flush();
            }
        });
    } else {
        return releaseObservable;
    }
}
Code example source: ReactiveX/RxNetty
public DefaultChannelOperations(final Channel nettyChannel, ConnectionEventListener eventListener,
        EventPublisher eventPublisher) {
    this.nettyChannel = nettyChannel;
    this.eventListener = eventListener;
    this.eventPublisher = eventPublisher;
    closeObservable = Observable.create(new OnSubscribeForClose(nettyChannel));
    flushAndCloseObservable = closeObservable.doOnSubscribe(new Action0() {
        @Override
        public void call() {
            flush();
        }
    });
}
Code example source: PipelineAI/pipeline
@Override
final protected Observable<R> getExecutionObservable() {
    return Observable.defer(new Func0<Observable<R>>() {
        @Override
        public Observable<R> call() {
            try {
                return Observable.just(run());
            } catch (Throwable ex) {
                return Observable.error(ex);
            }
        }
    }).doOnSubscribe(new Action0() {
        @Override
        public void call() {
            // Save thread on which we get subscribed so that we can interrupt it later if needed
            executionThread.set(Thread.currentThread());
        }
    });
}
Code example source: ReactiveX/RxNetty
@Override
public Observable<Void> discard(final PooledConnection<?, ?> connection) {
    return connection.discard().doOnSubscribe(new Action0() {
        @Override
        public void call() {
            EventPublisher eventPublisher = connection.unsafeNettyChannel().attr(EVENT_PUBLISHER).get();
            if (eventPublisher.publishingEnabled()) {
                ClientEventListener eventListener = connection.unsafeNettyChannel()
                        .attr(CLIENT_EVENT_LISTENER).get();
                eventListener.onPooledConnectionEviction();
            }
            limitDeterminationStrategy.releasePermit(); /*Since, an idle connection took a permit*/
        }
    });
}
Code example source: Rukey7/MvpApp
@Override
public void getData(boolean isRefresh) {
    // The original NetEase API takes a long list of parameters and only some of them are passed here,
    // so the returned data may contain duplicate images; please ignore that issue.
    Observable.from(mPhotoList)
            .doOnSubscribe(new Action0() {
                @Override
                public void call() {
                    mView.showLoading();
                }
            })
            .compose(mTransformer)
            .subscribe(new Subscriber<List<BeautyPhotoInfo>>() {
                @Override
                public void onCompleted() {
                }

                @Override
                public void onError(Throwable e) {
                    Logger.e(e.toString());
                }

                @Override
                public void onNext(List<BeautyPhotoInfo> photoList) {
                    mView.loadData(photoList);
                }
            });
}
Code example source: PipelineAI/pipeline
protected HystrixCachedObservable(final Observable<R> originalObservable) {
    ReplaySubject<R> replaySubject = ReplaySubject.create();
    this.originalSubscription = originalObservable
            .subscribe(replaySubject);
    this.cachedObservable = replaySubject
            .doOnUnsubscribe(new Action0() {
                @Override
                public void call() {
                    outstandingSubscriptions--;
                    if (outstandingSubscriptions == 0) {
                        originalSubscription.unsubscribe();
                    }
                }
            })
            .doOnSubscribe(new Action0() {
                @Override
                public void call() {
                    outstandingSubscriptions++;
                }
            });
}
Code example source: HotBitmapGG/bilibili-android-client
@Override
public void loadData() {
    RetrofitHelper.getBiliGoAPI()
            .getNewBangumiSerialList()
            .compose(this.bindToLifecycle())
            .doOnSubscribe(this::showProgressBar)
            .map(NewBangumiSerialInfo::getList)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(listBeans -> {
                newBangumiSerials.addAll(listBeans);
                finishTask();
            }, throwable -> hideProgressBar());
}
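A point worth checking in chains like the one above: in RxJava 1.x, a doOnSubscribe action runs on whichever thread performs the subscription at that point in the chain, so an action placed upstream of subscribeOn(...) is expected to run on that scheduler's thread rather than on the caller's. The sketch below models this ordering in plain Java under stated assumptions: MiniObservable, its subscribeOn(ExecutorService), and the thread name "worker" are all hypothetical stand-ins, not RxJava APIs.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Simplified, JDK-only model of how operator order decides the thread of a
// doOnSubscribe action. This is NOT RxJava; every type here is hypothetical.
class SubscribeThreadSketch {

    interface MiniObservable<T> {
        void subscribe(Consumer<T> onNext);

        /** Runs the action on the subscribing thread, then subscribes upstream. */
        default MiniObservable<T> doOnSubscribe(Runnable action) {
            return onNext -> {
                action.run();
                this.subscribe(onNext);
            };
        }

        /** Moves the upstream subscription onto the given executor. */
        default MiniObservable<T> subscribeOn(ExecutorService executor) {
            return onNext -> executor.execute(() -> this.subscribe(onNext));
        }
    }

    /** Returns the name of the thread on which the doOnSubscribe action ran. */
    static String actionThread(boolean actionBeforeSubscribeOn) {
        ExecutorService worker = Executors.newSingleThreadExecutor(r -> new Thread(r, "worker"));
        try {
            CompletableFuture<String> seen = new CompletableFuture<>();
            CompletableFuture<Void> done = new CompletableFuture<>();
            MiniObservable<String> source = onNext -> onNext.accept("item");
            Runnable action = () -> seen.complete(Thread.currentThread().getName());

            MiniObservable<String> chain = actionBeforeSubscribeOn
                    ? source.doOnSubscribe(action).subscribeOn(worker)   // action is upstream
                    : source.subscribeOn(worker).doOnSubscribe(action);  // action is downstream

            chain.subscribe(item -> done.complete(null));
            done.get(5, TimeUnit.SECONDS);  // wait until the item has been delivered
            return seen.get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException("subscription did not finish", e);
        } finally {
            worker.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("action before subscribeOn ran on: " + actionThread(true));
        System.out.println("action after subscribeOn ran on: " + actionThread(false));
    }
}
```

In this model the first call reports the worker thread and the second reports the calling thread. If an action such as showing a progress bar must run on a particular thread, it may be worth verifying where it actually executes in the real chain.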
Code example source: HotBitmapGG/bilibili-android-client
@Override
public void loadData() {
    RetrofitHelper.getBangumiAPI()
            .getSeasonNewBangumiList()
            .compose(bindToLifecycle())
            .doOnSubscribe(this::showProgressBar)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(seasonNewBangumiInfo -> {
                results.addAll(seasonNewBangumiInfo.getResult().subList(0, 50));
                finishTask();
            }, throwable -> hideProgressBar());
}
Code example source: HotBitmapGG/bilibili-android-client
public void getUserInfo() {
    RetrofitHelper.getAccountAPI()
            .getUserInfoById(mid)
            .compose(this.bindToLifecycle())
            .doOnSubscribe(this::showProgressBar)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(userInfo -> {
                mUserDetailsInfo = userInfo;
                finishTask();
            }, throwable -> hideProgressBar());
}
Code example source: HotBitmapGG/bilibili-android-client
@Override
public void loadData() {
    RetrofitHelper.getBangumiAPI()
            .getBangumiIndex()
            .compose(this.bindToLifecycle())
            .doOnSubscribe(this::showProgressBar)
            .subscribeOn(Schedulers.io())
            .delay(2000, TimeUnit.MILLISECONDS)
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(bangumiIndexInfo -> {
                categorys.addAll(bangumiIndexInfo.getResult().getCategory());
                finishTask();
            }, throwable -> hideProgressBar());
}
Code example source: PipelineAI/pipeline
/**
 * @deprecated Not for public use. Please use {@link #getInstance()}. This facilitates better stream-sharing
 * @param intervalInMilliseconds milliseconds between data emissions
 */
@Deprecated //deprecated in 1.5.4.
public HystrixUtilizationStream(final int intervalInMilliseconds) {
    this.intervalInMilliseconds = intervalInMilliseconds;
    this.allUtilizationStream = Observable.interval(intervalInMilliseconds, TimeUnit.MILLISECONDS)
            .map(getAllUtilization)
            .doOnSubscribe(new Action0() {
                @Override
                public void call() {
                    isSourceCurrentlySubscribed.set(true);
                }
            })
            .doOnUnsubscribe(new Action0() {
                @Override
                public void call() {
                    isSourceCurrentlySubscribed.set(false);
                }
            })
            .share()
            .onBackpressureDrop();
}
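The pattern above places doOnSubscribe and doOnUnsubscribe upstream of share(), which matches the reference-counted case in the method description: the action fires for the first subscription only, and the flag is cleared once the last subscriber leaves. The following is a rough, single-threaded sketch of that behaviour in plain Java. SharedSource and its callbacks are hypothetical names introduced for illustration; this is not how RxJava implements share().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

// Simplified, single-threaded model of a reference-counted (shared) source.
// NOT RxJava's share(); all names below are hypothetical.
class SharedSourceSketch {

    /** Connects upstream on the first subscriber and disconnects after the last one. */
    static final class SharedSource<T> {
        private final List<Consumer<T>> subscribers = new ArrayList<>();
        private final Runnable onUpstreamSubscribe;    // plays the role of doOnSubscribe
        private final Runnable onUpstreamUnsubscribe;  // plays the role of doOnUnsubscribe

        SharedSource(Runnable onUpstreamSubscribe, Runnable onUpstreamUnsubscribe) {
            this.onUpstreamSubscribe = onUpstreamSubscribe;
            this.onUpstreamUnsubscribe = onUpstreamUnsubscribe;
        }

        /** Registers a subscriber and returns a handle that unsubscribes it. */
        Runnable subscribe(Consumer<T> subscriber) {
            if (subscribers.isEmpty()) {
                onUpstreamSubscribe.run();  // only the first subscription reaches upstream
            }
            subscribers.add(subscriber);
            return () -> {
                if (subscribers.remove(subscriber) && subscribers.isEmpty()) {
                    onUpstreamUnsubscribe.run();  // last subscriber gone: release upstream
                }
            };
        }

        /** Delivers one value to every current subscriber. */
        void emit(T value) {
            for (Consumer<T> s : new ArrayList<>(subscribers)) {
                s.accept(value);
            }
        }
    }

    static List<String> run() {
        List<String> log = new ArrayList<>();
        AtomicBoolean isSourceCurrentlySubscribed = new AtomicBoolean(false);
        SharedSource<Long> shared = new SharedSource<>(
                () -> { isSourceCurrentlySubscribed.set(true); log.add("upstream subscribed"); },
                () -> { isSourceCurrentlySubscribed.set(false); log.add("upstream unsubscribed"); });

        Runnable first = shared.subscribe(v -> log.add("first:" + v));
        Runnable second = shared.subscribe(v -> log.add("second:" + v));
        shared.emit(7L);
        first.run();
        log.add("still subscribed: " + isSourceCurrentlySubscribed.get());
        second.run();
        log.add("still subscribed: " + isSourceCurrentlySubscribed.get());
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Two subscribers produce a single "upstream subscribed" entry in this sketch, and the flag only flips back to false after both have unsubscribed.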
Code example source: PipelineAI/pipeline
/**
 * @deprecated Not for public use. Please use {@link #getInstance()}. This facilitates better stream-sharing
 * @param intervalInMilliseconds milliseconds between data emissions
 */
@Deprecated //deprecated in 1.5.4.
public HystrixConfigurationStream(final int intervalInMilliseconds) {
    this.intervalInMilliseconds = intervalInMilliseconds;
    this.allConfigurationStream = Observable.interval(intervalInMilliseconds, TimeUnit.MILLISECONDS)
            .map(getAllConfig)
            .doOnSubscribe(new Action0() {
                @Override
                public void call() {
                    isSourceCurrentlySubscribed.set(true);
                }
            })
            .doOnUnsubscribe(new Action0() {
                @Override
                public void call() {
                    isSourceCurrentlySubscribed.set(false);
                }
            })
            .share()
            .onBackpressureDrop();
}
Code example source: HotBitmapGG/bilibili-android-client
@Override
public void loadData() {
    RetrofitHelper.getBangumiAPI()
            .getBangumiSchedules()
            .compose(bindToLifecycle())
            .doOnSubscribe(this::showProgressBar)
            .delay(2000, TimeUnit.MILLISECONDS)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(bangumiSchedule -> {
                bangumiSchedules.addAll(bangumiSchedule.getResult());
                finishTask();
            }, throwable -> {
                hideProgressBar();
                // Toast text: "Loading failed, please reload~"
                ToastUtil.ShortToast("加载失败啦,请重新加载~");
            });
}
Code example source: HotBitmapGG/bilibili-android-client
@Override
public void loadData() {
    RetrofitHelper.getBiliAPI()
            .getSpInfo(spid, title)
            .compose(this.bindToLifecycle())
            .doOnSubscribe(this::showProgressBar)
            .flatMap(new Func1<SpecialTopic, Observable<SpecialTopicIResult>>() {
                @Override
                public Observable<SpecialTopicIResult> call(SpecialTopic specialTopic) {
                    mSpecialTopic = specialTopic;
                    return RetrofitHelper.getBiliAPI()
                            .getSpItemList(spid, season_id, 1);
                }
            })
            .compose(bindToLifecycle())
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(specialTopicIResult -> {
                spList.addAll(specialTopicIResult.list);
                finishTask();
            }, throwable -> {
                hideProgressBar();
            });
}
Code example source: Rukey7/MvpApp
@Override
public void getData(boolean isRefresh) {
    RetrofitService.getPhotoList()
            .doOnSubscribe(new Action0() {
                @Override
                public void call() {
                    mView.showLoading();
                }
            })
            .compose(mView.<List<PhotoInfo>>bindToLife())
            .subscribe(new Subscriber<List<PhotoInfo>>() {
                @Override
                public void onCompleted() {
                    mView.hideLoading();
                }

                @Override
                public void onError(Throwable e) {
                    Logger.e(e.toString());
                    mView.showNetError();
                }

                @Override
                public void onNext(List<PhotoInfo> photoList) {
                    mView.loadData(photoList);
                    mNextSetId = photoList.get(photoList.size() - 1).getSetid();
                }
            });
}
Code example source: HotBitmapGG/bilibili-android-client
@Override
public void loadData() {
    RetrofitHelper.getVipAPI()
            .getVipGame()
            .compose(bindToLifecycle())
            .doOnSubscribe(this::showProgressBar)
            .delay(2000, TimeUnit.MILLISECONDS)
            .flatMap(new Func1<VipGameInfo, Observable<String>>() {
                @Override
                public Observable<String> call(VipGameInfo vipGameInfo) {
                    mVipGameInfoData = vipGameInfo.getData();
                    return Observable.just(readAssetsJson());
                }
            })
            .compose(this.bindToLifecycle())
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(s -> {
                GameCenterInfo gameCenterInfo = new Gson().fromJson(s, GameCenterInfo.class);
                items.addAll(gameCenterInfo.getItems());
                finishTask();
            }, throwable -> hideProgressBar());
}
Code example source: PipelineAI/pipeline
private HystrixDashboardStream(int delayInMs) {
    this.delayInMs = delayInMs;
    this.singleSource = Observable.interval(delayInMs, TimeUnit.MILLISECONDS)
            .map(new Func1<Long, DashboardData>() {
                @Override
                public DashboardData call(Long timestamp) {
                    return new DashboardData(
                            HystrixCommandMetrics.getInstances(),
                            HystrixThreadPoolMetrics.getInstances(),
                            HystrixCollapserMetrics.getInstances()
                    );
                }
            })
            .doOnSubscribe(new Action0() {
                @Override
                public void call() {
                    isSourceCurrentlySubscribed.set(true);
                }
            })
            .doOnUnsubscribe(new Action0() {
                @Override
                public void call() {
                    isSourceCurrentlySubscribed.set(false);
                }
            })
            .share()
            .onBackpressureDrop();
}
Code example source: PipelineAI/pipeline
protected BucketedRollingCounterStream(HystrixEventStream<Event> stream, final int numBuckets, int bucketSizeInMs,
        final Func2<Bucket, Event, Bucket> appendRawEventToBucket,
        final Func2<Output, Bucket, Output> reduceBucket) {
    super(stream, numBuckets, bucketSizeInMs, appendRawEventToBucket);
    Func1<Observable<Bucket>, Observable<Output>> reduceWindowToSummary = new Func1<Observable<Bucket>, Observable<Output>>() {
        @Override
        public Observable<Output> call(Observable<Bucket> window) {
            return window.scan(getEmptyOutputValue(), reduceBucket).skip(numBuckets);
        }
    };
    this.sourceStream = bucketedStream      //stream broken up into buckets
            .window(numBuckets, 1)          //emit overlapping windows of buckets
            .flatMap(reduceWindowToSummary) //convert a window of bucket-summaries into a single summary
            .doOnSubscribe(new Action0() {
                @Override
                public void call() {
                    isSourceCurrentlySubscribed.set(true);
                }
            })
            .doOnUnsubscribe(new Action0() {
                @Override
                public void call() {
                    isSourceCurrentlySubscribed.set(false);
                }
            })
            .share()                        //multiple subscribers should get same data
            .onBackpressureDrop();          //if there are slow consumers, data should not buffer
}
Code example source: apache/usergrid
/**
 * Time the observable with the specified timer
 */
public static <T> Observable<T> time( final Observable<T> observable, final Timer timer ) {
    final ObservableTimer proxy = new ObservableTimer( timer );
    //attach to the observable
    return observable.doOnSubscribe( () -> proxy.start() ).doOnCompleted( () -> proxy.stop() );
}
Code example source: PipelineAI/pipeline
protected BucketedCumulativeCounterStream(HystrixEventStream<Event> stream, int numBuckets, int bucketSizeInMs,
        Func2<Bucket, Event, Bucket> reduceCommandCompletion,
        Func2<Output, Bucket, Output> reduceBucket) {
    super(stream, numBuckets, bucketSizeInMs, reduceCommandCompletion);
    this.sourceStream = bucketedStream
            .scan(getEmptyOutputValue(), reduceBucket)
            .skip(numBuckets)
            .doOnSubscribe(new Action0() {
                @Override
                public void call() {
                    isSourceCurrentlySubscribed.set(true);
                }
            })
            .doOnUnsubscribe(new Action0() {
                @Override
                public void call() {
                    isSourceCurrentlySubscribed.set(false);
                }
            })
            .share()               //multiple subscribers should get same data
            .onBackpressureDrop(); //if there are slow consumers, data should not buffer
}