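This article collects code examples for the Java method io.reactivex.Observable.debounce() and shows how Observable.debounce() is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, and were extracted from a selection of projects, so they should serve as a useful reference. Details of the Observable.debounce() method are as follows: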
Package path: io.reactivex.Observable
Class name: Observable
Method name: debounce
Returns an Observable that mirrors the source ObservableSource, except that it drops items emitted by the source ObservableSource that are followed by newer items before a timeout value expires. The timer resets on each emission.
Note: If items keep being emitted by the source ObservableSource faster than the timeout then no items will be emitted by the resulting ObservableSource.
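To make the rule above concrete, here is a minimal sketch that replays a list of timestamped items and reports which ones would get through a fixed debounce window. It uses only the JDK and does not call RxJava. The names DebounceTimelineSketch and survivors are hypothetical, and treating a newer item that arrives exactly when the timer expires as too late to suppress the previous one is an assumption made for illustration. The sketch does not model onComplete or onError.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of RxJava: replays a recorded timeline.
final class DebounceTimelineSketch {

    /**
     * Returns the values that a fixed debounce window would let through.
     * timesMs must be ascending and have the same length as values.
     */
    static List<String> survivors(long[] timesMs, String[] values, long timeoutMs) {
        List<String> out = new ArrayList<>();
        for (int i = 0; i < values.length; i++) {
            boolean isLast = (i == values.length - 1);
            // An item is dropped when a newer item arrives before its timer expires.
            // Assumption: a newer item arriving exactly at expiry does not suppress it.
            if (isLast || timesMs[i + 1] - timesMs[i] >= timeoutMs) {
                out.add(values[i]);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        long[] times = {100, 400, 900};
        String[] values = {"one", "two", "three"};
        // "one" is followed by "two" after 300 ms, inside the 400 ms window, so it is dropped.
        System.out.println(survivors(times, values, 400)); // prints [two, three]
    }
}
```

In this model a burst of items spaced closer together than the timeout yields nothing while the burst lasts; only the final item gets through, once the stream has been quiet for the full window.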
Code example source: ReactiveX/RxJava
    @Override
    public Object apply(Observable<Integer> o) throws Exception {
        return o.debounce(new Function<Integer, ObservableSource<Long>>() {
            @Override
            public ObservableSource<Long> apply(Integer v) throws Exception {
                return Observable.timer(1, TimeUnit.SECONDS);
            }
        });
    }
}, false, 1, 1, 1);
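The sample above passes debounce() a function that returns a separate debounce indicator for each item, here Observable.timer(1, TimeUnit.SECONDS). As a rough illustration of why a per-item window can be useful, the sketch below replays a timeline in which the window length depends on the item. It is JDK-only and models the indicator as a plain delay, which is a simplification of what an ObservableSource can express. SelectorDebounceSketch, survivors, and the "urgent" value are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.ToLongFunction;

// Hypothetical helper, not part of RxJava: debounce with a per-item window.
final class SelectorDebounceSketch {

    /**
     * Returns the values that get through when each item chooses its own window.
     * timesMs must be ascending and have one entry per value.
     */
    static <T> List<T> survivors(long[] timesMs, List<T> values, ToLongFunction<T> windowMs) {
        List<T> out = new ArrayList<>();
        for (int i = 0; i < values.size(); i++) {
            T value = values.get(i);
            boolean isLast = (i == values.size() - 1);
            // The item's own window decides when its timer would fire.
            long expiresAt = timesMs[i] + windowMs.applyAsLong(value);
            // Assumption: a newer item arriving exactly at expiry does not suppress it.
            if (isLast || expiresAt <= timesMs[i + 1]) {
                out.add(value);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        long[] times = {0, 50, 400};
        List<String> values = Arrays.asList("a", "urgent", "b");
        // "urgent" gets a zero-length window; every other item waits 100 ms.
        List<String> result = survivors(times, values, v -> v.equals("urgent") ? 0L : 100L);
        System.out.println(result); // prints [urgent, b]
    }
}
```

Here "a" is dropped because "urgent" arrives 50 ms later, inside its 100 ms window, while "urgent" gets through immediately because its window is empty.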
Code example source: ReactiveX/RxJava
    @Override
    public Object apply(final Observable<Integer> o) throws Exception {
        return Observable.just(1).debounce(new Function<Integer, ObservableSource<Integer>>() {
            @Override
            public ObservableSource<Integer> apply(Integer v) throws Exception {
                return o;
            }
        });
    }
}, false, 1, 1, 1);
Code example source: ReactiveX/RxJava
@Test(expected = NullPointerException.class)
public void debounceTimedSchedulerNull() {
    just1.debounce(1, TimeUnit.SECONDS, null);
}
Code example source: ReactiveX/RxJava
@Test(expected = NullPointerException.class)
public void debounceTimedUnitNull() {
    just1.debounce(1, null);
}
Code example source: ReactiveX/RxJava
@Test(expected = NullPointerException.class)
public void debounceFunctionNull() {
    just1.debounce(null);
}
Code example source: TeamNewPipe/NewPipe
private Disposable getDebouncedLoader() {
    return debouncedSignal.mergeWith(nearEndIntervalSignal)
            .debounce(loadDebounceMillis, TimeUnit.MILLISECONDS)
            .subscribeOn(Schedulers.single())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(timestamp -> loadImmediate());
}
Code example source: ReactiveX/RxJava
/**
 * Returns an Observable that mirrors the source ObservableSource, except that it drops items emitted by the
 * source ObservableSource that are followed by newer items before a timeout value expires. The timer resets on
 * each emission (alias to {@link #debounce(long, TimeUnit, Scheduler)}).
 * <p>
 * <em>Note:</em> If items keep being emitted by the source ObservableSource faster than the timeout then no items
 * will be emitted by the resulting ObservableSource.
 * <p>
 * <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/throttleWithTimeout.png" alt="">
 * <dl>
 * <dt><b>Scheduler:</b></dt>
 * <dd>{@code throttleWithTimeout} operates by default on the {@code computation} {@link Scheduler}.</dd>
 * </dl>
 *
 * @param timeout
 *            the length of the window of time that must pass after the emission of an item from the source
 *            ObservableSource in which that ObservableSource emits no items in order for the item to be emitted by the
 *            resulting ObservableSource
 * @param unit
 *            the unit of time for the specified {@code timeout}
 * @return an Observable that filters out items from the source ObservableSource that are too quickly followed by
 *         newer items
 * @see <a href="http://reactivex.io/documentation/operators/debounce.html">ReactiveX operators documentation: Debounce</a>
 * @see #debounce(long, TimeUnit)
 */
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.COMPUTATION)
public final Observable<T> throttleWithTimeout(long timeout, TimeUnit unit) {
    return debounce(timeout, unit);
}
Code example source: ReactiveX/RxJava
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> throttleWithTimeout(long timeout, TimeUnit unit, Scheduler scheduler) {
    return debounce(timeout, unit, scheduler);
}
Code example source: TeamNewPipe/NewPipe
@Override
protected void initListeners() {
    super.initListeners();
    RxView.clicks(errorButtonRetry)
            .debounce(300, TimeUnit.MILLISECONDS)
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(o -> onRetryButtonClicked());
}
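The retry-button sample above waits until clicks have stopped for 300 ms before reacting. For readers who want to see the same idea without RxJava or Android, the following is a minimal JDK-only sketch built on ScheduledExecutorService. It is a stand-in for illustration, not how RxJava implements debounce; ClickDebouncer, signal, and demo are hypothetical names, and the 200 ms window is arbitrary.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical JDK-only stand-in for a debounced click handler; not RxJava code.
final class ClickDebouncer {
    private final ScheduledExecutorService executor =
            Executors.newSingleThreadScheduledExecutor();
    private final long timeoutMs;
    private final Runnable action;
    private ScheduledFuture<?> pending; // guarded by "this"

    ClickDebouncer(long timeoutMs, Runnable action) {
        this.timeoutMs = timeoutMs;
        this.action = action;
    }

    /** Call on every click: cancels the pending run and restarts the timer. */
    synchronized void signal() {
        if (pending != null) {
            pending.cancel(false);
        }
        pending = executor.schedule(action, timeoutMs, TimeUnit.MILLISECONDS);
    }

    void shutdown() {
        executor.shutdownNow();
    }

    /** Fires three rapid signals and returns how many times the action ran. */
    static int demo() {
        AtomicInteger runs = new AtomicInteger();
        ClickDebouncer debouncer = new ClickDebouncer(200, runs::incrementAndGet);
        debouncer.signal();
        debouncer.signal();
        debouncer.signal();
        try {
            Thread.sleep(1000); // wait well past the 200 ms quiet period
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            debouncer.shutdown();
        }
        return runs.get();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Because each signal cancels the previously scheduled run, three back-to-back signals should result in a single run of the action once the quiet period has passed. The result depends on real timers, so a heavily loaded machine could behave differently.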
Code example source: ReactiveX/RxJava
@Test(expected = NullPointerException.class)
public void debounceFunctionReturnsNull() {
    just1.debounce(new Function<Integer, Observable<Object>>() {
        @Override
        public Observable<Object> apply(Integer v) {
            return null;
        }
    }).blockingSubscribe();
}
Code example source: ReactiveX/RxJava
    @Override
    public Observable<Object> apply(Observable<Object> o) throws Exception {
        return o.debounce(Functions.justFunction(Observable.never()));
    }
});
Code example source: ReactiveX/RxJava
@SchedulerSupport(SchedulerSupport.COMPUTATION)
public final Observable<T> debounce(long timeout, TimeUnit unit) {
    return debounce(timeout, unit, Schedulers.computation());
}
Code example source: ReactiveX/RxJava
@Test
public void debounceDefault() throws Exception {
    Observable.just(1).debounce(1, TimeUnit.SECONDS)
            .test()
            .awaitDone(5, TimeUnit.SECONDS)
            .assertResult(1);
}
Code example source: ReactiveX/RxJava
@Test
public void testDebounceNeverEmits() {
    Observable<String> source = Observable.unsafeCreate(new ObservableSource<String>() {
        @Override
        public void subscribe(Observer<? super String> observer) {
            observer.onSubscribe(Disposables.empty());
            // all should be skipped since they are happening faster than the 200ms timeout
            publishNext(observer, 100, "a"); // Should be skipped
            publishNext(observer, 200, "b"); // Should be skipped
            publishNext(observer, 300, "c"); // Should be skipped
            publishNext(observer, 400, "d"); // Should be skipped
            publishNext(observer, 500, "e"); // Should be skipped
            publishNext(observer, 600, "f"); // Should be skipped
            publishNext(observer, 700, "g"); // Should be skipped
            publishNext(observer, 800, "h"); // Should be skipped
            publishCompleted(observer, 900); // Should be published as soon as the timeout expires.
        }
    });
    Observable<String> sampled = source.debounce(200, TimeUnit.MILLISECONDS, scheduler);
    sampled.subscribe(observer);
    scheduler.advanceTimeTo(0, TimeUnit.MILLISECONDS);
    InOrder inOrder = inOrder(observer);
    inOrder.verify(observer, times(0)).onNext(anyString());
    scheduler.advanceTimeTo(1000, TimeUnit.MILLISECONDS);
    inOrder.verify(observer, times(1)).onComplete();
    inOrder.verifyNoMoreInteractions();
}
Code example source: ReactiveX/RxJava
@Test
public void debounceWithEmpty() {
    Observable.just(1).debounce(Functions.justFunction(Observable.empty()))
            .test()
            .assertResult(1);
}
Code example source: ReactiveX/RxJava
@Test
public void testDebounceWithCompleted() {
    Observable<String> source = Observable.unsafeCreate(new ObservableSource<String>() {
        @Override
        public void subscribe(Observer<? super String> observer) {
            observer.onSubscribe(Disposables.empty());
            publishNext(observer, 100, "one"); // Should be skipped since "two" will arrive before the timeout expires.
            publishNext(observer, 400, "two"); // Should be published since "three" will arrive after the timeout expires.
            publishNext(observer, 900, "three"); // Should be skipped since onComplete will arrive before the timeout expires.
            publishCompleted(observer, 1000); // Should be published as soon as the timeout expires.
        }
    });
    Observable<String> sampled = source.debounce(400, TimeUnit.MILLISECONDS, scheduler);
    sampled.subscribe(observer);
    scheduler.advanceTimeTo(0, TimeUnit.MILLISECONDS);
    InOrder inOrder = inOrder(observer);
    // must go to 800 since it must be 400 after when two is sent, which is at 400
    scheduler.advanceTimeTo(800, TimeUnit.MILLISECONDS);
    inOrder.verify(observer, times(1)).onNext("two");
    scheduler.advanceTimeTo(1000, TimeUnit.MILLISECONDS);
    inOrder.verify(observer, times(1)).onComplete();
    inOrder.verifyNoMoreInteractions();
}
Code example source: ReactiveX/RxJava
@Test
public void emitLate() {
    final AtomicReference<Observer<? super Integer>> ref = new AtomicReference<Observer<? super Integer>>();
    TestObserver<Integer> to = Observable.range(1, 2)
            .debounce(new Function<Integer, ObservableSource<Integer>>() {
                @Override
                public ObservableSource<Integer> apply(Integer o) throws Exception {
                    if (o != 1) {
                        return Observable.never();
                    }
                    return new Observable<Integer>() {
                        @Override
                        protected void subscribeActual(Observer<? super Integer> observer) {
                            observer.onSubscribe(Disposables.empty());
                            ref.set(observer);
                        }
                    };
                }
            })
            .test();
    ref.get().onNext(1);
    to
            .assertResult(2);
}
Code example source: ReactiveX/RxJava
@Test
public void testDebounceWithError() {
    Observable<String> source = Observable.unsafeCreate(new ObservableSource<String>() {
        @Override
        public void subscribe(Observer<? super String> observer) {
            observer.onSubscribe(Disposables.empty());
            Exception error = new TestException();
            publishNext(observer, 100, "one"); // Should be published since "two" will arrive after the timeout expires.
            publishNext(observer, 600, "two"); // Should be skipped since onError will arrive before the timeout expires.
            publishError(observer, 700, error); // Should be published as soon as the timeout expires.
        }
    });
    Observable<String> sampled = source.debounce(400, TimeUnit.MILLISECONDS, scheduler);
    sampled.subscribe(observer);
    scheduler.advanceTimeTo(0, TimeUnit.MILLISECONDS);
    InOrder inOrder = inOrder(observer);
    // 100 + 400 means it triggers at 500
    scheduler.advanceTimeTo(500, TimeUnit.MILLISECONDS);
    inOrder.verify(observer).onNext("one");
    scheduler.advanceTimeTo(701, TimeUnit.MILLISECONDS);
    inOrder.verify(observer).onError(any(TestException.class));
    inOrder.verifyNoMoreInteractions();
}
Code example source: ReactiveX/RxJava
@Test
public void debounceWithTimeBackpressure() throws InterruptedException {
    TestScheduler scheduler = new TestScheduler();
    TestObserver<Integer> observer = new TestObserver<Integer>();
    Observable.merge(
            Observable.just(1),
            Observable.just(2).delay(10, TimeUnit.MILLISECONDS, scheduler)
    ).debounce(20, TimeUnit.MILLISECONDS, scheduler).take(1).subscribe(observer);
    scheduler.advanceTimeBy(30, TimeUnit.MILLISECONDS);
    observer.assertValue(2);
    observer.assertTerminated();
    observer.assertNoErrors();
}
Code example source: ReactiveX/RxJava
@Test
public void timedError() {
    Observable.error(new TestException())
            .debounce(1, TimeUnit.SECONDS)
            .test()
            .assertFailure(TestException.class);
}
}