io.reactivex.Scheduler.schedulePeriodicallyDirect()方法的使用及代码示例

x33g5p2x  于2022-01-30 转载在 其他  
字(10.5k)|赞(0)|评价(0)|浏览(173)

本文整理了Java中io.reactivex.Scheduler.schedulePeriodicallyDirect()方法的一些代码示例,展示了Scheduler.schedulePeriodicallyDirect()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Scheduler.schedulePeriodicallyDirect()方法的具体详情如下:
包路径:io.reactivex.Scheduler
类名称:Scheduler
方法名:schedulePeriodicallyDirect

Scheduler.schedulePeriodicallyDirect介绍

[英]Schedules a periodic execution of the given task with the given initial time delay and repeat period.

This method is safe to be called from multiple threads but there are no ordering guarantees between tasks.

The periodic execution is at a fixed rate, that is, the first execution will be after the initialDelay, the second after initialDelay + period, the third after initialDelay + 2 * period, and so on.
[中]以给定的初始时间延迟和重复周期安排给定任务的周期性执行。
从多个线程调用此方法是安全的,但任务之间没有顺序保证。
周期性执行是以固定的速率进行的,也就是说,第一次执行将在initialDelay之后,第二次执行将在initialDelay+周期之后,第三次执行将在initialDelay+2*周期之后,依此类推。

代码示例

代码示例来源:origin: ReactiveX/RxJava

@Override
public void onSubscribe(Subscription s) {
  if (SubscriptionHelper.validate(this.upstream, s)) {
    this.upstream = s;
    downstream.onSubscribe(this);
    timer.replace(scheduler.schedulePeriodicallyDirect(this, period, period, unit));
    s.request(Long.MAX_VALUE);
  }
}

代码示例来源:origin: ReactiveX/RxJava

@Override
  public void run() {
    disposable.set(scheduler.schedulePeriodicallyDirect(Functions.EMPTY_RUNNABLE, 1, 10000, TimeUnit.MILLISECONDS));
  }
});

代码示例来源:origin: ReactiveX/RxJava

@Test
public void periodicDirectTaskRaceIO() throws Exception {
  final Scheduler scheduler = Schedulers.io();
  for (int i = 0; i < 100; i++) {
    final Disposable d = scheduler.schedulePeriodicallyDirect(
        Functions.EMPTY_RUNNABLE, 0, 0, TimeUnit.MILLISECONDS);
    Thread.sleep(1);
    d.dispose();
  }
}

代码示例来源:origin: ReactiveX/RxJava

@Override
public void onSubscribe(Disposable d) {
  if (DisposableHelper.validate(this.upstream, d)) {
    this.upstream = d;
    downstream.onSubscribe(this);
    Disposable task = scheduler.schedulePeriodicallyDirect(this, period, period, unit);
    DisposableHelper.replace(timer, task);
  }
}

代码示例来源:origin: ReactiveX/RxJava

@Test(expected = UnsupportedOperationException.class)
public void scheduleDirectPeriodic() {
  ImmediateThinScheduler.INSTANCE.schedulePeriodicallyDirect(Functions.EMPTY_RUNNABLE, 1, 1, TimeUnit.SECONDS);
}

代码示例来源:origin: ReactiveX/RxJava

@Override
public void onSubscribe(Subscription s) {
  if (SubscriptionHelper.validate(this.upstream, s)) {
    this.upstream = s;
    U b;
    try {
      b = ObjectHelper.requireNonNull(bufferSupplier.call(), "The supplied buffer is null");
    } catch (Throwable e) {
      Exceptions.throwIfFatal(e);
      cancel();
      EmptySubscription.error(e, downstream);
      return;
    }
    buffer = b;
    downstream.onSubscribe(this);
    if (!cancelled) {
      s.request(Long.MAX_VALUE);
      Disposable d = scheduler.schedulePeriodicallyDirect(this, timespan, timespan, unit);
      if (!timer.compareAndSet(null, d)) {
        d.dispose();
      }
    }
  }
}

代码示例来源:origin: ReactiveX/RxJava

@Override
public void subscribeActual(Subscriber<? super Long> s) {
  IntervalRangeSubscriber is = new IntervalRangeSubscriber(s, start, end);
  s.onSubscribe(is);
  Scheduler sch = scheduler;
  if (sch instanceof TrampolineScheduler) {
    Worker worker = sch.createWorker();
    is.setResource(worker);
    worker.schedulePeriodically(is, initialDelay, period, unit);
  } else {
    Disposable d = sch.schedulePeriodicallyDirect(is, initialDelay, period, unit);
    is.setResource(d);
  }
}

代码示例来源:origin: ReactiveX/RxJava

