io.reactivex.Observable.debounce()方法的使用及代码示例

x33g5p2x  于2022-01-25 转载在 其他  
字(10.3k)|赞(0)|评价(0)|浏览(173)

本文整理了Java中io.reactivex.Observable.debounce()方法的一些代码示例,展示了Observable.debounce()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Observable.debounce()方法的具体详情如下:
包路径:io.reactivex.Observable
类名称:Observable
方法名:debounce

Observable.debounce介绍

[英]Returns an Observable that mirrors the source ObservableSource, except that it drops items emitted by the source ObservableSource that are followed by newer items before a timeout value expires. The timer resets on each emission.

Note: If items keep being emitted by the source ObservableSource faster than the timeout then no items will be emitted by the resulting ObservableSource.

Information on debounce vs throttle:

代码示例

代码示例来源:origin: ReactiveX/RxJava

  1. @Override
  2. public Object apply(Observable<Integer> o) throws Exception {
  3. return o.debounce(new Function<Integer, ObservableSource<Long>>() {
  4. @Override
  5. public ObservableSource<Long> apply(Integer v) throws Exception {
  6. return Observable.timer(1, TimeUnit.SECONDS);
  7. }
  8. });
  9. }
  10. }, false, 1, 1, 1);

代码示例来源:origin: ReactiveX/RxJava

  1. @Override
  2. public Object apply(final Observable<Integer> o) throws Exception {
  3. return Observable.just(1).debounce(new Function<Integer, ObservableSource<Integer>>() {
  4. @Override
  5. public ObservableSource<Integer> apply(Integer v) throws Exception {
  6. return o;
  7. }
  8. });
  9. }
  10. }, false, 1, 1, 1);

