Usage and code examples of the rx.Scheduler.createWorker() method

x33g5p2x, reposted on 2022-01-30 under "Other"

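This article collects code examples of the Java method rx.Scheduler.createWorker() and shows how Scheduler.createWorker() is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven artifacts, and were extracted from selected projects, so they can serve as a practical reference. Details of the method are as follows:
Fully qualified class name: rx.Scheduler
Class name: Scheduler
Method name: createWorker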

About Scheduler.createWorker

Retrieves or creates a new Scheduler.Worker that represents serial execution of actions.

When work is completed it should be unsubscribed using Scheduler.Worker#unsubscribe().

Work on a Scheduler.Worker is guaranteed to be sequential.
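The contract quoted above (actions on one worker run one at a time, and the worker is released once it is no longer needed) can be illustrated without RxJava on the classpath. The sketch below is only an analogy, not RxJava's implementation: it assumes a JDK single-thread executor as a stand-in for a Scheduler.Worker, with shutdown() playing the role of unsubscribe(). The class SerialWorkerSketch and its method runInOrder are hypothetical names introduced for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class SerialWorkerSketch {

    // Hypothetical helper: submits `count` tasks to one single-threaded
    // executor (the stand-in for a Scheduler.Worker) and returns the ids
    // in the order the tasks actually ran.
    public static List<Integer> runInOrder(int count) {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        List<Integer> executed = Collections.synchronizedList(new ArrayList<Integer>());
        try {
            for (int i = 0; i < count; i++) {
                final int id = i;
                // Tasks queued on the same worker run sequentially, in FIFO order.
                worker.execute(() -> executed.add(id));
            }
        } finally {
            // Roughly analogous to worker.unsubscribe(): release the worker when done.
            // Difference: shutdown() lets already queued tasks finish, whereas
            // unsubscribing an RxJava worker cancels outstanding work.
            worker.shutdown();
        }
        try {
            if (!worker.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return executed;
    }

    public static void main(String[] args) {
        System.out.println(runInOrder(5)); // prints [0, 1, 2, 3, 4]
    }
}
```

With a real rx.Scheduler the shape is the same: obtain a worker from createWorker(), schedule actions on it, and call unsubscribe() once the work is finished.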

Code examples

Code example source: PipelineAI/pipeline

@Override
public Worker createWorker() {
  return new HystrixContextSchedulerWorker(actualScheduler.createWorker());
}

Code example source: ReactiveX/RxNetty

writeWorker = Schedulers.computation().createWorker();

Code example source: ReactiveX/RxNetty

@Test(timeout = 100000)
public void testWritesInOrderFromDifferentThreads() throws Exception {
  final WriteStreamSubscriber sub1 = inspectorRule.newSubscriber();
  // Set the current thread to be the thread of the event loop
  inspectorRule.setEventLoopThread();
  // Send 1000 messages from two different threads
  int msgCount = 1000;
  Scheduler.Worker worker = Schedulers.computation().createWorker();
  for (int i = 1; i < msgCount; i+=2) {
    sub1.onNext(String.valueOf(i));
    // Send from other thread
    inspectorRule.sendFromOtherThread(sub1, worker, String.valueOf(i+1));
  }
  // In lack of a way of running all pending tasks on computation scheduler
  Thread.sleep(500);
  // Ensure messages are in order
  Queue<Object> written = inspectorRule.getWrittenMessages();
  for (int i = 1; i <= msgCount; i++) {
    Object msg = written.poll();
    String strMsg = ((ByteBuf) msg).toString(Charset.defaultCharset());
    assertThat("Not in order ", strMsg, is(String.valueOf(i)));
  }
}

Code example source: BaronZ88/MinimalistWeather (excerpt, truncated in the original)

observableForGetWeatherFromNetWork = observableForGetWeatherFromNetWork.doOnNext(weather -> Schedulers.io().createWorker().schedule(() -> {
  try {
    weatherDao.insertOrUpdateWeather(weather);

Code example source: akarnokd/RxJava2Interop

@Override
public Worker createWorker() {
  return new WorkerV1ToWorkerV2(source.createWorker());
}

Code example source: stackoverflow.com

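public static void runAction(final Action0 action, Scheduler scheduler) {
  // One worker per call; it is released as soon as the action has run.
  final Scheduler.Worker worker = scheduler.createWorker();
  worker.schedule(new Action0() {
    @Override
    public void call() {
      try {
        action.call();
      } finally {
        // Unsubscribe even if the action throws, so the worker is not leaked.
        worker.unsubscribe();
      }
    }
  });
}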

Code example source: akarnokd/RxJava2Interop

@Test
public void workerUnsubscribe() {
  rx.Scheduler v1Scheduler = mock(rx.Scheduler.class);
  io.reactivex.Scheduler v2Scheduler = RxJavaInterop.toV2Scheduler(v1Scheduler);
  rx.Scheduler.Worker v1Worker = mock(rx.Scheduler.Worker.class);
  when(v1Scheduler.createWorker()).thenReturn(v1Worker);
  io.reactivex.Scheduler.Worker v2Worker = v2Scheduler.createWorker();
  verify(v1Worker, never()).unsubscribe();
  v2Worker.dispose();
  verify(v1Worker).unsubscribe();
}

Code example source: akarnokd/RxJava2Interop

@Test
public void workerUnsubscribe() {
  Scheduler v2Scheduler = mock(Scheduler.class);
  rx.Scheduler v1Scheduler = RxJavaInterop.toV1Scheduler(v2Scheduler);
  Scheduler.Worker v2Worker = mock(Scheduler.Worker.class);
  when(v2Scheduler.createWorker()).thenReturn(v2Worker);
  rx.Scheduler.Worker v1Worker = v1Scheduler.createWorker();
  verify(v2Worker, never()).dispose();
  v1Worker.unsubscribe();
  verify(v2Worker).dispose();
}

Code example source: akarnokd/RxJava2Interop

@Test
public void workerNow() {
  rx.Scheduler v1Scheduler = mock(rx.Scheduler.class);
  io.reactivex.Scheduler v2Scheduler = RxJavaInterop.toV2Scheduler(v1Scheduler);
  rx.Scheduler.Worker v1Worker = mock(rx.Scheduler.Worker.class);
  when(v1Scheduler.createWorker()).thenReturn(v1Worker);
  io.reactivex.Scheduler.Worker v2Worker = v2Scheduler.createWorker();
  when(v1Worker.now()).thenReturn(123L);
  assertEquals(123L, v2Worker.now(TimeUnit.MILLISECONDS));
}

Code example source: akarnokd/RxJava2Interop

@Test
public void workerNow() {
  Scheduler v2Scheduler = mock(Scheduler.class);
  rx.Scheduler v1Scheduler = RxJavaInterop.toV1Scheduler(v2Scheduler);
  Scheduler.Worker v2Worker = mock(Scheduler.Worker.class);
  when(v2Scheduler.createWorker()).thenReturn(v2Worker);
  rx.Scheduler.Worker v1Worker = v1Scheduler.createWorker();
  when(v2Worker.now(MILLISECONDS)).thenReturn(123L);
  assertEquals(123L, v1Worker.now());
}

Code example source: akarnokd/RxJava2Interop

@Test
public void workerIsUnsubscribed() {
  rx.Scheduler v1Scheduler = mock(rx.Scheduler.class);
  io.reactivex.Scheduler v2Scheduler = RxJavaInterop.toV2Scheduler(v1Scheduler);
  rx.Scheduler.Worker v1Worker = mock(rx.Scheduler.Worker.class);
  when(v1Scheduler.createWorker()).thenReturn(v1Worker);
  io.reactivex.Scheduler.Worker v2Worker = v2Scheduler.createWorker();
  when(v1Worker.isUnsubscribed()).thenReturn(true);
  assertTrue(v2Worker.isDisposed());
  when(v1Worker.isUnsubscribed()).thenReturn(false);
  assertFalse(v2Worker.isDisposed());
}

Code example source: akarnokd/RxJava2Interop

@Test
public void workerIsUnsubscribed() {
  Scheduler v2Scheduler = mock(Scheduler.class);
  rx.Scheduler v1Scheduler = RxJavaInterop.toV1Scheduler(v2Scheduler);
  Scheduler.Worker v2Worker = mock(Scheduler.Worker.class);
  when(v2Scheduler.createWorker()).thenReturn(v2Worker);
  rx.Scheduler.Worker v1Worker = v1Scheduler.createWorker();
  when(v2Worker.isDisposed()).thenReturn(true);
  assertTrue(v1Worker.isUnsubscribed());
  when(v2Worker.isDisposed()).thenReturn(false);
  assertFalse(v1Worker.isUnsubscribed());
}

Code example source: akarnokd/RxJava2Interop

@Test
public void workerScheduleNullAction() {
  Scheduler v2Scheduler = mock(Scheduler.class);
  rx.Scheduler v1Scheduler = RxJavaInterop.toV1Scheduler(v2Scheduler);
  try {
    v1Scheduler.createWorker().schedule(null);
    fail();
  } catch (NullPointerException expected) {
    assertEquals("Source 1.x Action0 is null", expected.getMessage());
  }
}

Code example source: akarnokd/RxJava2Interop

@Test
public void workerScheduleDelayedNullAction() {
  Scheduler v2Scheduler = mock(Scheduler.class);
  rx.Scheduler v1Scheduler = RxJavaInterop.toV1Scheduler(v2Scheduler);
  try {
    v1Scheduler.createWorker().schedule(null, 123L, MINUTES);
    fail();
  } catch (NullPointerException expected) {
    assertEquals("Source 1.x Action0 is null", expected.getMessage());
  }
}

Code example source: akarnokd/RxJava2Interop

@Test
public void workerSchedulePeriodicallyNullAction() {
  Scheduler v2Scheduler = mock(Scheduler.class);
  rx.Scheduler v1Scheduler = RxJavaInterop.toV1Scheduler(v2Scheduler);
  try {
    v1Scheduler.createWorker().schedulePeriodically(null, 10L, 123L, MINUTES);
    fail();
  } catch (NullPointerException expected) {
    assertEquals("Source 1.x Action0 is null", expected.getMessage());
  }
}

Code example source: akarnokd/RxJava2Interop

@Test
public void workerSchedule() {
  TestScheduler v2Scheduler = new TestScheduler();
  rx.Scheduler v1Scheduler = RxJavaInterop.toV1Scheduler(v2Scheduler);
  Action0 action0 = mock(Action0.class);
  v1Scheduler.createWorker().schedule(action0);
  verifyZeroInteractions(action0);
  v2Scheduler.triggerActions();
  verify(action0).call();
}

Code example source: akarnokd/RxJava2Interop

@Test
public void workerScheduleDelayed() {
  TestScheduler v2Scheduler = new TestScheduler();
  rx.Scheduler v1Scheduler = RxJavaInterop.toV1Scheduler(v2Scheduler);
  Action0 action0 = mock(Action0.class);
  v1Scheduler.createWorker().schedule(action0, 123L, MINUTES);
  verifyZeroInteractions(action0);
  v2Scheduler.advanceTimeBy(122L, MINUTES);
  verifyZeroInteractions(action0);
  v2Scheduler.advanceTimeBy(1L, MINUTES);
  verify(action0).call();
  v2Scheduler.advanceTimeBy(125L, MINUTES); // Check that it's not periodic.
  verifyNoMoreInteractions(action0);
}

Code example source: akarnokd/RxJava2Interop

@Test
public void workerSchedulePeriodically() {
  TestScheduler v2Scheduler = new TestScheduler();
  rx.Scheduler v1Scheduler = RxJavaInterop.toV1Scheduler(v2Scheduler);
  Action0 action0 = mock(Action0.class);
  v1Scheduler.createWorker().schedulePeriodically(action0, 10L, 123L, MINUTES);
  verifyZeroInteractions(action0);
  v2Scheduler.advanceTimeBy(9L, MINUTES);
  verifyZeroInteractions(action0);
  v2Scheduler.advanceTimeBy(1L, MINUTES);
  verify(action0).call();
  v2Scheduler.advanceTimeBy(122L, MINUTES);
  verifyNoMoreInteractions(action0);
  v2Scheduler.advanceTimeBy(1L, MINUTES);
  verify(action0, times(2)).call();
  v2Scheduler.advanceTimeBy(123L, MINUTES);
  verify(action0, times(3)).call(); // Verify periodic.
  v2Scheduler.advanceTimeBy(123L, MINUTES);
  verify(action0, times(4)).call(); // Verify periodic.
}

Code example source: com.netflix.eureka/eureka2-core

protected SerializedTaskInvoker(SerializedTaskInvokerMetrics metrics, Scheduler scheduler) {
  this.worker = scheduler.createWorker();
  this.queueSize = new AtomicInteger(0);
  this.metrics = metrics;
}

Code example source: com.netflix.rxjava/rxjava-core

@Override
public Subscriber<? super Observable<T>> call(final Subscriber<? super T> child) {
  final Scheduler.Worker inner = Schedulers.trampoline().createWorker();
  child.add(inner);
  
  final SerialSubscription serialSubscription = new SerialSubscription();
  // add serialSubscription so it gets unsubscribed if child is unsubscribed
  child.add(serialSubscription);
  
  return new SourceSubscriber<T>(child, predicate, inner, serialSubscription);
}