@Override
public void subscribeActual(Subscriber<? super Long> s) {
  IntervalSubscriber is = new IntervalSubscriber(s);
  s.onSubscribe(is);
  Scheduler sch = scheduler;
  if (sch instanceof TrampolineScheduler) {
    Worker worker = sch.createWorker();
    is.setResource(worker);
    worker.schedulePeriodically(is, initialDelay, period, unit);
  } else {
    Disposable d = sch.schedulePeriodicallyDirect(is, initialDelay, period, unit);
    is.setResource(d);
  }
}

代码示例来源:origin: ReactiveX/RxJava

@NonNull
@Override
public Disposable schedulePeriodicallyDirect(@NonNull Runnable run, long initialDelay, long period, TimeUnit unit) {
  if (executor instanceof ScheduledExecutorService) {
    Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
    try {
      ScheduledDirectPeriodicTask task = new ScheduledDirectPeriodicTask(decoratedRun);
      Future<?> f = ((ScheduledExecutorService)executor).scheduleAtFixedRate(task, initialDelay, period, unit);
      task.setFuture(f);
      return task;
    } catch (RejectedExecutionException ex) {
      RxJavaPlugins.onError(ex);
      return EmptyDisposable.INSTANCE;
    }
  }
  return super.schedulePeriodicallyDirect(run, initialDelay, period, unit);
}
/* public: test support. */

代码示例来源:origin: ReactiveX/RxJava

@Override
public void onSubscribe(Disposable d) {
  if (DisposableHelper.validate(this.upstream, d)) {
    this.upstream = d;
    window = UnicastSubject.<T>create(bufferSize);
    Observer<? super Observable<T>> a = downstream;
    a.onSubscribe(this);
    a.onNext(window);
    if (!cancelled) {
      Disposable task = scheduler.schedulePeriodicallyDirect(this, timespan, timespan, unit);
      DisposableHelper.replace(timer, task);
    }
  }
}

代码示例来源:origin: ReactiveX/RxJava

@Override
public void subscribeActual(Observer<? super Long> observer) {
  IntervalRangeObserver is = new IntervalRangeObserver(observer, start, end);
  observer.onSubscribe(is);
  Scheduler sch = scheduler;
  if (sch instanceof TrampolineScheduler) {
    Worker worker = sch.createWorker();
    is.setResource(worker);
    worker.schedulePeriodically(is, initialDelay, period, unit);
  } else {
    Disposable d = sch.schedulePeriodicallyDirect(is, initialDelay, period, unit);
    is.setResource(d);
  }
}

代码示例来源:origin: ReactiveX/RxJava

@Override
public void subscribeActual(Observer<? super Long> observer) {
  IntervalObserver is = new IntervalObserver(observer);
  observer.onSubscribe(is);
  Scheduler sch = scheduler;
  if (sch instanceof TrampolineScheduler) {
    Worker worker = sch.createWorker();
    is.setResource(worker);
    worker.schedulePeriodically(is, initialDelay, period, unit);
  } else {
    Disposable d = sch.schedulePeriodicallyDirect(is, initialDelay, period, unit);
    is.setResource(d);
  }
}

代码示例来源:origin: ReactiveX/RxJava

@Test
  public void schedulePeriodicallyDirectNullRunnable() {
    try {
      getScheduler().schedulePeriodicallyDirect(null, 5, 10, TimeUnit.MILLISECONDS);
      fail();
    } catch (NullPointerException npe) {
      assertEquals("run is null", npe.getMessage());
    }
  }
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void reuseScheduledExecutor() throws Exception {
  ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor();
  try {
    Scheduler s = Schedulers.from(exec, true);
    final CountDownLatch cdl = new CountDownLatch(8);
    Runnable r = new Runnable() {
      @Override
      public void run() {
        cdl.countDown();
      }
    };
    s.scheduleDirect(r);
    s.scheduleDirect(r, 10, TimeUnit.MILLISECONDS);
    Disposable d = s.schedulePeriodicallyDirect(r, 10, 10, TimeUnit.MILLISECONDS);
    try {
      assertTrue(cdl.await(5, TimeUnit.SECONDS));
    } finally {
      d.dispose();
    }
  } finally {
    exec.shutdown();
  }
}

代码示例来源:origin: ReactiveX/RxJava

@Test(timeout = 5000)
public void unwrapDefaultPeriodicTask() throws InterruptedException {
  Scheduler s = getScheduler();
  if (s instanceof TrampolineScheduler) {
    // TrampolineScheduler always return EmptyDisposable
    return;
  }
  final CountDownLatch cdl = new CountDownLatch(1);
  Runnable countDownRunnable = new Runnable() {
    @Override
    public void run() {
      cdl.countDown();
    }
  };
  Disposable disposable = s.schedulePeriodicallyDirect(countDownRunnable, 100, 100, TimeUnit.MILLISECONDS);
  SchedulerRunnableIntrospection wrapper = (SchedulerRunnableIntrospection) disposable;
  assertSame(countDownRunnable, wrapper.getWrappedRunnable());
  assertTrue(cdl.await(5, TimeUnit.SECONDS));
  disposable.dispose();
}

