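This article collects code examples for the Java method rx.Observable.doOnRequest() and shows how Observable.doOnRequest() is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, were extracted from selected projects, and are intended as practical reference material. Details of Observable.doOnRequest() are as follows: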
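Fully qualified class: rx.Observable
Class name: Observable
Method name: doOnRequest
Description: none is provided in the source listing. In RxJava 1.x, doOnRequest(Action1<Long>) invokes the given action with the requested amount each time a request for more items from downstream reaches this point in the chain.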
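To make the idea of a request hook concrete without depending on RxJava, here is a minimal sketch built on the JDK's java.util.concurrent.Flow interfaces (Java 9+). It is an analogy, not RxJava's implementation: the `range` and `doOnRequest` helpers and the class name `DoOnRequestSketch` are hypothetical names introduced for illustration, and the code assumes single-threaded use.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.function.LongConsumer;

class DoOnRequestSketch {

    /** Hypothetical helper: emits start..start+count-1, but only as many items as were requested. */
    static Flow.Publisher<Integer> range(int start, int count) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            int next = start;
            final int end = start + count;
            long pending;      // demand not yet satisfied
            boolean emitting;  // guards against re-entrant request() calls
            boolean done;

            @Override public void request(long n) {
                if (done) return;
                if (n <= 0) {
                    done = true;
                    subscriber.onError(new IllegalArgumentException("n must be > 0"));
                    return;
                }
                pending = (pending + n < 0) ? Long.MAX_VALUE : pending + n; // cap on overflow
                if (emitting) return; // the loop below will pick up the new demand
                emitting = true;
                while (pending > 0 && next < end && !done) {
                    pending--;
                    subscriber.onNext(next++);
                }
                emitting = false;
                if (next == end && !done) {
                    done = true;
                    subscriber.onComplete();
                }
            }

            @Override public void cancel() { done = true; }
        });
    }

    /** Hypothetical helper: runs onRequest for every request(n), then forwards n upstream unchanged. */
    static <T> Flow.Publisher<T> doOnRequest(Flow.Publisher<T> source, LongConsumer onRequest) {
        return downstream -> source.subscribe(new Flow.Subscriber<T>() {
            @Override public void onSubscribe(Flow.Subscription upstream) {
                downstream.onSubscribe(new Flow.Subscription() {
                    @Override public void request(long n) {
                        onRequest.accept(n);  // side effect first
                        upstream.request(n);  // then pass the demand through
                    }
                    @Override public void cancel() { upstream.cancel(); }
                });
            }
            @Override public void onNext(T item) { downstream.onNext(item); }
            @Override public void onError(Throwable t) { downstream.onError(t); }
            @Override public void onComplete() { downstream.onComplete(); }
        });
    }

    /** Requests 2 items, then 3 more; returns the request sizes observed by the hook. */
    static List<Long> demo() {
        List<Long> requests = new ArrayList<>();
        List<Integer> received = new ArrayList<>();
        doOnRequest(range(1, 5), n -> requests.add(n)).subscribe(new Flow.Subscriber<Integer>() {
            Flow.Subscription subscription;
            @Override public void onSubscribe(Flow.Subscription s) { subscription = s; s.request(2); }
            @Override public void onNext(Integer item) {
                received.add(item);
                if (received.size() == 2) subscription.request(3);
            }
            @Override public void onError(Throwable t) { throw new IllegalStateException(t); }
            @Override public void onComplete() { }
        });
        return requests;
    }

    public static void main(String[] args) {
        System.out.println("request sizes seen by the hook: " + demo()); // prints [2, 3]
    }
}
```

The hook runs once per request call, not once per item and not once per subscription, which is worth keeping in mind when reading the examples that follow.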
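Code example source: origin: akarnokd/akarnokd-misc
public static void main(String[] args) throws Exception {
    Observable<Integer> source = Observable.range(1, 10000);
    source
        // Log every request amount that reaches the source.
        .doOnRequest(i -> System.out.println("Requested " + i))
        .groupBy(v -> v % 5)
        // Subscribe to at most 4 groups at a time, each observed on the io scheduler.
        .flatMap(g -> g.observeOn(Schedulers.io()).map(GroupByTest2::calculation), 4)
        .subscribe(i -> System.out.println("Got " + i));
    // Keep the JVM alive while the asynchronous work runs.
    Thread.sleep(100000);
}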
Code example source: origin: youxin11544/RxJava_Simple
/**
 * Listener-style invocation: subscribes on the main thread and reports the result
 * to the caller through the listener.
 * @param listener receives the callback results
 */
public void launch(final OnCompressListener listener) {
    asObservable().subscribeOn(AndroidSchedulers.mainThread()).doOnRequest(new Action1<Long>() {
        @Override
        public void call(Long aLong) {
            // Called when the subscriber requests items; used here as a "start" signal.
            listener.onStart();
        }
    }).subscribe(new Action1<File>() {
        @Override
        public void call(File file) {
            listener.onSuccess(file);
        }
    }, new Action1<Throwable>() {
        @Override
        public void call(Throwable throwable) {
            listener.onError(throwable);
        }
    });
}
Code example source: origin: youxin11544/RxJava_Simple
/**
 * Listener-style invocation: subscribes on the main thread and reports the result
 * to the caller through the listener.
 * @param listener receives the callback results
 */
public void launch(final OnMultiCompressListener listener) {
    asListObservable().subscribeOn(AndroidSchedulers.mainThread())
        .doOnRequest(new Action1<Long>() {
            @Override
            public void call(Long aLong) {
                // Called when the subscriber requests items; used here as a "start" signal.
                listener.onStart();
            }
        })
        .subscribe(new Action1<List<File>>() {
            @Override
            public void call(List<File> files) {
                listener.onSuccess(files);
            }
        }, new Action1<Throwable>() {
            @Override
            public void call(Throwable throwable) {
                listener.onError(throwable);
            }
        });
}
Code example source: origin: davidmoten/rxjava-extras
    @Override
    public Observable<T> call() {
        final OperatorPassThroughAdjustedRequest<T> op = new OperatorPassThroughAdjustedRequest<T>();
        // Buffer upstream items, and forward each downstream request amount to the operator.
        return o.lift(op).onBackpressureBuffer().doOnRequest(new Action1<Long>() {
            @Override
            public void call(Long n) {
                op.requestMore(n);
            }
        });
    }
});
Code example source: origin: youxin11544/RxJava_Simple
.putGear(Luban.CUSTOM_GEAR)
.asListObservable()
.doOnRequest(new Action1<Long>() {
    @Override
    public void call(Long aLong) {
Code example source: origin: AbbyJM/weather
@Override
public void loadWeather(String city, boolean needToast) {
    if (Util.isNetworkConnected(WeatherApplication.getAppContext())) {
        // Show a loading toast when the subscriber requests data from the API call.
        subscription = ApiClient.getInstance().fetchWeather(city).doOnRequest(aLong -> {
            if (needToast)
                mWeakView.get().toastLoading();
        }).subscribe(new Subscriber<WeatherBean>() {
            @Override
            public void onCompleted() {
                if (needToast)
                    mWeakView.get().toastComplete();
            }
            @Override
            public void onError(Throwable e) {
                if (needToast)
                    mWeakView.get().toastError();
            }
            @Override
            public void onNext(WeatherBean weather) {
                mWeakView.get().showWeather(weather);
            }
        });
    }
}
Code example source: origin: davidmoten/rxjava-extras
public static void main(String[] args) throws InterruptedException {
    Observable.range(1, 10000) //
        // Log each request amount that reaches the source.
        .doOnRequest(new Action1<Long>() {
            @Override
            public void call(Long n) {
                System.out.println("requested " + n);
            }
        }).doOnUnsubscribe(new Action0() {
            @Override
            public void call() {
                System.out.println("unsubscribed");
            }
        }) //
        .compose(Transformers.<Integer> onBackpressureBufferRequestLimiting()) //
        .take(10) //
        .subscribeOn(Schedulers.io()) //
        .doOnNext(Actions.println()) //
        .count().toBlocking().single();
    Thread.sleep(2000);
}
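The example above logs the request sizes and the unsubscription that the source sees when a buffering stage and take(10) sit downstream. As a rough, library-free illustration of why those request sizes matter, the sketch below uses java.util.concurrent.Flow (Java 9+) with a hypothetical `take` helper that caps the demand it passes upstream and cancels after the limit. It is a simplified, single-threaded analogy and makes no claim about what rxjava-extras or RxJava print for the example above; `loggingRange`, `take`, and `RequestLimitingSketch` are names assumed for this sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

class RequestLimitingSketch {
    /** Records what the source observes: request sizes, cancellation, completion. */
    static final List<String> log = new ArrayList<>();

    /** Hypothetical source: emits start..start+count-1 on demand and logs requests and cancellation. */
    static Flow.Publisher<Integer> loggingRange(int start, int count) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            int next = start;
            final int end = start + count;
            long pending;      // demand not yet satisfied
            boolean emitting;  // guards against re-entrant request() calls
            boolean done;

            @Override public void request(long n) {
                if (done || n <= 0) return; // sketch: non-positive requests are ignored
                log.add("requested " + n);
                pending = (pending + n < 0) ? Long.MAX_VALUE : pending + n;
                if (emitting) return;
                emitting = true;
                while (pending > 0 && next < end && !done) {
                    pending--;
                    subscriber.onNext(next++);
                }
                emitting = false;
                if (next == end && !done) { done = true; subscriber.onComplete(); }
            }

            @Override public void cancel() {
                if (!done) { done = true; log.add("cancelled"); }
            }
        });
    }

    /** Hypothetical operator: lets at most `limit` items through and never requests more than that upstream. */
    static <T> Flow.Publisher<T> take(Flow.Publisher<T> source, long limit) {
        return downstream -> source.subscribe(new Flow.Subscriber<T>() {
            Flow.Subscription upstream;
            long budget = limit;     // demand we may still pass upstream
            long remaining = limit;  // items we may still pass downstream
            boolean done;

            @Override public void onSubscribe(Flow.Subscription s) {
                upstream = s;
                downstream.onSubscribe(new Flow.Subscription() {
                    @Override public void request(long n) {
                        long capped = Math.min(n, budget); // never ask for more than we can use
                        if (capped > 0) { budget -= capped; upstream.request(capped); }
                    }
                    @Override public void cancel() { upstream.cancel(); }
                });
            }
            @Override public void onNext(T item) {
                if (done) return;
                downstream.onNext(item);
                if (--remaining == 0) { done = true; upstream.cancel(); downstream.onComplete(); }
            }
            @Override public void onError(Throwable t) { if (!done) { done = true; downstream.onError(t); } }
            @Override public void onComplete() { if (!done) { done = true; downstream.onComplete(); } }
        });
    }

    /** Subscribes with unbounded demand through take(10) and returns the items received. */
    static List<Integer> demo() {
        log.clear();
        List<Integer> received = new ArrayList<>();
        take(loggingRange(1, 10000), 10).subscribe(new Flow.Subscriber<Integer>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
            @Override public void onNext(Integer item) { received.add(item); }
            @Override public void onError(Throwable t) { log.add("error " + t); }
            @Override public void onComplete() { log.add("completed"); }
        });
        return received;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
        System.out.println(log);    // prints [requested 10, cancelled, completed]
    }
}
```

In this sketch the subscriber asks for Long.MAX_VALUE, yet the source only ever sees a request for 10, because the limiting stage trims the demand before forwarding it. A request hook placed next to the source is a convenient way to check whether such trimming is actually happening in a real pipeline.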
Code example source: origin: nurkiewicz/rxjava-book-examples
@Test
public void sample_271() throws Exception {
    Observable<Instant> timestamps = Observable
        .fromCallable(() -> dbQuery())
        .doOnSubscribe(() -> log.info("subscribe()"))
        .doOnRequest(c -> log.info("Requested {}", c))
        .doOnNext(instant -> log.info("Got: {}", instant));
    // timestamps is subscribed to twice here: once directly and once through skip(1).
    timestamps
        .zipWith(timestamps.skip(1), Duration::between)
        .map(Object::toString)
        .subscribe(log::info);
}
Code example source: origin: akarnokd/akarnokd-misc
int j = i;
Observable.just(null)
    .doOnRequest(v -> System.out.println(j))
    .subscribeOn(subscribeScheduler)
    .observeOn(mapScheduler)