Usage and code examples for the io.reactivex.Observable.debounce() method

x33g5p2x, reposted on 2022-01-25 under Other

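This article collects code examples for the Java method io.reactivex.Observable.debounce() and shows how Observable.debounce() is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should be useful as reference material. Details of the Observable.debounce() method:
Package path: io.reactivex.Observable
Class name: Observable
Method name: debounce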

About Observable.debounce

Returns an Observable that mirrors the source ObservableSource, except that it drops items emitted by the source ObservableSource that are followed by newer items before a timeout value expires. The timer resets on each emission.

Note: If items keep being emitted by the source ObservableSource faster than the timeout then no items will be emitted by the resulting ObservableSource.
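
The timer rule described above can be illustrated with a small, deterministic simulation in plain Java. This is only a sketch of the semantics, not RxJava's implementation: the class `DebounceSketch`, its `debounce` helper, and the sample timeline are hypothetical names made up for illustration. It assumes the timestamps are sorted in ascending order and that the source stays open after the last item, so the last item's timer is allowed to fire.

```java
import java.util.ArrayList;
import java.util.List;

final class DebounceSketch {

    /**
     * Simulates the debounce timer rule on a recorded timeline.
     * An item survives only if no newer item arrives within timeoutMillis after it.
     * Assumption: timesMillis is sorted ascending and the source does not
     * terminate after the last item, so the last item's timer always fires.
     * Each result has the form "value@emissionTimeMillis".
     */
    static List<String> debounce(long[] timesMillis, String[] values, long timeoutMillis) {
        List<String> emitted = new ArrayList<>();
        for (int i = 0; i < values.length; i++) {
            boolean isLast = i == values.length - 1;
            // A newer item arriving before the timer fires replaces this one.
            if (isLast || timesMillis[i + 1] - timesMillis[i] >= timeoutMillis) {
                emitted.add(values[i] + "@" + (timesMillis[i] + timeoutMillis));
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        // Hypothetical keystroke timeline: three quick inputs, a pause, then one more.
        long[] times = {0, 100, 200, 900};
        String[] values = {"r", "rx", "rxj", "rxjava"};
        System.out.println(debounce(times, values, 300)); // prints [rxj@500, rxjava@1200]
    }
}
```

In this timeline "r" and "rx" are each followed by a newer item within 300 ms and are dropped, while "rxj" is followed by a 700 ms pause and is emitted once its timer expires. If every gap were shorter than the timeout, only the final item would appear, and only because the simulated source stops emitting.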


Code examples

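Example source: ReactiveX/RxJava

// Excerpt from a test: the opening of the enclosing call, which passes this
// anonymous Function to a test helper, is cut off in the source.
  @Override
  public Object apply(Observable<Integer> o) throws Exception {
    // Selector overload: each item gets its own 1-second timer as the debounce indicator.
    return o.debounce(new Function<Integer, ObservableSource<Long>>() {
      @Override
      public ObservableSource<Long> apply(Integer v) throws Exception {
        return Observable.timer(1, TimeUnit.SECONDS);
      }
    });
  }
}, false, 1, 1, 1);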

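Example source: ReactiveX/RxJava

// Excerpt from a test: the opening of the enclosing call, which passes this
// anonymous Function to a test helper, is cut off in the source.
  @Override
  public Object apply(final Observable<Integer> o) throws Exception {
    // Here the Observable under test serves as the debounce indicator for the single item.
    return Observable.just(1).debounce(new Function<Integer, ObservableSource<Integer>>() {
      @Override
      public ObservableSource<Integer> apply(Integer v) throws Exception {
        return o;
      }
    });
  }
}, false, 1, 1, 1);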

Example source: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void debounceTimedSchedulerNull() {
  just1.debounce(1, TimeUnit.SECONDS, null);
}

Example source: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void debounceTimedUnitNull() {
  just1.debounce(1, null);
}

Example source: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void debounceFunctionNull() {
  just1.debounce(null);
}

Example source: TeamNewPipe/NewPipe

private Disposable getDebouncedLoader() {
  return debouncedSignal.mergeWith(nearEndIntervalSignal)
      .debounce(loadDebounceMillis, TimeUnit.MILLISECONDS)
      .subscribeOn(Schedulers.single())
      .observeOn(AndroidSchedulers.mainThread())
      .subscribe(timestamp -> loadImmediate());
}

Example source: ReactiveX/RxJava

/**
 * Returns an Observable that mirrors the source ObservableSource, except that it drops items emitted by the
 * source ObservableSource that are followed by newer items before a timeout value expires. The timer resets on
 * each emission (alias to {@link #debounce(long, TimeUnit, Scheduler)}).
 * <p>
 * <em>Note:</em> If items keep being emitted by the source ObservableSource faster than the timeout then no items
 * will be emitted by the resulting ObservableSource.
 * <p>
 * <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/throttleWithTimeout.png" alt="">
 * <dl>
 *  <dt><b>Scheduler:</b></dt>
 *  <dd>{@code throttleWithTimeout} operates by default on the {@code computation} {@link Scheduler}.</dd>
 * </dl>
 *
 * @param timeout
 *            the length of the window of time that must pass after the emission of an item from the source
 *            ObservableSource in which that ObservableSource emits no items in order for the item to be emitted by the
 *            resulting ObservableSource
 * @param unit
 *            the unit of time for the specified {@code timeout}
 * @return an Observable that filters out items from the source ObservableSource that are too quickly followed by
 *         newer items
 * @see <a href="http://reactivex.io/documentation/operators/debounce.html">ReactiveX operators documentation: Debounce</a>
 * @see #debounce(long, TimeUnit)
 */
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.COMPUTATION)
public final Observable<T> throttleWithTimeout(long timeout, TimeUnit unit) {
  return debounce(timeout, unit);
}

Example source: ReactiveX/RxJava

@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> throttleWithTimeout(long timeout, TimeUnit unit, Scheduler scheduler) {
  // Alias: delegates to debounce with the caller-supplied scheduler.
  return debounce(timeout, unit, scheduler);
}

Example source: TeamNewPipe/NewPipe

@Override
protected void initListeners() {
  super.initListeners();
  RxView.clicks(errorButtonRetry)
      .debounce(300, TimeUnit.MILLISECONDS)
      .observeOn(AndroidSchedulers.mainThread())
      .subscribe(o -> onRetryButtonClicked());
}

Example source: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void debounceFunctionReturnsNull() {
  just1.debounce(new Function<Integer, Observable<Object>>() {
    @Override
    public Observable<Object> apply(Integer v) {
      return null;
    }
  }).blockingSubscribe();
}

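Example source: ReactiveX/RxJava

// Excerpt from a test: the opening of the enclosing call, which passes this
// anonymous Function to a test helper, is cut off in the source.
  @Override
  public Observable<Object> apply(Observable<Object> o) throws Exception {
    // The indicator never signals, so no per-item timer ever fires.
    return o.debounce(Functions.justFunction(Observable.never()));
  }
});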

Example source: ReactiveX/RxJava

@SchedulerSupport(SchedulerSupport.COMPUTATION)
public final Observable<T> debounce(long timeout, TimeUnit unit) {
  // Delegates to the three-argument overload with the computation scheduler.
  return debounce(timeout, unit, Schedulers.computation());
}

Example source: ReactiveX/RxJava

@Test
public void debounceDefault() throws Exception {
  Observable.just(1).debounce(1, TimeUnit.SECONDS)
  .test()
  .awaitDone(5, TimeUnit.SECONDS)
  .assertResult(1);
}

Example source: ReactiveX/RxJava

@Test
public void testDebounceNeverEmits() {
  Observable<String> source = Observable.unsafeCreate(new ObservableSource<String>() {
    @Override
    public void subscribe(Observer<? super String> observer) {
      observer.onSubscribe(Disposables.empty());
      // all should be skipped since they are happening faster than the 200ms timeout
      publishNext(observer, 100, "a");    // Should be skipped
      publishNext(observer, 200, "b");    // Should be skipped
      publishNext(observer, 300, "c");    // Should be skipped
      publishNext(observer, 400, "d");    // Should be skipped
      publishNext(observer, 500, "e");    // Should be skipped
      publishNext(observer, 600, "f");    // Should be skipped
      publishNext(observer, 700, "g");    // Should be skipped
      publishNext(observer, 800, "h");    // Should be skipped
      publishCompleted(observer, 900);     // Should be published as soon as the timeout expires.
    }
  });
  Observable<String> sampled = source.debounce(200, TimeUnit.MILLISECONDS, scheduler);
  sampled.subscribe(observer);
  scheduler.advanceTimeTo(0, TimeUnit.MILLISECONDS);
  InOrder inOrder = inOrder(observer);
  inOrder.verify(observer, times(0)).onNext(anyString());
  scheduler.advanceTimeTo(1000, TimeUnit.MILLISECONDS);
  inOrder.verify(observer, times(1)).onComplete();
  inOrder.verifyNoMoreInteractions();
}

Example source: ReactiveX/RxJava

@Test
public void debounceWithEmpty() {
  Observable.just(1).debounce(Functions.justFunction(Observable.empty()))
  .test()
  .assertResult(1);
}

Example source: ReactiveX/RxJava

@Test
public void testDebounceWithCompleted() {
  Observable<String> source = Observable.unsafeCreate(new ObservableSource<String>() {
    @Override
    public void subscribe(Observer<? super String> observer) {
      observer.onSubscribe(Disposables.empty());
      publishNext(observer, 100, "one");    // Should be skipped since "two" will arrive before the timeout expires.
      publishNext(observer, 400, "two");    // Should be published since "three" will arrive after the timeout expires.
      publishNext(observer, 900, "three");   // Should be skipped since onComplete will arrive before the timeout expires.
      publishCompleted(observer, 1000);     // Should be published as soon as the timeout expires.
    }
  });
  Observable<String> sampled = source.debounce(400, TimeUnit.MILLISECONDS, scheduler);
  sampled.subscribe(observer);
  scheduler.advanceTimeTo(0, TimeUnit.MILLISECONDS);
  InOrder inOrder = inOrder(observer);
  // must go to 800 since it must be 400 after when two is sent, which is at 400
  scheduler.advanceTimeTo(800, TimeUnit.MILLISECONDS);
  inOrder.verify(observer, times(1)).onNext("two");
  scheduler.advanceTimeTo(1000, TimeUnit.MILLISECONDS);
  inOrder.verify(observer, times(1)).onComplete();
  inOrder.verifyNoMoreInteractions();
}

Example source: ReactiveX/RxJava

@Test
public void emitLate() {
  final AtomicReference<Observer<? super Integer>> ref = new AtomicReference<Observer<? super Integer>>();
  TestObserver<Integer> to = Observable.range(1, 2)
  .debounce(new Function<Integer, ObservableSource<Integer>>() {
    @Override
    public ObservableSource<Integer> apply(Integer o) throws Exception {
      if (o != 1) {
        return Observable.never();
      }
      return new Observable<Integer>() {
        @Override
        protected void subscribeActual(Observer<? super Integer> observer) {
          observer.onSubscribe(Disposables.empty());
          ref.set(observer);
        }
      };
    }
  })
  .test();
  ref.get().onNext(1);
  to
  .assertResult(2);
}

Example source: ReactiveX/RxJava

@Test
public void testDebounceWithError() {
  Observable<String> source = Observable.unsafeCreate(new ObservableSource<String>() {
    @Override
    public void subscribe(Observer<? super String> observer) {
      observer.onSubscribe(Disposables.empty());
      Exception error = new TestException();
      publishNext(observer, 100, "one");    // Should be published since "two" will arrive after the timeout expires.
      publishNext(observer, 600, "two");    // Should be skipped since onError will arrive before the timeout expires.
      publishError(observer, 700, error);   // Should be published as soon as the timeout expires.
    }
  });
  Observable<String> sampled = source.debounce(400, TimeUnit.MILLISECONDS, scheduler);
  sampled.subscribe(observer);
  scheduler.advanceTimeTo(0, TimeUnit.MILLISECONDS);
  InOrder inOrder = inOrder(observer);
  // 100 + 400 means it triggers at 500
  scheduler.advanceTimeTo(500, TimeUnit.MILLISECONDS);
  inOrder.verify(observer).onNext("one");
  scheduler.advanceTimeTo(701, TimeUnit.MILLISECONDS);
  inOrder.verify(observer).onError(any(TestException.class));
  inOrder.verifyNoMoreInteractions();
}

Example source: ReactiveX/RxJava

@Test
public void debounceWithTimeBackpressure() throws InterruptedException {
  TestScheduler scheduler = new TestScheduler();
  TestObserver<Integer> observer = new TestObserver<Integer>();
  Observable.merge(
      Observable.just(1),
      Observable.just(2).delay(10, TimeUnit.MILLISECONDS, scheduler)
  ).debounce(20, TimeUnit.MILLISECONDS, scheduler).take(1).subscribe(observer);
  scheduler.advanceTimeBy(30, TimeUnit.MILLISECONDS);
  observer.assertValue(2);
  observer.assertTerminated();
  observer.assertNoErrors();
}

Example source: ReactiveX/RxJava

@Test
public void timedError() {
  // An upstream error is relayed without waiting for the timeout.
  Observable.error(new TestException())
  .debounce(1, TimeUnit.SECONDS)
  .test()
  .assertFailure(TestException.class);
}
