Usage and code examples for the io.reactivex.Observable.timer() method

x33g5p2x, reposted on 2022-01-25 in Other

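This article collects code examples for the Java method io.reactivex.Observable.timer() and shows how it is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, extracted from selected projects, so they should serve as a useful reference. Details of Observable.timer():
Package: io.reactivex
Class name: Observable
Method name: timer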

Observable.timer overview

Returns an Observable that emits 0L after a specified delay, and then completes.

Scheduler: timer operates by default on the computation Scheduler.
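
The contract described above is small enough to model without RxJava. The following is a minimal JDK-only sketch, not RxJava's implementation: it assumes a `ScheduledExecutorService` stands in for the Scheduler, and the names `TimerSketch`, `timer`, and `runOnce` are hypothetical helpers introduced here for illustration. It shows the two signals a subscriber should expect, in order: a single `0L` after the delay, then completion.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.LongConsumer;

// Hypothetical model of the timer contract; this is NOT RxJava code.
final class TimerSketch {

    // After the delay, signal 0L exactly once, then signal completion.
    static void timer(long delay, TimeUnit unit, ScheduledExecutorService scheduler,
                      LongConsumer onNext, Runnable onComplete) {
        scheduler.schedule(() -> {
            onNext.accept(0L);
            onComplete.run();
        }, delay, unit);
    }

    // Subscribes once, waits for completion, and returns the signals in arrival order.
    static List<String> runOnce(long delayMillis) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        List<String> events = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);
        try {
            timer(delayMillis, TimeUnit.MILLISECONDS, scheduler,
                    value -> events.add("onNext(" + value + ")"),
                    () -> {
                        events.add("onComplete");
                        done.countDown();
                    });
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timer did not fire within 5 seconds");
            }
            return events;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(runOnce(50)); // prints [onNext(0), onComplete]
    }
}
```

With RxJava on the classpath, the corresponding call is `Observable.timer(50, TimeUnit.MILLISECONDS)`, which runs on the computation Scheduler unless another Scheduler is passed as the third argument.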

Code examples

Example source: ReactiveX/RxJava

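// Excerpt: the enclosing call is truncated in the original.
// A Callable that returns a 1 ms timer, except on its second invocation, where it returns null.
  @Override
  public ObservableSource<Long> call() throws Exception {
    if (count++ == 1) {
      return null;
    }
    return Observable.timer(1, TimeUnit.MILLISECONDS);
  }
}, new Callable<Collection<Object>>() {
// ... (truncated in the original)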

Example source: ReactiveX/RxJava

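// Excerpt: the enclosing call is truncated in the original.
// A Callable that returns a 1 ms timer, except on its second invocation, where it throws TestException.
  @Override
  public ObservableSource<Long> call() throws Exception {
    if (count++ == 1) {
      throw new TestException();
    }
    return Observable.timer(1, TimeUnit.MILLISECONDS);
  }
}, new Callable<Collection<Object>>() {
// ... (truncated in the original)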

Example source: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void timerUnitNull() {
  Observable.timer(1, null);
}

Example source: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void timerSchedulerNull() {
  Observable.timer(1, TimeUnit.SECONDS, null);
}

Example source: ReactiveX/RxJava

/**
 * Delays the actual subscription to the current Single until the given time delay elapsed.
 * <dl>
 * <dt><b>Scheduler:</b></dt>
 * <dd>{@code delaySubscription} does by default subscribe to the current Single
 * on the {@link Scheduler} you provided, after the delay.</dd>
 * </dl>
 * @param time the time amount to wait with the subscription
 * @param unit the time unit of the waiting
 * @param scheduler the scheduler to wait on and subscribe on to the current Single
 * @return the new Single instance
 * @since 2.0
 */
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Single<T> delaySubscription(long time, TimeUnit unit, Scheduler scheduler) {
  return delaySubscription(Observable.timer(time, unit, scheduler));
}

Example source: ReactiveX/RxJava

/**
 * Returns an Observable that emits {@code 0L} after a specified delay, and then completes.
 * <p>
 * <img width="640" height="200" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/timer.png" alt="">
 * <dl>
 *  <dt><b>Scheduler:</b></dt>
 *  <dd>{@code timer} operates by default on the {@code computation} {@link Scheduler}.</dd>
 * </dl>
 *
 * @param delay
 *            the initial delay before emitting a single {@code 0L}
 * @param unit
 *            time units to use for {@code delay}
 * @return an Observable that emits {@code 0L} after a specified delay, and then completes
 * @see <a href="http://reactivex.io/documentation/operators/timer.html">ReactiveX operators documentation: Timer</a>
 */
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.COMPUTATION)
public static Observable<Long> timer(long delay, TimeUnit unit) {
  return timer(delay, unit, Schedulers.computation());
}

Example source: ReactiveX/RxJava

/**
 * Returns an Observable that skips values emitted by the source ObservableSource before a specified time window
 * elapses.
 * <p>
 * <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/skip.t.png" alt="">
 * <dl>
 *  <dt><b>Scheduler:</b></dt>
 *  <dd>{@code skip} does not operate on any particular scheduler but uses the current time
 *  from the {@code computation} {@link Scheduler}.</dd>
 * </dl>
 *
 * @param time
 *            the length of the time window to skip
 * @param unit
 *            the time unit of {@code time}
 * @return an Observable that skips values emitted by the source ObservableSource before the time window defined
 *         by {@code time} elapses and then emits the remainder
 * @see <a href="http://reactivex.io/documentation/operators/skip.html">ReactiveX operators documentation: Skip</a>
 */
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.COMPUTATION)
public final Observable<T> skip(long time, TimeUnit unit) {
  return skipUntil(timer(time, unit));
}

Example source: ReactiveX/RxJava

/**
 * Returns an Observable that delays the subscription to the source ObservableSource by a given amount of time,
 * both waiting and subscribing on a given Scheduler.
 * <p>
 * <img width="640" height="310" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/delaySubscription.s.png" alt="">
 * <dl>
 *  <dt><b>Scheduler:</b></dt>
 *  <dd>You specify which {@link Scheduler} this operator will use.</dd>
 * </dl>
 *
 * @param delay
 *            the time to delay the subscription
 * @param unit
 *            the time unit of {@code delay}
 * @param scheduler
 *            the Scheduler on which the waiting and subscription will happen
 * @return an Observable that delays the subscription to the source ObservableSource by a given
 *         amount, waiting and subscribing on the given Scheduler
 * @see <a href="http://reactivex.io/documentation/operators/delay.html">ReactiveX operators documentation: Delay</a>
 */
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> delaySubscription(long delay, TimeUnit unit, Scheduler scheduler) {
  return delaySubscription(timer(delay, unit, scheduler));
}

Example source: ReactiveX/RxJava

/**
 * Returns an Observable that emits those items emitted by source ObservableSource before a specified time runs
 * out.
 * <p>
 * If time runs out before the {@code Observable} completes normally, the {@code onComplete} event will be
 * signaled on the default {@code computation} {@link Scheduler}.
 * <p>
 * <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/take.t.png" alt="">
 * <dl>
 *  <dt><b>Scheduler:</b></dt>
 *  <dd>This version of {@code take} operates by default on the {@code computation} {@link Scheduler}.</dd>
 * </dl>
 *
 * @param time
 *            the length of the time window
 * @param unit
 *            the time unit of {@code time}
 * @return an Observable that emits those items emitted by the source ObservableSource before the time runs out
 * @see <a href="http://reactivex.io/documentation/operators/take.html">ReactiveX operators documentation: Take</a>
 */
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Observable<T> take(long time, TimeUnit unit) {
  return takeUntil(timer(time, unit));
}

Example source: ReactiveX/RxJava

/**
 * Returns an Observable that skips values emitted by the source ObservableSource before a specified time window
 * on a specified {@link Scheduler} elapses.
 * <p>
 * <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/skip.ts.png" alt="">
 * <dl>
 *  <dt><b>Scheduler:</b></dt>
 *  <dd>You specify which {@link Scheduler} this operator will use for the timed skipping</dd>
 * </dl>
 *
 * @param time
 *            the length of the time window to skip
 * @param unit
 *            the time unit of {@code time}
 * @param scheduler
 *            the {@link Scheduler} on which the timed wait happens
 * @return an Observable that skips values emitted by the source ObservableSource before the time window defined
 *         by {@code time} and {@code scheduler} elapses, and then emits the remainder
 * @see <a href="http://reactivex.io/documentation/operators/skip.html">ReactiveX operators documentation: Skip</a>
 */
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> skip(long time, TimeUnit unit, Scheduler scheduler) {
  return skipUntil(timer(time, unit, scheduler));
}

Example source: ReactiveX/RxJava

@Test
public void testTimerOnce() {
  Observable.timer(100, TimeUnit.MILLISECONDS, scheduler).subscribe(observer);
  scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS);
  verify(observer, times(1)).onNext(0L);
  verify(observer, times(1)).onComplete();
  verify(observer, never()).onError(any(Throwable.class));
}

Example source: ReactiveX/RxJava

@Test
public void disposed() {
  TestHelper.checkDisposed(Observable.timer(1, TimeUnit.DAYS));
}

Example source: ReactiveX/RxJava

@Test
public void emitLastOther() {
  Observable.just(1)
      .sample(Observable.timer(1, TimeUnit.DAYS), true)
      .test()
      .assertResult(1);
}

Example source: ReactiveX/RxJava

@Test
public void timerDelayZero() {
  List<Throwable> errors = TestHelper.trackPluginErrors();
  try {
    for (int i = 0; i < 1000; i++) {
      Observable.timer(0, TimeUnit.MILLISECONDS).blockingFirst();
    }
    assertTrue(errors.toString(), errors.isEmpty());
  } finally {
    RxJavaPlugins.reset();
  }
}

Example source: ReactiveX/RxJava

@Test
public void testOnceObserverThrows() {
  Observable<Long> source = Observable.timer(100, TimeUnit.MILLISECONDS, scheduler);
  source.safeSubscribe(new DefaultObserver<Long>() {
    @Override
    public void onNext(Long t) {
      throw new TestException();
    }
    @Override
    public void onError(Throwable e) {
      observer.onError(e);
    }
    @Override
    public void onComplete() {
      observer.onComplete();
    }
  });
  scheduler.advanceTimeBy(1, TimeUnit.SECONDS);
  verify(observer).onError(any(TestException.class));
  verify(observer, never()).onNext(anyLong());
  verify(observer, never()).onComplete();
}

Example source: ReactiveX/RxJava

@SuppressWarnings("unchecked")
@Test
public void bufferBoundaryHint() {
  Observable.range(1, 5).buffer(Observable.timer(1, TimeUnit.MINUTES), 2)
      .test()
      .assertResult(Arrays.asList(1, 2, 3, 4, 5));
}

Example source: ReactiveX/RxJava

@Test
public void emitLastOtherEmpty() {
  Observable.empty()
      .sample(Observable.timer(1, TimeUnit.DAYS), true)
      .test()
      .assertResult();
}

Example source: ReactiveX/RxJava

@Test
public void delaySubscriptionObservable() throws Exception {
  Single.just(1).delaySubscription(Observable.timer(100, TimeUnit.MILLISECONDS))
      .test()
      .awaitDone(5, TimeUnit.SECONDS)
      .assertResult(1);
}

Example source: ReactiveX/RxJava

@Test
@SuppressWarnings("unchecked")
public void boundaryBufferSupplierThrows2() {
  Observable.never()
      .buffer(Functions.justCallable(Observable.timer(1, TimeUnit.MILLISECONDS)), new Callable<Collection<Object>>() {
        int count;
        @Override
        public Collection<Object> call() throws Exception {
          if (count++ == 1) {
            throw new TestException();
          } else {
            return new ArrayList<Object>();
          }
        }
      })
      .test()
      .awaitDone(5, TimeUnit.SECONDS)
      .assertFailure(TestException.class);
}

Example source: ReactiveX/RxJava

@Test
@SuppressWarnings("unchecked")
public void boundaryBufferSupplierReturnsNull() {
  Observable.never()
      .buffer(Functions.justCallable(Observable.timer(1, TimeUnit.MILLISECONDS)), new Callable<Collection<Object>>() {
        int count;
        @Override
        public Collection<Object> call() throws Exception {
          if (count++ == 1) {
            return null;
          } else {
            return new ArrayList<Object>();
          }
        }
      })
      .test()
      .awaitDone(5, TimeUnit.SECONDS)
      .assertFailure(NullPointerException.class);
}