代码示例来源:origin: ReactiveX/RxJava

  1. @Test(expected = NullPointerException.class)
  2. public void debounceTimedSchedulerNull() {
  3. just1.debounce(1, TimeUnit.SECONDS, null);
  4. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test(expected = NullPointerException.class)
  2. public void debounceTimedUnitNull() {
  3. just1.debounce(1, null);
  4. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test(expected = NullPointerException.class)
  2. public void debounceFunctionNull() {
  3. just1.debounce(null);
  4. }

代码示例来源:origin: TeamNewPipe/NewPipe

  1. private Disposable getDebouncedLoader() {
  2. return debouncedSignal.mergeWith(nearEndIntervalSignal)
  3. .debounce(loadDebounceMillis, TimeUnit.MILLISECONDS)
  4. .subscribeOn(Schedulers.single())
  5. .observeOn(AndroidSchedulers.mainThread())
  6. .subscribe(timestamp -> loadImmediate());
  7. }

代码示例来源:origin: ReactiveX/RxJava

  1. /**
  2. * Returns an Observable that mirrors the source ObservableSource, except that it drops items emitted by the
  3. * source ObservableSource that are followed by newer items before a timeout value expires. The timer resets on
  4. * each emission (alias to {@link #debounce(long, TimeUnit, Scheduler)}).
  5. * <p>
  6. * <em>Note:</em> If items keep being emitted by the source ObservableSource faster than the timeout then no items
  7. * will be emitted by the resulting ObservableSource.
  8. * <p>
  9. * <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/throttleWithTimeout.png" alt="">
  10. * <dl>
  11. * <dt><b>Scheduler:</b></dt>
  12. * <dd>{@code throttleWithTimeout} operates by default on the {@code computation} {@link Scheduler}.</dd>
  13. * </dl>
  14. *
  15. * @param timeout
  16. * the length of the window of time that must pass after the emission of an item from the source
  17. * ObservableSource in which that ObservableSource emits no items in order for the item to be emitted by the
  18. * resulting ObservableSource
  19. * @param unit
  20. * the unit of time for the specified {@code timeout}
  21. * @return an Observable that filters out items from the source ObservableSource that are too quickly followed by
  22. * newer items
  23. * @see <a href="http://reactivex.io/documentation/operators/debounce.html">ReactiveX operators documentation: Debounce</a>
  24. * @see #debounce(long, TimeUnit)
  25. */
  26. @CheckReturnValue
  27. @SchedulerSupport(SchedulerSupport.COMPUTATION)
  28. public final Observable<T> throttleWithTimeout(long timeout, TimeUnit unit) {
  29. return debounce(timeout, unit);
  30. }

代码示例来源:origin: ReactiveX/RxJava

  1. @SchedulerSupport(SchedulerSupport.CUSTOM)
  2. public final Observable<T> throttleWithTimeout(long timeout, TimeUnit unit, Scheduler scheduler) {
  3. return debounce(timeout, unit, scheduler);

代码示例来源:origin: TeamNewPipe/NewPipe

  1. @Override
  2. protected void initListeners() {
  3. super.initListeners();
  4. RxView.clicks(errorButtonRetry)
  5. .debounce(300, TimeUnit.MILLISECONDS)
  6. .observeOn(AndroidSchedulers.mainThread())
  7. .subscribe(o -> onRetryButtonClicked());
  8. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test(expected = NullPointerException.class)
  2. public void debounceFunctionReturnsNull() {
  3. just1.debounce(new Function<Integer, Observable<Object>>() {
  4. @Override
  5. public Observable<Object> apply(Integer v) {
  6. return null;
  7. }
  8. }).blockingSubscribe();
  9. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Override
  2. public Observable<Object> apply(Observable<Object> o) throws Exception {
  3. return o.debounce(Functions.justFunction(Observable.never()));
  4. }
  5. });

代码示例来源:origin: ReactiveX/RxJava

  1. @SchedulerSupport(SchedulerSupport.COMPUTATION)
  2. public final Observable<T> debounce(long timeout, TimeUnit unit) {
  3. return debounce(timeout, unit, Schedulers.computation());

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void debounceDefault() throws Exception {
  3. Observable.just(1).debounce(1, TimeUnit.SECONDS)
  4. .test()
  5. .awaitDone(5, TimeUnit.SECONDS)
  6. .assertResult(1);
  7. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void testDebounceNeverEmits() {
  3. Observable<String> source = Observable.unsafeCreate(new ObservableSource<String>() {
  4. @Override
  5. public void subscribe(Observer<? super String> observer) {
  6. observer.onSubscribe(Disposables.empty());
  7. // all should be skipped since they are happening faster than the 200ms timeout
  8. publishNext(observer, 100, "a"); // Should be skipped
  9. publishNext(observer, 200, "b"); // Should be skipped
  10. publishNext(observer, 300, "c"); // Should be skipped
  11. publishNext(observer, 400, "d"); // Should be skipped
  12. publishNext(observer, 500, "e"); // Should be skipped
  13. publishNext(observer, 600, "f"); // Should be skipped
  14. publishNext(observer, 700, "g"); // Should be skipped
  15. publishNext(observer, 800, "h"); // Should be skipped
  16. publishCompleted(observer, 900); // Should be published as soon as the timeout expires.
  17. }
  18. });
  19. Observable<String> sampled = source.debounce(200, TimeUnit.MILLISECONDS, scheduler);
  20. sampled.subscribe(observer);
  21. scheduler.advanceTimeTo(0, TimeUnit.MILLISECONDS);
  22. InOrder inOrder = inOrder(observer);
  23. inOrder.verify(observer, times(0)).onNext(anyString());
  24. scheduler.advanceTimeTo(1000, TimeUnit.MILLISECONDS);
  25. inOrder.verify(observer, times(1)).onComplete();
  26. inOrder.verifyNoMoreInteractions();
  27. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void debounceWithEmpty() {
  3. Observable.just(1).debounce(Functions.justFunction(Observable.empty()))
  4. .test()
  5. .assertResult(1);
  6. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void testDebounceWithCompleted() {
  3. Observable<String> source = Observable.unsafeCreate(new ObservableSource<String>() {
  4. @Override
  5. public void subscribe(Observer<? super String> observer) {
  6. observer.onSubscribe(Disposables.empty());
  7. publishNext(observer, 100, "one"); // Should be skipped since "two" will arrive before the timeout expires.
  8. publishNext(observer, 400, "two"); // Should be published since "three" will arrive after the timeout expires.
  9. publishNext(observer, 900, "three"); // Should be skipped since onComplete will arrive before the timeout expires.
  10. publishCompleted(observer, 1000); // Should be published as soon as the timeout expires.
  11. }
  12. });
  13. Observable<String> sampled = source.debounce(400, TimeUnit.MILLISECONDS, scheduler);
  14. sampled.subscribe(observer);
  15. scheduler.advanceTimeTo(0, TimeUnit.MILLISECONDS);
  16. InOrder inOrder = inOrder(observer);
  17. // must go to 800 since it must be 400 after when two is sent, which is at 400
  18. scheduler.advanceTimeTo(800, TimeUnit.MILLISECONDS);
  19. inOrder.verify(observer, times(1)).onNext("two");
  20. scheduler.advanceTimeTo(1000, TimeUnit.MILLISECONDS);
  21. inOrder.verify(observer, times(1)).onComplete();
  22. inOrder.verifyNoMoreInteractions();
  23. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void emitLate() {
  3. final AtomicReference<Observer<? super Integer>> ref = new AtomicReference<Observer<? super Integer>>();
  4. TestObserver<Integer> to = Observable.range(1, 2)
  5. .debounce(new Function<Integer, ObservableSource<Integer>>() {
  6. @Override
  7. public ObservableSource<Integer> apply(Integer o) throws Exception {
  8. if (o != 1) {
  9. return Observable.never();
  10. }
  11. return new Observable<Integer>() {
  12. @Override
  13. protected void subscribeActual(Observer<? super Integer> observer) {
  14. observer.onSubscribe(Disposables.empty());
  15. ref.set(observer);
  16. }
  17. };
  18. }
  19. })
  20. .test();
  21. ref.get().onNext(1);
  22. to
  23. .assertResult(2);
  24. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void testDebounceWithError() {
  3. Observable<String> source = Observable.unsafeCreate(new ObservableSource<String>() {
  4. @Override
  5. public void subscribe(Observer<? super String> observer) {
  6. observer.onSubscribe(Disposables.empty());
  7. Exception error = new TestException();
  8. publishNext(observer, 100, "one"); // Should be published since "two" will arrive after the timeout expires.
  9. publishNext(observer, 600, "two"); // Should be skipped since onError will arrive before the timeout expires.
  10. publishError(observer, 700, error); // Should be published as soon as the timeout expires.
  11. }
  12. });
  13. Observable<String> sampled = source.debounce(400, TimeUnit.MILLISECONDS, scheduler);
  14. sampled.subscribe(observer);
  15. scheduler.advanceTimeTo(0, TimeUnit.MILLISECONDS);
  16. InOrder inOrder = inOrder(observer);
  17. // 100 + 400 means it triggers at 500
  18. scheduler.advanceTimeTo(500, TimeUnit.MILLISECONDS);
  19. inOrder.verify(observer).onNext("one");
  20. scheduler.advanceTimeTo(701, TimeUnit.MILLISECONDS);
  21. inOrder.verify(observer).onError(any(TestException.class));
  22. inOrder.verifyNoMoreInteractions();
  23. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void debounceWithTimeBackpressure() throws InterruptedException {
  3. TestScheduler scheduler = new TestScheduler();
  4. TestObserver<Integer> observer = new TestObserver<Integer>();
  5. Observable.merge(
  6. Observable.just(1),
  7. Observable.just(2).delay(10, TimeUnit.MILLISECONDS, scheduler)
  8. ).debounce(20, TimeUnit.MILLISECONDS, scheduler).take(1).subscribe(observer);
  9. scheduler.advanceTimeBy(30, TimeUnit.MILLISECONDS);
  10. observer.assertValue(2);
  11. observer.assertTerminated();
  12. observer.assertNoErrors();
  13. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void timedError() {
  3. Observable.error(new TestException())
  4. .debounce(1, TimeUnit.SECONDS)
  5. .test()
  6. .assertFailure(TestException.class);
  7. }
  8. }

相关文章

Observable类方法