
x33g5p2x  于2022-01-25 转载在 其他  



[英]Returns an Observable that emits 0L after a specified delay, and then completes.

Scheduler: timer operates by default on the computation Scheduler.


代码示例来源:origin: ReactiveX/RxJava

  public ObservableSource<Long> call() throws Exception {
    if (count++ == 1) {
      return null;
    return Observable.timer(1, TimeUnit.MILLISECONDS);
}, new Callable<Collection<Object>>() {

代码示例来源:origin: ReactiveX/RxJava

  public ObservableSource<Long> call() throws Exception {
    if (count++ == 1) {
      throw new TestException();
    return Observable.timer(1, TimeUnit.MILLISECONDS);
}, new Callable<Collection<Object>>() {

代码示例来源:origin: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void timerUnitNull() {
  Observable.timer(1, null);

代码示例来源:origin: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void timerSchedulerNull() {
  Observable.timer(1, TimeUnit.SECONDS, null);

代码示例来源:origin: ReactiveX/RxJava

 * Delays the actual subscription to the current Single until the given time delay has elapsed.
 * <dl>
 * <dt><b>Scheduler:</b></dt>
 * <dd>{@code delaySubscription} subscribes to the current Single on the {@link Scheduler}
 * you provide, after the delay has elapsed.</dd>
 * </dl>
 * @param time the time amount to wait with the subscription
 * @param unit the time unit of the waiting
 * @param scheduler the scheduler to wait on and subscribe on to the current Single
 * @return the new Single instance
 * @since 2.0
public final Single<T> delaySubscription(long time, TimeUnit unit, Scheduler scheduler) {
  return delaySubscription(Observable.timer(time, unit, scheduler));

代码示例来源:origin: ReactiveX/RxJava

 * Returns an Observable that emits {@code 0L} after a specified delay, and then completes.
 * <p>
 * <img width="640" height="200" src="" alt="">
 * <dl>
 *  <dt><b>Scheduler:</b></dt>
 *  <dd>{@code timer} operates by default on the {@code computation} {@link Scheduler}.</dd>
 * </dl>
 * @param delay
 *            the initial delay before emitting a single {@code 0L}
 * @param unit
 *            time units to use for {@code delay}
 * @return an Observable that emits {@code 0L} after a specified delay, and then completes
 * @see <a href="">ReactiveX operators documentation: Timer</a>
public static Observable<Long> timer(long delay, TimeUnit unit) {
  return timer(delay, unit, Schedulers.computation());

代码示例来源:origin: ReactiveX/RxJava

 * Returns an Observable that skips values emitted by the source ObservableSource before a specified time window
 * elapses.
 * <p>
 * <img width="640" height="305" src="" alt="">
 * <dl>
 *  <dt><b>Scheduler:</b></dt>
 *  <dd>{@code skip} does not operate on any particular scheduler but uses the current time
 *  from the {@code computation} {@link Scheduler}.</dd>
 * </dl>
 * @param time
 *            the length of the time window to skip
 * @param unit
 *            the time unit of {@code time}
 * @return an Observable that skips values emitted by the source ObservableSource before the time window defined
 *         by {@code time} elapses and then emits the remainder
 * @see <a href="">ReactiveX operators documentation: Skip</a>
public final Observable<T> skip(long time, TimeUnit unit) {
  return skipUntil(timer(time, unit));

代码示例来源:origin: ReactiveX/RxJava

 * Returns an Observable that delays the subscription to the source ObservableSource by a given amount of time,
 * both waiting and subscribing on a given Scheduler.
 * <p>
 * <img width="640" height="310" src="" alt="">
 * <dl>
 *  <dt><b>Scheduler:</b></dt>
 *  <dd>You specify which {@link Scheduler} this operator will use.</dd>
 * </dl>
 * @param delay
 *            the time to delay the subscription
 * @param unit
 *            the time unit of {@code delay}
 * @param scheduler
 *            the Scheduler on which the waiting and subscription will happen
 * @return an Observable that delays the subscription to the source ObservableSource by a given
 *         amount, waiting and subscribing on the given Scheduler
 * @see <a href="">ReactiveX operators documentation: Delay</a>
public final Observable<T> delaySubscription(long delay, TimeUnit unit, Scheduler scheduler) {
  return delaySubscription(timer(delay, unit, scheduler));

代码示例来源:origin: ReactiveX/RxJava

 * Returns an Observable that emits those items emitted by the source ObservableSource before a specified time runs
 * out.
 * <p>
 * If time runs out before the {@code Observable} completes normally, the {@code onComplete} event will be
 * signaled on the default {@code computation} {@link Scheduler}.
 * <p>
 * <img width="640" height="305" src="" alt="">
 * <dl>
 *  <dt><b>Scheduler:</b></dt>
 *  <dd>This version of {@code take} operates by default on the {@code computation} {@link Scheduler}.</dd>
 * </dl>
 * @param time
 *            the length of the time window
 * @param unit
 *            the time unit of {@code time}
 * @return an Observable that emits those items emitted by the source ObservableSource before the time runs out
 * @see <a href="">ReactiveX operators documentation: Take</a>
public final Observable<T> take(long time, TimeUnit unit) {
  return takeUntil(timer(time, unit));

代码示例来源:origin: ReactiveX/RxJava

 * Returns an Observable that skips values emitted by the source ObservableSource before a specified time window
 * on a specified {@link Scheduler} elapses.
 * <p>
 * <img width="640" height="305" src="" alt="">
 * <dl>
 *  <dt><b>Scheduler:</b></dt>
 *  <dd>You specify which {@link Scheduler} this operator will use for the timed skipping.</dd>
 * </dl>
 * @param time
 *            the length of the time window to skip
 * @param unit
 *            the time unit of {@code time}
 * @param scheduler
 *            the {@link Scheduler} on which the timed wait happens
 * @return an Observable that skips values emitted by the source ObservableSource before the time window defined
 *         by {@code time} and {@code scheduler} elapses, and then emits the remainder
 * @see <a href="">ReactiveX operators documentation: Skip</a>
public final Observable<T> skip(long time, TimeUnit unit, Scheduler scheduler) {
  return skipUntil(timer(time, unit, scheduler));

代码示例来源:origin: ReactiveX/RxJava

public void testTimerOnce() {
  Observable.timer(100, TimeUnit.MILLISECONDS, scheduler).subscribe(observer);
  scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS);
  verify(observer, times(1)).onNext(0L);
  verify(observer, times(1)).onComplete();
  verify(observer, never()).onError(any(Throwable.class));

代码示例来源:origin: ReactiveX/RxJava

public void disposed() {
  TestHelper.checkDisposed(Observable.timer(1, TimeUnit.DAYS));

代码示例来源:origin: ReactiveX/RxJava

public void emitLastOther() {
  .sample(Observable.timer(1, TimeUnit.DAYS), true)

代码示例来源:origin: ReactiveX/RxJava

public void timerDelayZero() {
  List<Throwable> errors = TestHelper.trackPluginErrors();
  try {
    for (int i = 0; i < 1000; i++) {
      Observable.timer(0, TimeUnit.MILLISECONDS).blockingFirst();
    assertTrue(errors.toString(), errors.isEmpty());
  } finally {

代码示例来源:origin: ReactiveX/RxJava

public void testOnceObserverThrows() {
  Observable<Long> source = Observable.timer(100, TimeUnit.MILLISECONDS, scheduler);
  source.safeSubscribe(new DefaultObserver<Long>() {
    public void onNext(Long t) {
      throw new TestException();
    public void onError(Throwable e) {
    public void onComplete() {
  scheduler.advanceTimeBy(1, TimeUnit.SECONDS);
  verify(observer, never()).onNext(anyLong());
  verify(observer, never()).onComplete();

代码示例来源:origin: ReactiveX/RxJava

public void bufferBoundaryHint() {
  Observable.range(1, 5).buffer(Observable.timer(1, TimeUnit.MINUTES), 2)
  .assertResult(Arrays.asList(1, 2, 3, 4, 5));

代码示例来源:origin: ReactiveX/RxJava

public void emitLastOtherEmpty() {
  .sample(Observable.timer(1, TimeUnit.DAYS), true)

代码示例来源:origin: ReactiveX/RxJava

public void delaySubscriptionObservable() throws Exception {
  Single.just(1).delaySubscription(Observable.timer(100, TimeUnit.MILLISECONDS))
  .awaitDone(5, TimeUnit.SECONDS)

代码示例来源:origin: ReactiveX/RxJava

public void boundaryBufferSupplierThrows2() {
  .buffer(Functions.justCallable(Observable.timer(1, TimeUnit.MILLISECONDS)), new Callable<Collection<Object>>() {
    int count;
    public Collection<Object> call() throws Exception {
      if (count++ == 1) {
        throw new TestException();
      } else {
        return new ArrayList<Object>();
  .awaitDone(5, TimeUnit.SECONDS)

代码示例来源:origin: ReactiveX/RxJava

public void boundaryBufferSupplierReturnsNull() {
  .buffer(Functions.justCallable(Observable.timer(1, TimeUnit.MILLISECONDS)), new Callable<Collection<Object>>() {
    int count;
    public Collection<Object> call() throws Exception {
      if (count++ == 1) {
        return null;
      } else {
        return new ArrayList<Object>();
  .awaitDone(5, TimeUnit.SECONDS)