代码示例来源:origin: ReactiveX/RxJava

@Test(timeout = 7000)
public void scheduleDirectPeriodic() throws Exception {
  Scheduler s = getScheduler();
  if (s instanceof TrampolineScheduler) {
    // can't properly stop a trampolined periodic task
    return;
  }
  final CountDownLatch cdl = new CountDownLatch(5);
  Disposable d = s.schedulePeriodicallyDirect(new Runnable() {
    @Override
    public void run() {
      cdl.countDown();
    }
  }, 10, 10, TimeUnit.MILLISECONDS);
  try {
    assertTrue(cdl.await(5, TimeUnit.SECONDS));
  } finally {
    d.dispose();
  }
  assertTrue(d.isDisposed());
}

代码示例来源:origin: ReactiveX/RxJava

@Override
public void onSubscribe(Disposable d) {
  if (DisposableHelper.validate(this.upstream, d)) {
    this.upstream = d;
    Observer<? super Observable<T>> a = downstream;
    a.onSubscribe(this);
    if (cancelled) {
      return;
    }
    UnicastSubject<T> w = UnicastSubject.create(bufferSize);
    window = w;
    a.onNext(w);
    Disposable task;
    ConsumerIndexHolder consumerIndexHolder = new ConsumerIndexHolder(producerIndex, this);
    if (restartTimerOnMaxSize) {
      task = worker.schedulePeriodically(consumerIndexHolder, timespan, timespan, unit);
    } else {
      task = scheduler.schedulePeriodicallyDirect(consumerIndexHolder, timespan, timespan, unit);
    }
    DisposableHelper.replace(timer, task);
  }
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void rejectingExecutor() {
  ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor();
  exec.shutdown();
  Scheduler s = Schedulers.from(exec);
  List<Throwable> errors = TestHelper.trackPluginErrors();
  try {
    assertSame(EmptyDisposable.INSTANCE, s.scheduleDirect(Functions.EMPTY_RUNNABLE));
    assertSame(EmptyDisposable.INSTANCE, s.scheduleDirect(Functions.EMPTY_RUNNABLE, 10, TimeUnit.MILLISECONDS));
    assertSame(EmptyDisposable.INSTANCE, s.schedulePeriodicallyDirect(Functions.EMPTY_RUNNABLE, 10, 10, TimeUnit.MILLISECONDS));
    TestHelper.assertUndeliverable(errors, 0, RejectedExecutionException.class);
    TestHelper.assertUndeliverable(errors, 1, RejectedExecutionException.class);
    TestHelper.assertUndeliverable(errors, 2, RejectedExecutionException.class);
  } finally {
    RxJavaPlugins.reset();
  }
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void rejectingExecutor() {
  ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor();
  exec.shutdown();
  Scheduler s = Schedulers.from(exec, true);
  List<Throwable> errors = TestHelper.trackPluginErrors();
  try {
    assertSame(EmptyDisposable.INSTANCE, s.scheduleDirect(Functions.EMPTY_RUNNABLE));
    assertSame(EmptyDisposable.INSTANCE, s.scheduleDirect(Functions.EMPTY_RUNNABLE, 10, TimeUnit.MILLISECONDS));
    assertSame(EmptyDisposable.INSTANCE, s.schedulePeriodicallyDirect(Functions.EMPTY_RUNNABLE, 10, 10, TimeUnit.MILLISECONDS));
    TestHelper.assertUndeliverable(errors, 0, RejectedExecutionException.class);
    TestHelper.assertUndeliverable(errors, 1, RejectedExecutionException.class);
    TestHelper.assertUndeliverable(errors, 2, RejectedExecutionException.class);
  } finally {
    RxJavaPlugins.reset();
  }
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void shutdownRejects() {
  final int[] calls = { 0 };
  Runnable r = new Runnable() {
    @Override
    public void run() {
      calls[0]++;
    }
  };
  Scheduler s = new SingleScheduler();
  s.shutdown();
  assertEquals(Disposables.disposed(), s.scheduleDirect(r));
  assertEquals(Disposables.disposed(), s.scheduleDirect(r, 1, TimeUnit.SECONDS));
  assertEquals(Disposables.disposed(), s.schedulePeriodicallyDirect(r, 1, 1, TimeUnit.SECONDS));
  Worker w = s.createWorker();
  ((ScheduledWorker)w).executor.shutdownNow();
  assertEquals(Disposables.disposed(), w.schedule(r));
  assertEquals(Disposables.disposed(), w.schedule(r, 1, TimeUnit.SECONDS));
  assertEquals(Disposables.disposed(), w.schedulePeriodically(r, 1, 1, TimeUnit.SECONDS));
  assertEquals(0, calls[0]);
  w.dispose();
  assertTrue(w.isDisposed());
}

相关文章