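This article collects code examples for the rx.Observable.sample() method in Java and shows how Observable.sample() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven. They were extracted from selected projects and are intended as a practical reference. Details of the Observable.sample() method are as follows: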
Package path: rx.Observable
Class name: Observable
Method name: sample
Returns an Observable that emits the most recently emitted item (if any) emitted by the source Observable within periodic time intervals.
Backpressure Support: This operator does not support backpressure as it uses time to control data flow.
Scheduler: sample operates by default on the computation Scheduler.
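The description above can be illustrated with a small model that does not depend on RxJava. The sketch below is not the library's implementation: the class SampleModel, the Emission type, and the rule that an emission landing exactly on a tick belongs to that tick are all assumptions made for illustration. It only shows the documented idea that each period yields the latest item seen in that period, and nothing when the period was empty.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, library-free model of periodic sampling (not RxJava code).
class SampleModel {
    /** A source emission: the time in ms since subscription, and the value emitted. */
    static final class Emission {
        final long timeMs;
        final String value;
        Emission(long timeMs, String value) {
            this.timeMs = timeMs;
            this.value = value;
        }
    }

    /**
     * At each tick (periodMs, 2 * periodMs, ... up to endMs), outputs the latest
     * value emitted since the previous tick, or nothing if there was none.
     * The source list must be sorted by time. Assigning an emission that lands
     * exactly on a tick to that tick is an assumption of this model.
     */
    static List<String> sample(List<Emission> source, long periodMs, long endMs) {
        List<String> out = new ArrayList<>();
        int i = 0;
        for (long tick = periodMs; tick <= endMs; tick += periodMs) {
            String latest = null;
            while (i < source.size() && source.get(i).timeMs <= tick) {
                latest = source.get(i).value; // later items overwrite earlier ones
                i++;
            }
            if (latest != null) {
                out.add(latest); // an empty period emits nothing
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Emission> source = new ArrayList<>();
        source.add(new Emission(100, "A"));
        source.add(new Emission(300, "B"));
        source.add(new Emission(450, "C"));  // last item before the 500 ms tick
        source.add(new Emission(1200, "D")); // nothing arrives between 500 and 1000 ms
        System.out.println(sample(source, 500, 1500)); // prints [C, D]
    }
}
```

With a 500 ms period, A and B are dropped because C arrives later in the same period, and the second period produces no output at all.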
Code example origin: jhusain/learnrxjava
public static void main(String args[]) {
    hotStream().sample(500, TimeUnit.MILLISECONDS).toBlocking().forEach(System.out::println);
}
Code example origin: com.netflix.rxjava/rxjava-core
/**
* Returns an Observable that emits only the last item emitted by the source Observable during sequential
* time windows of a specified duration.
* <p>
* This differs from {@link #throttleFirst} in that this ticks along at a scheduled interval whereas
* {@link #throttleFirst} does not tick, it just tracks passage of time.
* <p>
* <img width="640" height="305" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/throttleLast.png" alt="">
* <dl>
* <dt><b>Backpressure Support:</b></dt>
* <dd>This operator does not support backpressure as it uses time to control data flow.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code throttleLast} operates by default on the {@code computation} {@link Scheduler}.</dd>
* </dl>
*
* @param intervalDuration
* duration of windows within which the last item emitted by the source Observable will be
* emitted
* @param unit
* the unit of time of {@code intervalDuration}
* @return an Observable that performs the throttle operation
* @see <a href="https://github.com/Netflix/RxJava/wiki/Filtering-Observables#sample-or-throttlelast">RxJava wiki: throttleLast</a>
* @see <a href="https://github.com/Netflix/RxJava/wiki/Backpressure">RxJava wiki: Backpressure</a>
* @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable.sample.aspx">MSDN: Observable.Sample</a>
* @see #sample(long, TimeUnit)
*/
public final Observable<T> throttleLast(long intervalDuration, TimeUnit unit) {
    return sample(intervalDuration, unit);
}
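The Javadoc above contrasts throttleLast (an alias for sample, which ticks at a fixed interval) with throttleFirst (which only tracks elapsed time since the last item it let through). The following library-free sketch models that difference. The class ThrottleModel and both method bodies are illustrative assumptions, not RxJava's source, and the tie-breaking at exact boundaries is chosen arbitrarily.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model contrasting the two throttling styles (not RxJava code).
class ThrottleModel {
    /** throttleFirst-like: pass an item only if windowMs has elapsed since the last passed item. */
    static List<Integer> throttleFirst(long[] timesMs, int[] values, long windowMs) {
        List<Integer> out = new ArrayList<>();
        boolean passedAny = false;
        long lastPassedMs = 0;
        for (int i = 0; i < timesMs.length; i++) {
            if (!passedAny || timesMs[i] - lastPassedMs >= windowMs) {
                out.add(values[i]);
                lastPassedMs = timesMs[i]; // the window restarts at this item, not at a tick
                passedAny = true;
            }
        }
        return out;
    }

    /** throttleLast/sample-like: at each fixed tick, emit the latest item since the previous tick. */
    static List<Integer> throttleLast(long[] timesMs, int[] values, long windowMs, long endMs) {
        List<Integer> out = new ArrayList<>();
        int i = 0;
        for (long tick = windowMs; tick <= endMs; tick += windowMs) {
            Integer latest = null;
            while (i < timesMs.length && timesMs[i] <= tick) {
                latest = values[i];
                i++;
            }
            if (latest != null) {
                out.add(latest);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        long[] timesMs = {250, 400, 900, 1300, 1600};
        int[] values = {1, 2, 3, 4, 5};
        System.out.println(throttleFirst(timesMs, values, 1000));      // prints [1, 4]
        System.out.println(throttleLast(timesMs, values, 1000, 2000)); // prints [3, 5]
    }
}
```

In this model throttleFirst lets item 4 through at 1300 ms because its window started at 250 ms, whereas throttleLast reports only at the 1000 ms and 2000 ms ticks.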
Code example origin: com.netflix.rxjava/rxjava-core
// Excerpt: body of the throttleLast overload that also takes a Scheduler
return sample(intervalDuration, unit, scheduler);
Code example origin: com.netflix.rxjava/rxjava-core
/**
* Returns an Observable that emits the most recently emitted item (if any) emitted by the source Observable
* within periodic time intervals.
* <p>
* <img width="640" height="305" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/sample.png" alt="">
* <dl>
* <dt><b>Backpressure Support:</b></dt>
* <dd>This operator does not support backpressure as it uses time to control data flow.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code sample} operates by default on the {@code computation} {@link Scheduler}.</dd>
* </dl>
*
* @param period
* the sampling rate
* @param unit
* the {@link TimeUnit} in which {@code period} is defined
* @return an Observable that emits the results of sampling the items emitted by the source Observable at
* the specified time interval
* @see <a href="https://github.com/Netflix/RxJava/wiki/Filtering-Observables#sample-or-throttlelast">RxJava wiki: sample</a>
* @see <a href="https://github.com/Netflix/RxJava/wiki/Backpressure">RxJava wiki: Backpressure</a>
* @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable.sample.aspx">MSDN: Observable.Sample</a>
* @see #throttleLast(long, TimeUnit)
*/
public final Observable<T> sample(long period, TimeUnit unit) {
    return sample(period, unit, Schedulers.computation());
}
Code example origin: leeowenowen/rxjava-examples
  @Override
  public void run() {
    final Subscription subscription = Observable.create(new Observable.OnSubscribe<Integer>() {
      @Override
      public void call(Subscriber<? super Integer> subscriber) {
        for (int i = 0; i < 10; i++) {
          subscriber.onNext(i);
          sleep(100);
        }
      }
    })
    .subscribeOn(Schedulers.newThread())
    .sample(1, TimeUnit.SECONDS)
    .subscribe(new Action1<Integer>() {
      @Override
      public void call(Integer integer) {
        log(integer);
      }
    });
    AsyncExecutor.SINGLETON.schedule(new Runnable() {
      @Override
      public void run() {
        if (!subscription.isUnsubscribed()) {
          subscription.unsubscribe();
        }
      }
    }, 3, TimeUnit.SECONDS);
  }
});
Code example origin: nurkiewicz/rxjava-book-examples
@Test
public void sample_64() throws Exception {
    Observable<Long> obs = Observable.interval(20, MILLISECONDS);
    // equivalent:
    obs.sample(1, SECONDS);
    obs.sample(Observable.interval(1, SECONDS));
}
Code example origin: nurkiewicz/rxjava-book-examples
@Test
public void sample_25() throws Exception {
    Observable<String> delayedNames = delayedNames();
    delayedNames
            .sample(1, SECONDS)
            .subscribe(System.out::println);
}
Code example origin: nurkiewicz/rxjava-book-examples
@Test
public void sample_170() throws Exception {
    Observable
            .range(0, Integer.MAX_VALUE)
            .map(Picture::new)
            .distinct()
            .distinct(Picture::getTag)
            .sample(1, TimeUnit.SECONDS)
            .subscribe(System.out::println);
}
Code example origin: nurkiewicz/rxjava-book-examples
@Test
public void sample_9() throws Exception {
    long startTime = System.currentTimeMillis();
    Observable
            .interval(7, MILLISECONDS)
            .timestamp()
            .sample(1, SECONDS)
            .map(ts -> ts.getTimestampMillis() - startTime + "ms: " + ts.getValue())
            .take(5)
            .subscribe(System.out::println);
}
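For the example above, a rough idea of which values to expect can be worked out by hand. The sketch below assumes an idealized clock with no scheduling jitter: interval(7 ms) emits value n at (n + 1) * 7 ms, and the sampler ticks at exact multiples of 1 second. The class name IntervalSampleArithmetic and its method are hypothetical, and a real run may drift from these numbers.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical back-of-the-envelope calculation (not RxJava code).
class IntervalSampleArithmetic {
    /**
     * Under the idealized assumptions above, floor(tick / intervalMs) items
     * have been emitted by a given tick, and because numbering starts at 0
     * the latest value is that count minus one.
     * Assumes sampleMs >= intervalMs, so every period contains an emission.
     */
    static List<Long> expectedSamples(long intervalMs, long sampleMs, int count) {
        List<Long> out = new ArrayList<>();
        for (int k = 1; k <= count; k++) {
            long tick = k * sampleMs;
            long emittedSoFar = tick / intervalMs;
            out.add(emittedSoFar - 1);
        }
        return out;
    }

    public static void main(String[] args) {
        // interval of 7 ms, sampled every 1000 ms, first 5 samples
        System.out.println(expectedSamples(7, 1000, 5)); // prints [141, 284, 427, 570, 713]
    }
}
```

Because timestamp() is applied before sample() in the original snippet, the printed millisecond figure is the time the item was emitted by the source, not the time of the sampling tick.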
Code example origin: nurkiewicz/rxjava-book-examples
@Test
public void sample_37() throws Exception {
    Observable<String> delayedNames = delayedNames();
    delayedNames
            .concatWith(delayedCompletion())
            .sample(1, SECONDS)
            .subscribe(System.out::println);
}