Usage and code examples of the reactor.core.publisher.Mono.subscribeOn() method

x33g5p2x, reposted on 2022-01-24 under Other

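This article collects code examples for the Java method reactor.core.publisher.Mono.subscribeOn() and shows how Mono.subscribeOn() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should serve as a useful reference. Details of Mono.subscribeOn():
Package path: reactor.core.publisher.Mono
Class name: Mono
Method name: subscribeOn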

Introduction to Mono.subscribeOn

Run subscribe, onSubscribe and request on a specified Scheduler's Worker. As such, placing this operator anywhere in the chain will also impact the execution context of onNext/onError/onComplete signals from the beginning of the chain up to the next occurrence of a publishOn(Scheduler).

mono.subscribeOn(Schedulers.parallel()).subscribe()
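
The following is a minimal sketch of that behavior, not code taken from any of the projects quoted in this article. The class name SubscribeOnDemo and the method threadsUsed are hypothetical, and the sketch assumes reactor-core is on the classpath. It records the thread that runs the source and the thread that runs a map step placed after publishOn.

```java
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class SubscribeOnDemo {

    // Returns "<source thread> -> <map thread>" so both execution contexts can be inspected.
    static String threadsUsed() {
        return Mono.fromCallable(() -> Thread.currentThread().getName()) // source runs on the subscribeOn scheduler
                .publishOn(Schedulers.single())                          // signals below this point move to "single"
                .map(source -> source + " -> " + Thread.currentThread().getName())
                .subscribeOn(Schedulers.parallel())                      // placed last, still affects the source
                .block();
    }

    public static void main(String[] args) {
        System.out.println(threadsUsed());
    }
}
```

Even though subscribeOn is the last operator in the chain, the callable should run on a thread of the parallel scheduler, while the map step runs on the single scheduler because it sits after publishOn.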

Code examples

Code example source: spring-projects/spring-framework

private Mono<Void> executeInternal(URI url, HttpHeaders requestHeaders, WebSocketHandler handler) {
  MonoProcessor<Void> completionMono = MonoProcessor.create();
  return Mono.fromCallable(
      () -> {
        if (logger.isDebugEnabled()) {
          logger.debug("Connecting to " + url);
        }
        List<String> protocols = handler.getSubProtocols();
        DefaultConfigurator configurator = new DefaultConfigurator(requestHeaders);
        Endpoint endpoint = createEndpoint(url, handler, completionMono, configurator);
        ClientEndpointConfig config = createEndpointConfig(configurator, protocols);
        return this.webSocketContainer.connectToServer(endpoint, config, url);
      })
      .subscribeOn(Schedulers.elastic()) // connectToServer is blocking
      .then(completionMono);
}
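
The snippet above uses subscribeOn to move a blocking call off the caller's thread. Schedulers.elastic() was deprecated in later Reactor releases in favor of Schedulers.boundedElastic(), so a present-day version of the same pattern might look like the sketch below. The class BlockingOffloadDemo and the method blockingConnect are hypothetical stand-ins, not part of Spring or Reactor, and the sketch assumes a reactor-core version that provides boundedElastic().

```java
import java.time.Duration;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class BlockingOffloadDemo {

    // Hypothetical stand-in for a blocking call such as connectToServer.
    static String blockingConnect() throws InterruptedException {
        Thread.sleep(50); // simulate blocking I/O
        return Thread.currentThread().getName();
    }

    // Wraps the blocking call so it only runs on subscription, on a boundedElastic worker.
    static Mono<String> connect() {
        return Mono.fromCallable(BlockingOffloadDemo::blockingConnect)
                .subscribeOn(Schedulers.boundedElastic());
    }

    public static void main(String[] args) {
        // block() is used here only to observe the result in a standalone program.
        System.out.println(connect().block(Duration.ofSeconds(5)));
    }
}
```

Mono.fromCallable defers the call until subscription, so nothing blocks while the chain is being assembled; the blocking work happens only on the scheduler named in subscribeOn.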

Code example source: reactor/reactor-core

@Test
public void asyncRunnableBackpressured() {
  AtomicReference<Thread> t = new AtomicReference<>();
  StepVerifier.create(Mono.fromRunnable(() -> t.set(Thread.currentThread()))
              .subscribeOn(Schedulers.single()), 0)
        .verifyComplete();
  // Assert on the captured thread, not on the AtomicReference wrapper itself
  assertThat(t.get()).isNotNull();
  assertThat(t.get()).isNotEqualTo(Thread.currentThread());
}

Code example source: reactor/reactor-core

@Test
public void asyncRunnable() {
  AtomicReference<Thread> t = new AtomicReference<>();
  StepVerifier.create(Mono.fromRunnable(() -> t.set(Thread.currentThread()))
              .subscribeOn(Schedulers.single()))
        .verifyComplete();
  // Assert on the captured thread, not on the AtomicReference wrapper itself
  assertThat(t.get()).isNotNull();
  assertThat(t.get()).isNotEqualTo(Thread.currentThread());
}

Code example source: reactor/reactor-core

@Test
public void callableThrows() {
  StepVerifier.create(Mono.fromCallable(() -> {
    throw new IOException("forced failure");
  }).subscribeOn(Schedulers.single()))
        .expectErrorMatches(e -> e instanceof IOException
            && e.getMessage().equals("forced failure"))
        .verify();
}

Code example source: reactor/reactor-core

@Test
public void testSubscribeOnValueFusion() {
  StepVerifier.create(Mono.just(1)
              .flatMapMany(f -> Mono.just(f + 1)
                         .subscribeOn(Schedulers.parallel())
                         .map(this::slow)))
        .expectFusion(Fuseable.ASYNC, Fuseable.NONE)
        .expectNext(2)
        .verifyComplete();
}

Code example source: reactor/reactor-core

@Test
public void error() {
  StepVerifier.create(Mono.error(new RuntimeException("forced failure"))
              .subscribeOn(Schedulers.single()))
        .verifyErrorMessage("forced failure");
}

Code example source: reactor/reactor-core

Flux<Integer> flatMapScenario() {
  return Flux.interval(Duration.ofSeconds(3))
        .flatMap(v -> Flux.fromIterable(Arrays.asList("A"))
             .flatMap(w -> Mono.fromCallable(() -> Arrays.asList(1, 2))
                      .subscribeOn(Schedulers.parallel())
                      .flatMapMany(Flux::fromIterable))).log();
}

Code example source: reactor/reactor-core

@Test
public void callableEvaluatedTheRightTime() {
  AtomicInteger count = new AtomicInteger();
  Mono<Integer> p = Mono.fromCallable(count::incrementAndGet).subscribeOn(Schedulers.fromExecutorService(ForkJoinPool.commonPool()));
  Assert.assertEquals(0, count.get());
  p.subscribeWith(AssertSubscriber.create()).await();
  Assert.assertEquals(1, count.get());
}

Code example source: reactor/reactor-core

@Test
public void errorHide() {
  StepVerifier.create(Mono.error(new RuntimeException("forced failure"))
              .hide()
              .subscribeOn(Schedulers.single()))
        .verifyErrorMessage("forced failure");
}

Code example source: reactor/reactor-core

@Test
public void classicJust() {
  AssertSubscriber<Integer> ts = AssertSubscriber.create();
  Mono.just(1)
    .subscribeOn(Schedulers.fromExecutorService(ForkJoinPool.commonPool()))
    .subscribe(ts);
  ts.await(Duration.ofSeconds(5));
  ts.assertValues(1)
   .assertNoError()
   .assertComplete();
}

Code example source: reactor/reactor-core

@Test
public void classicEmpty() {
  AssertSubscriber<Integer> ts = AssertSubscriber.create();
  Mono.<Integer>empty().subscribeOn(Schedulers.fromExecutorService(ForkJoinPool.commonPool()))
             .subscribe(ts);
  ts.await(Duration.ofSeconds(5));
  ts.assertNoValues()
   .assertNoError()
   .assertComplete();
}

Code example source: reactor/reactor-core

@Test
public void classicEmptyBackpressured() throws Exception {
  AssertSubscriber<Integer> ts = AssertSubscriber.create(0);
  Mono.<Integer>empty().subscribeOn(Schedulers.fromExecutorService(ForkJoinPool.commonPool()))
             .subscribe(ts);
  ts.await(Duration.ofSeconds(5));
  ts.assertNoValues()
   .assertNoError()
   .assertComplete();
}

Code example source: reactor/reactor-core

@Test
public void asyncSupplyingNull() {
  StepVerifier.create(Mono.fromSupplier(() -> null)
              .subscribeOn(Schedulers.single()), 1)
        .verifyComplete();
}

Code example source: reactor/reactor-core

@Test
public void asyncSupplyingNullBackpressuredShortcuts() {
  StepVerifier.create(Mono.fromSupplier(() -> null)
              .subscribeOn(Schedulers.single()), 0)
        .expectSubscription()
        .verifyComplete();
}

Code example source: reactor/reactor-core

@Test
public void callableNull() {
  StepVerifier.create(Mono.fromCallable(() -> null).subscribeOn(Schedulers.single()))
        .expectComplete()
        .verify();
}

Code example source: reactor/reactor-core

@Test
public void callableNullBackpressured() {
  StepVerifier.create(
      Mono.fromCallable(() -> null).subscribeOn(Schedulers.single()), 0)
        .expectSubscription()
        .expectComplete()
        .verify();
}

Code example source: reactor/reactor-core

@Test
public void classic() {
  AssertSubscriber<Integer> ts = AssertSubscriber.create();
  Mono.fromSupplier(() -> 1)
    .subscribeOn(Schedulers.fromExecutorService(ForkJoinPool.commonPool()))
    .subscribe(ts);
  ts.await(Duration.ofSeconds(5));
  ts.assertValueCount(1)
   .assertNoError()
   .assertComplete();
}

Code example source: reactor/reactor-core

@Test
public void normal() {
  StepVerifier.create(Mono.fromCallable(() -> 1).subscribeOn(Schedulers.single()))
        .expectNext(1)
        .expectComplete()
        .verify();
}

Code example source: reactor/reactor-core

@Test
public void normalBackpressured() {
  StepVerifier.withVirtualTime( () ->
      Mono.fromCallable(() -> 1).subscribeOn(Schedulers.single()), 0)
        .expectSubscription()
        .expectNoEvent(Duration.ofSeconds(1))
        .thenRequest(1)
        .thenAwait()
        .expectNext(1)
        .expectComplete()
        .verify();
}
