io.reactivex.Scheduler.createWorker()方法的使用及代码示例

x33g5p2x  于2022-01-30 转载在 其他  
字(5.4k)|赞(0)|评价(0)|浏览(158)

本文整理了Java中io.reactivex.Scheduler.createWorker()方法的一些代码示例,展示了Scheduler.createWorker()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Scheduler.createWorker()方法的具体详情如下:
包路径:io.reactivex.Scheduler
类名称:Scheduler
方法名:createWorker

Scheduler.createWorker介绍

[英]Retrieves or creates a new Scheduler.Worker that represents sequential execution of actions.

When work is completed, the Worker instance should be released by calling Scheduler.Worker#dispose() to avoid potential resource leaks in the underlying task-execution scheme.

Work on a Scheduler.Worker is guaranteed to be sequential and non-overlapping.
[中]检索或创建新的计划程序。表示操作的顺序执行的工作程序。
工作完成后,应该通过调用调度程序来释放Worker实例。Worker#dispose()以避免底层任务执行方案中潜在的资源泄漏。
在调度程序上工作。保证工人是连续的、不重叠的。

代码示例

代码示例来源:origin: ReactiveX/RxJava

@Override
public void subscribeActual(final Subscriber<? super T> s) {
  Scheduler.Worker w = scheduler.createWorker();
  final SubscribeOnSubscriber<T> sos = new SubscribeOnSubscriber<T>(s, w, source, nonScheduledRequests);
  s.onSubscribe(sos);
  w.schedule(sos);
}

代码示例来源:origin: ReactiveX/RxJava

@Override
protected void subscribeActual(Subscriber<? super T> s) {
  source.subscribe(new ThrottleLatestSubscriber<T>(s, timeout, unit, scheduler.createWorker(), emitLast));
}

代码示例来源:origin: ReactiveX/RxJava

@Override
protected void subscribeActual(Subscriber<? super T> t) {
  Subscriber<? super T> downstream;
  if (delayError) {
    downstream = t;
  } else {
    downstream = new SerializedSubscriber<T>(t);
  }
  Scheduler.Worker w = scheduler.createWorker();
  source.subscribe(new DelaySubscriber<T>(downstream, delay, unit, w, delayError));
}

代码示例来源:origin: ReactiveX/RxJava

@NonNull
@Override
public Worker createWorker() {
  return eventLoop.createWorker();
}

代码示例来源:origin: ReactiveX/RxJava

@NonNull
@Override
public Worker createWorker() {
  return eventLoop.createWorker();
}

代码示例来源:origin: ReactiveX/RxJava

@Override
protected void subscribeActual(Observer<? super T> observer) {
  if (scheduler instanceof TrampolineScheduler) {
    source.subscribe(observer);
  } else {
    Scheduler.Worker w = scheduler.createWorker();
    source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
  }
}

代码示例来源:origin: ReactiveX/RxJava

@Override
@SuppressWarnings("unchecked")
public void subscribeActual(Observer<? super T> t) {
  Observer<T> observer;
  if (delayError) {
    observer = (Observer<T>)t;
  } else {
    observer = new SerializedObserver<T>(t);
  }
  Scheduler.Worker w = scheduler.createWorker();
  source.subscribe(new DelayObserver<T>(observer, delay, unit, w, delayError));
}

代码示例来源:origin: ReactiveX/RxJava

@Override
protected void subscribeActual(Subscriber<? super T> s) {
  source.subscribe(new DebounceTimedSubscriber<T>(
      new SerializedSubscriber<T>(s),
      timeout, unit, scheduler.createWorker()));
}

代码示例来源:origin: ReactiveX/RxJava

@Override
public void subscribeActual(Subscriber<? super T> s) {
  Worker worker = scheduler.createWorker();
  if (s instanceof ConditionalSubscriber) {
    source.subscribe(new ObserveOnConditionalSubscriber<T>(
        (ConditionalSubscriber<? super T>) s, worker, delayError, prefetch));
  } else {
    source.subscribe(new ObserveOnSubscriber<T>(s, worker, delayError, prefetch));
  }
}

代码示例来源:origin: ReactiveX/RxJava

@Override
protected void subscribeActual(Subscriber<? super T> s) {
  source.subscribe(new DebounceTimedSubscriber<T>(
      new SerializedSubscriber<T>(s),
      timeout, unit, scheduler.createWorker()));
}

代码示例来源:origin: ReactiveX/RxJava

@Override
public void subscribeActual(Observer<? super T> t) {
  source.subscribe(new DebounceTimedObserver<T>(
      new SerializedObserver<T>(t),
      timeout, unit, scheduler.createWorker()));
}

代码示例来源:origin: ReactiveX/RxJava

@Override
public void subscribeActual(Observer<? super T> t) {
  source.subscribe(new DebounceTimedObserver<T>(
      new SerializedObserver<T>(t),
      timeout, unit, scheduler.createWorker()));
}

代码示例来源:origin: ReactiveX/RxJava

@Override
public void run() {
  String msg = key + ".1";
  workDone.add(msg);
  System.out.println(msg);
  Worker worker3 = Schedulers.trampoline().createWorker();
  worker3.schedule(createPrintAction(key + ".B.1", workDone));
  worker3.schedule(createPrintAction(key + ".B.2", workDone));
}

代码示例来源:origin: ReactiveX/RxJava

@Test(expected = UnsupportedOperationException.class)
  public void schedulePeriodic() {
    ImmediateThinScheduler.INSTANCE.createWorker().schedulePeriodically(Functions.EMPTY_RUNNABLE, 1, 1, TimeUnit.SECONDS);
  }
}

代码示例来源:origin: ReactiveX/RxJava

@NonNull
@Override
public Worker createWorker() {
  return new SlowInner(actual.createWorker());
}

代码示例来源:origin: ReactiveX/RxJava

@NonNull
@Override
public Worker createWorker() {
  return new SlowInner(actual.createWorker());
}

代码示例来源:origin: ReactiveX/RxJava

@Test(expected = UnsupportedOperationException.class)
public void scheduleTimed() {
  ImmediateThinScheduler.INSTANCE.createWorker().schedule(Functions.EMPTY_RUNNABLE, 1, TimeUnit.SECONDS);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void workerDisposed() {
  Worker w = Schedulers.io().createWorker();
  assertFalse(((Disposable)w).isDisposed());
  w.dispose();
  assertTrue(((Disposable)w).isDisposed());
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void dispose() {
  Worker w = Schedulers.trampoline().createWorker();
  assertFalse(w.isDisposed());
  w.dispose();
  assertTrue(w.isDisposed());
  assertEquals(EmptyDisposable.INSTANCE, w.schedule(Functions.EMPTY_RUNNABLE));
}

代码示例来源:origin: ReactiveX/RxJava

@Test
  public void queueWorkerDispose() {
    QueueWorker qw = new QueueWorker(PublishProcessor.<ScheduledAction>create(), Schedulers.single().createWorker());

    assertFalse(qw.isDisposed());

    qw.dispose();

    assertTrue(qw.isDisposed());

    qw.dispose();

    assertTrue(qw.isDisposed());
  }
}

相关文章