io.reactivex.Observable.timer()方法的使用及代码示例

x33g5p2x  于2022-01-25 转载在 其他  
字(10.2k)|赞(0)|评价(0)|浏览(210)

本文整理了Java中io.reactivex.Observable.timer()方法的一些代码示例,展示了Observable.timer()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Observable.timer()方法的具体详情如下:
包路径:io.reactivex.Observable
类名称:Observable
方法名:timer

Observable.timer介绍

[英]Returns an Observable that emits 0L after a specified delay, and then completes.

Scheduler: timer operates by default on the computation Scheduler.
[中]返回一个可观测值,该值在指定延迟后发出0L,然后完成。
调度程序:默认情况下,计时器在计算调度程序上运行。

代码示例

代码示例来源:origin: ReactiveX/RxJava

  1. @Override
  2. public ObservableSource<Long> call() throws Exception {
  3. if (count++ == 1) {
  4. return null;
  5. }
  6. return Observable.timer(1, TimeUnit.MILLISECONDS);
  7. }
  8. }, new Callable<Collection<Object>>() {

代码示例来源:origin: ReactiveX/RxJava

  1. @Override
  2. public ObservableSource<Long> call() throws Exception {
  3. if (count++ == 1) {
  4. throw new TestException();
  5. }
  6. return Observable.timer(1, TimeUnit.MILLISECONDS);
  7. }
  8. }, new Callable<Collection<Object>>() {

代码示例来源:origin: ReactiveX/RxJava

  1. @Test(expected = NullPointerException.class)
  2. public void timerUnitNull() {
  3. Observable.timer(1, null);
  4. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test(expected = NullPointerException.class)
  2. public void timerSchedulerNull() {
  3. Observable.timer(1, TimeUnit.SECONDS, null);
  4. }

代码示例来源:origin: ReactiveX/RxJava

  1. /**
  2. * Delays the actual subscription to the current Single until the given time delay elapsed.
  3. * <dl>
  4. * <dt><b>Scheduler:</b></dt>
  5. * <dd>{@code delaySubscription} does by default subscribe to the current Single
  6. * on the {@link Scheduler} you provided, after the delay.</dd>
  7. * </dl>
  8. * @param time the time amount to wait with the subscription
  9. * @param unit the time unit of the waiting
  10. * @param scheduler the scheduler to wait on and subscribe on to the current Single
  11. * @return the new Single instance
  12. * @since 2.0
  13. */
  14. @CheckReturnValue
  15. @SchedulerSupport(SchedulerSupport.CUSTOM)
  16. public final Single<T> delaySubscription(long time, TimeUnit unit, Scheduler scheduler) {
  17. return delaySubscription(Observable.timer(time, unit, scheduler));
  18. }

代码示例来源:origin: ReactiveX/RxJava

  1. /**
  2. * Returns an Observable that emits {@code 0L} after a specified delay, and then completes.
  3. * <p>
  4. * <img width="640" height="200" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/timer.png" alt="">
  5. * <dl>
  6. * <dt><b>Scheduler:</b></dt>
  7. * <dd>{@code timer} operates by default on the {@code computation} {@link Scheduler}.</dd>
  8. * </dl>
  9. *
  10. * @param delay
  11. * the initial delay before emitting a single {@code 0L}
  12. * @param unit
  13. * time units to use for {@code delay}
  14. * @return an Observable that {@code 0L} after a specified delay, and then completes
  15. * @see <a href="http://reactivex.io/documentation/operators/timer.html">ReactiveX operators documentation: Timer</a>
  16. */
  17. @CheckReturnValue
  18. @SchedulerSupport(SchedulerSupport.COMPUTATION)
  19. public static Observable<Long> timer(long delay, TimeUnit unit) {
  20. return timer(delay, unit, Schedulers.computation());
  21. }

代码示例来源:origin: ReactiveX/RxJava

  1. /**
  2. * Returns an Observable that skips values emitted by the source ObservableSource before a specified time window
  3. * elapses.
  4. * <p>
  5. * <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/skip.t.png" alt="">
  6. * <dl>
  7. * <dt><b>Scheduler:</b></dt>
  8. * <dd>{@code skip} does not operate on any particular scheduler but uses the current time
  9. * from the {@code computation} {@link Scheduler}.</dd>
  10. * </dl>
  11. *
  12. * @param time
  13. * the length of the time window to skip
  14. * @param unit
  15. * the time unit of {@code time}
  16. * @return an Observable that skips values emitted by the source ObservableSource before the time window defined
  17. * by {@code time} elapses and the emits the remainder
  18. * @see <a href="http://reactivex.io/documentation/operators/skip.html">ReactiveX operators documentation: Skip</a>
  19. */
  20. @CheckReturnValue
  21. @SchedulerSupport(SchedulerSupport.COMPUTATION)
  22. public final Observable<T> skip(long time, TimeUnit unit) {
  23. return skipUntil(timer(time, unit));
  24. }

代码示例来源:origin: ReactiveX/RxJava

  1. /**
  2. * Returns an Observable that delays the subscription to the source ObservableSource by a given amount of time,
  3. * both waiting and subscribing on a given Scheduler.
  4. * <p>
  5. * <img width="640" height="310" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/delaySubscription.s.png" alt="">
  6. * <dl>
  7. * <dt><b>Scheduler:</b></dt>
  8. * <dd>You specify which {@link Scheduler} this operator will use.</dd>
  9. * </dl>
  10. *
  11. * @param delay
  12. * the time to delay the subscription
  13. * @param unit
  14. * the time unit of {@code delay}
  15. * @param scheduler
  16. * the Scheduler on which the waiting and subscription will happen
  17. * @return an Observable that delays the subscription to the source ObservableSource by a given
  18. * amount, waiting and subscribing on the given Scheduler
  19. * @see <a href="http://reactivex.io/documentation/operators/delay.html">ReactiveX operators documentation: Delay</a>
  20. */
  21. @CheckReturnValue
  22. @SchedulerSupport(SchedulerSupport.CUSTOM)
  23. public final Observable<T> delaySubscription(long delay, TimeUnit unit, Scheduler scheduler) {
  24. return delaySubscription(timer(delay, unit, scheduler));
  25. }

代码示例来源:origin: ReactiveX/RxJava

  1. /**
  2. * Returns an Observable that emits those items emitted by source ObservableSource before a specified time runs
  3. * out.
  4. * <p>
  5. * If time runs out before the {@code Observable} completes normally, the {@code onComplete} event will be
  6. * signaled on the default {@code computation} {@link Scheduler}.
  7. * <p>
  8. * <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/take.t.png" alt="">
  9. * <dl>
  10. * <dt><b>Scheduler:</b></dt>
  11. * <dd>This version of {@code take} operates by default on the {@code computation} {@link Scheduler}.</dd>
  12. * </dl>
  13. *
  14. * @param time
  15. * the length of the time window
  16. * @param unit
  17. * the time unit of {@code time}
  18. * @return an Observable that emits those items emitted by the source ObservableSource before the time runs out
  19. * @see <a href="http://reactivex.io/documentation/operators/take.html">ReactiveX operators documentation: Take</a>
  20. */
  21. @CheckReturnValue
  22. @SchedulerSupport(SchedulerSupport.NONE)
  23. public final Observable<T> take(long time, TimeUnit unit) {
  24. return takeUntil(timer(time, unit));
  25. }

代码示例来源:origin: ReactiveX/RxJava

  1. /**
  2. * Returns an Observable that skips values emitted by the source ObservableSource before a specified time window
  3. * on a specified {@link Scheduler} elapses.
  4. * <p>
  5. * <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/skip.ts.png" alt="">
  6. * <dl>
  7. * <dt><b>Scheduler:</b></dt>
  8. * <dd>You specify which {@link Scheduler} this operator will use for the timed skipping</dd>
  9. * </dl>
  10. *
  11. * @param time
  12. * the length of the time window to skip
  13. * @param unit
  14. * the time unit of {@code time}
  15. * @param scheduler
  16. * the {@link Scheduler} on which the timed wait happens
  17. * @return an Observable that skips values emitted by the source ObservableSource before the time window defined
  18. * by {@code time} and {@code scheduler} elapses, and then emits the remainder
  19. * @see <a href="http://reactivex.io/documentation/operators/skip.html">ReactiveX operators documentation: Skip</a>
  20. */
  21. @CheckReturnValue
  22. @SchedulerSupport(SchedulerSupport.CUSTOM)
  23. public final Observable<T> skip(long time, TimeUnit unit, Scheduler scheduler) {
  24. return skipUntil(timer(time, unit, scheduler));
  25. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void testTimerOnce() {
  3. Observable.timer(100, TimeUnit.MILLISECONDS, scheduler).subscribe(observer);
  4. scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS);
  5. verify(observer, times(1)).onNext(0L);
  6. verify(observer, times(1)).onComplete();
  7. verify(observer, never()).onError(any(Throwable.class));
  8. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void disposed() {
  3. TestHelper.checkDisposed(Observable.timer(1, TimeUnit.DAYS));
  4. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void emitLastOther() {
  3. Observable.just(1)
  4. .sample(Observable.timer(1, TimeUnit.DAYS), true)
  5. .test()
  6. .assertResult(1);
  7. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void timerDelayZero() {
  3. List<Throwable> errors = TestHelper.trackPluginErrors();
  4. try {
  5. for (int i = 0; i < 1000; i++) {
  6. Observable.timer(0, TimeUnit.MILLISECONDS).blockingFirst();
  7. }
  8. assertTrue(errors.toString(), errors.isEmpty());
  9. } finally {
  10. RxJavaPlugins.reset();
  11. }
  12. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void testOnceObserverThrows() {
  3. Observable<Long> source = Observable.timer(100, TimeUnit.MILLISECONDS, scheduler);
  4. source.safeSubscribe(new DefaultObserver<Long>() {
  5. @Override
  6. public void onNext(Long t) {
  7. throw new TestException();
  8. }
  9. @Override
  10. public void onError(Throwable e) {
  11. observer.onError(e);
  12. }
  13. @Override
  14. public void onComplete() {
  15. observer.onComplete();
  16. }
  17. });
  18. scheduler.advanceTimeBy(1, TimeUnit.SECONDS);
  19. verify(observer).onError(any(TestException.class));
  20. verify(observer, never()).onNext(anyLong());
  21. verify(observer, never()).onComplete();
  22. }

代码示例来源:origin: ReactiveX/RxJava

  1. @SuppressWarnings("unchecked")
  2. @Test
  3. public void bufferBoundaryHint() {
  4. Observable.range(1, 5).buffer(Observable.timer(1, TimeUnit.MINUTES), 2)
  5. .test()
  6. .assertResult(Arrays.asList(1, 2, 3, 4, 5));
  7. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void emitLastOtherEmpty() {
  3. Observable.empty()
  4. .sample(Observable.timer(1, TimeUnit.DAYS), true)
  5. .test()
  6. .assertResult();
  7. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void delaySubscriptionObservable() throws Exception {
  3. Single.just(1).delaySubscription(Observable.timer(100, TimeUnit.MILLISECONDS))
  4. .test()
  5. .awaitDone(5, TimeUnit.SECONDS)
  6. .assertResult(1);
  7. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. @SuppressWarnings("unchecked")
  3. public void boundaryBufferSupplierThrows2() {
  4. Observable.never()
  5. .buffer(Functions.justCallable(Observable.timer(1, TimeUnit.MILLISECONDS)), new Callable<Collection<Object>>() {
  6. int count;
  7. @Override
  8. public Collection<Object> call() throws Exception {
  9. if (count++ == 1) {
  10. throw new TestException();
  11. } else {
  12. return new ArrayList<Object>();
  13. }
  14. }
  15. })
  16. .test()
  17. .awaitDone(5, TimeUnit.SECONDS)
  18. .assertFailure(TestException.class);
  19. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. @SuppressWarnings("unchecked")
  3. public void boundaryBufferSupplierReturnsNull() {
  4. Observable.never()
  5. .buffer(Functions.justCallable(Observable.timer(1, TimeUnit.MILLISECONDS)), new Callable<Collection<Object>>() {
  6. int count;
  7. @Override
  8. public Collection<Object> call() throws Exception {
  9. if (count++ == 1) {
  10. return null;
  11. } else {
  12. return new ArrayList<Object>();
  13. }
  14. }
  15. })
  16. .test()
  17. .awaitDone(5, TimeUnit.SECONDS)
  18. .assertFailure(NullPointerException.class);
  19. }

相关文章

Observable类方法