Usage and code examples of the io.reactivex.Scheduler.createWorker() method

Reposted by x33g5p2x on 2022-01-30, category: Other

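This article collects code examples for the Java method io.reactivex.Scheduler.createWorker() and shows how Scheduler.createWorker() is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should serve as useful references. Details of the Scheduler.createWorker() method are as follows:
Fully qualified name: io.reactivex.Scheduler
Class name: Scheduler
Method name: createWorker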

Introduction to Scheduler.createWorker

Retrieves or creates a new Scheduler.Worker that represents sequential execution of actions.

When work is completed, the Worker instance should be released by calling Scheduler.Worker#dispose() to avoid potential resource leaks in the underlying task-execution scheme.

Work on a Scheduler.Worker is guaranteed to be sequential and non-overlapping.
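The contract described above (sequential, non-overlapping execution, plus release through dispose()) can be sketched without RxJava. The following is a minimal stdlib-only model, not RxJava's implementation: SerialWorkerSketch, SerialWorker, and runDemo are hypothetical names introduced for illustration, and a single-threaded ExecutorService is assumed as a stand-in for the underlying task-execution scheme.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stdlib-only model of the Worker contract; not RxJava code.
class SerialWorkerSketch {

    /** Runs tasks one at a time, in submission order, until disposed. */
    static final class SerialWorker {
        // A single thread means tasks can never overlap.
        private final ExecutorService executor = Executors.newSingleThreadExecutor();
        private final AtomicBoolean disposed = new AtomicBoolean();

        /** Returns true if the task was accepted, false if the worker is disposed. */
        boolean schedule(Runnable task) {
            if (disposed.get()) {
                return false;
            }
            try {
                executor.execute(task);
                return true;
            } catch (RejectedExecutionException e) {
                return false; // lost a race with dispose()
            }
        }

        /** Releases the backing thread and drops tasks that have not started. */
        void dispose() {
            if (disposed.compareAndSet(false, true)) {
                executor.shutdownNow();
            }
        }

        boolean isDisposed() {
            return disposed.get();
        }
    }

    /** Schedules three tasks, waits for them, disposes, then tries to schedule again. */
    static List<String> runDemo() {
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        SerialWorker worker = new SerialWorker();
        CountDownLatch done = new CountDownLatch(3);
        try {
            for (int i = 1; i <= 3; i++) {
                final int id = i;
                worker.schedule(() -> {
                    log.add("start " + id);
                    log.add("end " + id);
                    done.countDown();
                });
            }
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            worker.dispose(); // always release the worker, even on failure
        }
        log.add("accepted after dispose: " + worker.schedule(() -> log.add("late")));
        return new ArrayList<>(log);
    }

    public static void main(String[] args) {
        // prints [start 1, end 1, start 2, end 2, start 3, end 3, accepted after dispose: false]
        System.out.println(runDemo());
    }
}
```

With a real Scheduler the same shape would apply: obtain the Worker from createWorker(), schedule tasks on it, and call dispose() in a finally block once the work is done.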

Code examples

Code example source: ReactiveX/RxJava

  @Override
  public void subscribeActual(final Subscriber<? super T> s) {
      // A fresh Worker is created for each subscription.
      Scheduler.Worker w = scheduler.createWorker();
      final SubscribeOnSubscriber<T> sos = new SubscribeOnSubscriber<T>(s, w, source, nonScheduledRequests);
      s.onSubscribe(sos);
      // The subscriber itself is handed to the Worker as the task to run.
      w.schedule(sos);
  }

Code example source: ReactiveX/RxJava

  @Override
  protected void subscribeActual(Subscriber<? super T> s) {
      // The new Worker is passed to the subscriber together with the timing parameters.
      source.subscribe(new ThrottleLatestSubscriber<T>(s, timeout, unit, scheduler.createWorker(), emitLast));
  }

Code example source: ReactiveX/RxJava

  @Override
  protected void subscribeActual(Subscriber<? super T> t) {
      Subscriber<? super T> downstream;
      // Without delayError, the downstream is wrapped in a SerializedSubscriber.
      if (delayError) {
          downstream = t;
      } else {
          downstream = new SerializedSubscriber<T>(t);
      }
      Scheduler.Worker w = scheduler.createWorker();
      source.subscribe(new DelaySubscriber<T>(downstream, delay, unit, w, delayError));
  }

Code example source: ReactiveX/RxJava

  @NonNull
  @Override
  public Worker createWorker() {
      // Delegates Worker creation to the wrapped scheduler.
      return eventLoop.createWorker();
  }

Code example source: ReactiveX/RxJava

  @Override
  protected void subscribeActual(Observer<? super T> observer) {
      if (scheduler instanceof TrampolineScheduler) {
          // Trampoline scheduler: subscribe directly, no Worker is created.
          source.subscribe(observer);
      } else {
          Scheduler.Worker w = scheduler.createWorker();
          source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
      }
  }

Code example source: ReactiveX/RxJava

  @Override
  @SuppressWarnings("unchecked")
  public void subscribeActual(Observer<? super T> t) {
      Observer<T> observer;
      // Without delayError, the observer is wrapped in a SerializedObserver.
      if (delayError) {
          observer = (Observer<T>) t;
      } else {
          observer = new SerializedObserver<T>(t);
      }
      Scheduler.Worker w = scheduler.createWorker();
      source.subscribe(new DelayObserver<T>(observer, delay, unit, w, delayError));
  }

Code example source: ReactiveX/RxJava

  @Override
  protected void subscribeActual(Subscriber<? super T> s) {
      // The Worker is created inline and handed to the debounce subscriber.
      source.subscribe(new DebounceTimedSubscriber<T>(
              new SerializedSubscriber<T>(s),
              timeout, unit, scheduler.createWorker()));
  }

Code example source: ReactiveX/RxJava

  @Override
  public void subscribeActual(Subscriber<? super T> s) {
      Worker worker = scheduler.createWorker();
      // Both branches share the same Worker; only the subscriber type differs.
      if (s instanceof ConditionalSubscriber) {
          source.subscribe(new ObserveOnConditionalSubscriber<T>(
                  (ConditionalSubscriber<? super T>) s, worker, delayError, prefetch));
      } else {
          source.subscribe(new ObserveOnSubscriber<T>(s, worker, delayError, prefetch));
      }
  }

Code example source: ReactiveX/RxJava

  @Override
  public void subscribeActual(Observer<? super T> t) {
      // Observer counterpart of the debounce example above.
      source.subscribe(new DebounceTimedObserver<T>(
              new SerializedObserver<T>(t),
              timeout, unit, scheduler.createWorker()));
  }

Code example source: ReactiveX/RxJava

  @Override
  public void run() {
      String msg = key + ".1";
      workDone.add(msg);
      System.out.println(msg);
      // A new trampoline Worker is created from inside a running task.
      Worker worker3 = Schedulers.trampoline().createWorker();
      worker3.schedule(createPrintAction(key + ".B.1", workDone));
      worker3.schedule(createPrintAction(key + ".B.2", workDone));
  }
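The example above schedules follow-up work from inside a task that is already running. As a rough illustration of what trampolining means for a single worker, here is a minimal single-threaded sketch that assumes nested tasks are queued and run only after the current task returns. It is a stdlib-only model, not RxJava's TrampolineScheduler, and TrampolineSketch, TrampolineWorker, and runDemo are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical single-threaded model of trampolining; not RxJava code.
class TrampolineSketch {

    static final class TrampolineWorker {
        private final Deque<Runnable> queue = new ArrayDeque<>();
        private boolean draining; // true while a task of this worker is running

        void schedule(Runnable task) {
            queue.addLast(task);
            if (draining) {
                // Called from inside a running task: only enqueue.
                // The outer drain loop runs the task later.
                return;
            }
            draining = true;
            try {
                Runnable next;
                while ((next = queue.pollFirst()) != null) {
                    next.run();
                }
            } finally {
                draining = false;
            }
        }
    }

    /** Schedules one outer task that schedules two inner tasks on the same worker. */
    static List<String> runDemo() {
        List<String> log = new ArrayList<>();
        TrampolineWorker worker = new TrampolineWorker();
        worker.schedule(() -> {
            log.add("outer start");
            worker.schedule(() -> log.add("inner 1"));
            worker.schedule(() -> log.add("inner 2"));
            log.add("outer end");
        });
        return log;
    }

    public static void main(String[] args) {
        // prints [outer start, outer end, inner 1, inner 2]
        System.out.println(runDemo());
    }
}
```

In this sketch the inner tasks share the outer task's worker, so they wait in its queue. The RxJava example above instead creates a separate worker inside the task, so its ordering may differ.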

Code example source: ReactiveX/RxJava

  // This Worker is expected to reject periodic scheduling.
  @Test(expected = UnsupportedOperationException.class)
  public void schedulePeriodic() {
      ImmediateThinScheduler.INSTANCE.createWorker().schedulePeriodically(Functions.EMPTY_RUNNABLE, 1, 1, TimeUnit.SECONDS);
  }

Code example source: ReactiveX/RxJava

  @NonNull
  @Override
  public Worker createWorker() {
      // Wraps the Worker of the underlying scheduler.
      return new SlowInner(actual.createWorker());
  }

Code example source: ReactiveX/RxJava

  // This Worker is expected to reject delayed scheduling.
  @Test(expected = UnsupportedOperationException.class)
  public void scheduleTimed() {
      ImmediateThinScheduler.INSTANCE.createWorker().schedule(Functions.EMPTY_RUNNABLE, 1, TimeUnit.SECONDS);
  }

Code example source: ReactiveX/RxJava

  @Test
  public void workerDisposed() {
      Worker w = Schedulers.io().createWorker();
      // A new Worker starts out undisposed and reports disposed after dispose().
      assertFalse(((Disposable) w).isDisposed());
      w.dispose();
      assertTrue(((Disposable) w).isDisposed());
  }

Code example source: ReactiveX/RxJava

  @Test
  public void dispose() {
      Worker w = Schedulers.trampoline().createWorker();
      assertFalse(w.isDisposed());
      w.dispose();
      assertTrue(w.isDisposed());
      // Scheduling on a disposed Worker returns the empty Disposable.
      assertEquals(EmptyDisposable.INSTANCE, w.schedule(Functions.EMPTY_RUNNABLE));
  }

Code example source: ReactiveX/RxJava

  @Test
  public void queueWorkerDispose() {
      QueueWorker qw = new QueueWorker(PublishProcessor.<ScheduledAction>create(), Schedulers.single().createWorker());
      assertFalse(qw.isDisposed());
      qw.dispose();
      assertTrue(qw.isDisposed());
      // A second dispose() leaves the worker disposed.
      qw.dispose();
      assertTrue(qw.isDisposed());
  }
