reactor.core.publisher.Mono.publishOn()方法的使用及代码示例

x33g5p2x  于2022-01-24 转载在 其他  
字(12.2k)|赞(0)|评价(0)|浏览(907)

本文整理了Java中reactor.core.publisher.Mono.publishOn()方法的一些代码示例,展示了Mono.publishOn()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Mono.publishOn()方法的具体详情如下:
包路径:reactor.core.publisher.Mono
类名称:Mono
方法名:publishOn

Mono.publishOn介绍

[英]Run onNext, onComplete and onError on a supplied SchedulerWorker.

This operator influences the threading context where the rest of the operators in the chain below it will execute, up to a new occurrence of publishOn.

Typically used for fast publisher, slow consumer(s) scenarios.

mono.publishOn(Schedulers.single()).subscribe()

[中]在提供的SchedulerWorker上运行onNext、onComplete和onError。
此操作符影响线程上下文,在线程上下文中,它下面的链中的其他操作符将执行线程上下文,直到出现新的publishOn。
通常用于发布速度快、消费速度慢的场景。

mono.publishOn(Schedulers.single()).subscribe()

代码示例

代码示例来源:origin: resilience4j/resilience4j

@Override
  public void subscribe(CoreSubscriber<? super T> actual) {
    source.publishOn(scheduler)
        .subscribe(new RateLimiterSubscriber<>(rateLimiter, actual));
  }
}

代码示例来源:origin: resilience4j/resilience4j

@Override
  public void subscribe(CoreSubscriber<? super T> actual) {
    source.publishOn(scheduler)
        .subscribe(new BulkheadSubscriber<>(bulkhead, actual));
  }
}

代码示例来源:origin: spring-projects/spring-security

@Override
  public Mono<Authentication> authenticate(Authentication token) {
    return Mono.just(token)
      .publishOn(Schedulers.elastic())
      .flatMap( t -> {
        try {
          return Mono.just(authenticationManager.authenticate(t));
        } catch(Throwable error) {
          return Mono.error(error);
        }
      })
      .filter( a -> a.isAuthenticated());
  }
}

代码示例来源:origin: org.springframework.security/spring-security-core

@Override
  public Mono<Authentication> authenticate(Authentication token) {
    return Mono.just(token)
      .publishOn(Schedulers.elastic())
      .flatMap( t -> {
        try {
          return Mono.just(authenticationManager.authenticate(t));
        } catch(Throwable error) {
          return Mono.error(error);
        }
      })
      .filter( a -> a.isAuthenticated());
  }
}

代码示例来源:origin: spring-projects/spring-security

@Override
public Mono<Authentication> authenticate(Authentication authentication) {
  final String username = authentication.getName();
  final String presentedPassword = (String) authentication.getCredentials();
  return this.userDetailsService.findByUsername(username)
      .publishOn(this.scheduler)
      .filter(u -> this.passwordEncoder.matches(presentedPassword, u.getPassword()))
      .switchIfEmpty(Mono.defer(() -> Mono.error(new BadCredentialsException("Invalid Credentials"))))
      .flatMap(u -> {
        boolean upgradeEncoding = this.userDetailsPasswordService != null
            && this.passwordEncoder.upgradeEncoding(u.getPassword());
        if (upgradeEncoding) {
          String newPassword = this.passwordEncoder.encode(presentedPassword);
          return this.userDetailsPasswordService.updatePassword(u, newPassword);
        }
        return Mono.just(u);
      })
      .doOnNext(this.postAuthenticationChecks::check)
      .map(u -> new UsernamePasswordAuthenticationToken(u, u.getPassword(), u.getAuthorities()) );
}

代码示例来源:origin: org.springframework.security/spring-security-core

@Override
public Mono<Authentication> authenticate(Authentication authentication) {
  final String username = authentication.getName();
  final String presentedPassword = (String) authentication.getCredentials();
  return this.userDetailsService.findByUsername(username)
      .publishOn(this.scheduler)
      .filter(u -> this.passwordEncoder.matches(presentedPassword, u.getPassword()))
      .switchIfEmpty(Mono.defer(() -> Mono.error(new BadCredentialsException("Invalid Credentials"))))
      .flatMap(u -> {
        boolean upgradeEncoding = this.userDetailsPasswordService != null
            && this.passwordEncoder.upgradeEncoding(u.getPassword());
        if (upgradeEncoding) {
          String newPassword = this.passwordEncoder.encode(presentedPassword);
          return this.userDetailsPasswordService.updatePassword(u, newPassword);
        }
        return Mono.just(u);
      })
      .map(u -> new UsernamePasswordAuthenticationToken(u, u.getPassword(), u.getAuthorities()) );
}

代码示例来源:origin: reactor/reactor-core

.publishOn(fromExecutor(executor))
  .block();
Assert.fail("Bubbling RejectedExecutionException expected");

代码示例来源:origin: reactor/reactor-core

@Test
public void monoBlockForbidden() {
  Function<String, String> badMapper = v -> Mono.just(v).hide()
                         .block();
  Function<String, String> badMapperTimeout = v -> Mono.just(v).hide()
                             .block(Duration.ofMillis(100));
  Mono<String> forbiddenSequence1 = Mono.just("data")
                     .publishOn(nonBlockingScheduler)
                     .map(badMapper);
  StepVerifier.create(forbiddenSequence1)
        .expectErrorSatisfies(e -> assertThat(e)
            .isInstanceOf(IllegalStateException.class)
            .hasMessageStartingWith("block()/blockFirst()/blockLast() are blocking, which is not supported in thread nonBlockingScheduler-"))
        .verify();
  Mono<String> forbiddenSequence2 = Mono.just("data")
                     .publishOn(nonBlockingScheduler)
                     .map(badMapperTimeout);
  StepVerifier.create(forbiddenSequence2)
        .expectErrorSatisfies(e -> assertThat(e)
            .isInstanceOf(IllegalStateException.class)
            .hasMessageStartingWith("block()/blockFirst()/blockLast() are blocking, which is not supported in thread nonBlockingScheduler-"))
        .verify();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void error() {
  StepVerifier.create(Mono.error(new RuntimeException("forced failure"))
              .publishOn(Schedulers.single()))
        .verifyErrorMessage("forced failure");
}

代码示例来源:origin: spring-projects/spring-security

private Mono<OAuth2AuthorizedClient> authorizeWithRefreshToken(ClientRequest request, ExchangeFunction next,
                                OAuth2AuthorizedClient authorizedClient) {
  ClientRegistration clientRegistration = authorizedClient
      .getClientRegistration();
  String tokenUri = clientRegistration
      .getProviderDetails().getTokenUri();
  ClientRequest refreshRequest = ClientRequest.create(HttpMethod.POST, URI.create(tokenUri))
      .header(HttpHeaders.ACCEPT, MediaType.APPLICATION_JSON_VALUE)
      .headers(headers -> headers.setBasicAuth(clientRegistration.getClientId(), clientRegistration.getClientSecret()))
      .body(refreshTokenBody(authorizedClient.getRefreshToken().getTokenValue()))
      .build();
  return next.exchange(refreshRequest)
      .flatMap(response -> response.body(oauth2AccessTokenResponse()))
      .map(accessTokenResponse -> new OAuth2AuthorizedClient(authorizedClient.getClientRegistration(), authorizedClient.getPrincipalName(), accessTokenResponse.getAccessToken(), accessTokenResponse.getRefreshToken()))
      .map(result -> {
        Authentication principal = (Authentication) request.attribute(
            AUTHENTICATION_ATTR_NAME).orElse(new PrincipalNameAuthentication(authorizedClient.getPrincipalName()));
        HttpServletRequest httpRequest = (HttpServletRequest) request.attributes().get(
            HTTP_SERVLET_REQUEST_ATTR_NAME);
        HttpServletResponse httpResponse = (HttpServletResponse) request.attributes().get(
            HTTP_SERVLET_RESPONSE_ATTR_NAME);
        this.authorizedClientRepository.saveAuthorizedClient(result, principal, httpRequest, httpResponse);
        return result;
      })
      .publishOn(Schedulers.elastic());
}

代码示例来源:origin: reactor/reactor-core

@Test
public void monoBlockOptionalForbidden() {
  Function<String, Optional<String>> badMapper = v -> Mono.just(v).hide()
                              .blockOptional();
  Function<String, Optional<String>> badMapperTimeout = v -> Mono.just(v).hide()
                                  .blockOptional(Duration.ofMillis(100));
  Mono<Optional<String>> forbiddenSequence1 = Mono.just("data")
                          .publishOn(nonBlockingScheduler)
                          .map(badMapper);
  StepVerifier.create(forbiddenSequence1)
        .expectErrorSatisfies(e -> assertThat(e)
            .isInstanceOf(IllegalStateException.class)
            .hasMessageStartingWith("blockOptional() is blocking, which is not supported in thread nonBlockingScheduler-"))
        .verify();
  Mono<Optional<String>> forbiddenSequence2 = Mono.just("data")
                          .publishOn(nonBlockingScheduler)
                          .map(badMapperTimeout);
  StepVerifier.create(forbiddenSequence2)
        .expectErrorSatisfies(e -> assertThat(e)
            .isInstanceOf(IllegalStateException.class)
            .hasMessageStartingWith("blockOptional() is blocking, which is not supported in thread nonBlockingScheduler-"))
        .verify();
}

代码示例来源:origin: reactor/reactor-core

@Test
  public void errorHide() {
    StepVerifier.create(Mono.error(new RuntimeException("forced failure"))
                .hide()
                .publishOn(Schedulers.single()))
          .verifyErrorMessage("forced failure");
  }
}

代码示例来源:origin: reactor/reactor-core

@Test
public void createStreamFromMonoCreate2() {
  StepVerifier.create(Mono.create(MonoSink::success)
              .publishOn(Schedulers.parallel()))
        .verifyComplete();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void errorCallbackErrorWithParallel() {
  AssertSubscriber<Integer> assertSubscriber = new AssertSubscriber<>();
  Mono.just(1)
    .publishOn(parallel())
    .doOnNext(i -> {
      throw new IllegalArgumentException();
    })
    .doOnError(e -> {
      throw new IllegalStateException(e);
    })
    .subscribe(assertSubscriber);
  assertSubscriber
      .await()
      .assertError(IllegalStateException.class)
      .assertNotComplete();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void fluxBlockLastForbidden() {
  Function<String, String> badMapper = v -> Flux.just(v).hide()
                         .blockLast();
  Function<String, String> badMapperTimeout = v -> Flux.just(v).hide()
                             .blockLast(Duration.ofMillis(100));
  Mono<String> forbiddenSequence1 = Mono.just("data")
                     .publishOn(nonBlockingScheduler)
                     .map(badMapper);
  StepVerifier.create(forbiddenSequence1)
        .expectErrorSatisfies(e -> assertThat(e)
            .isInstanceOf(IllegalStateException.class)
            .hasMessageStartingWith("block()/blockFirst()/blockLast() are blocking, which is not supported in thread nonBlockingScheduler-"))
        .verify();
  Mono<String> forbiddenSequence2 = Mono.just("data")
                     .publishOn(nonBlockingScheduler)
                     .map(badMapperTimeout);
  StepVerifier.create(forbiddenSequence2)
        .expectErrorSatisfies(e -> assertThat(e)
            .isInstanceOf(IllegalStateException.class)
            .hasMessageStartingWith("block()/blockFirst()/blockLast() are blocking, which is not supported in thread nonBlockingScheduler-"))
        .verify();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void fluxBlockFirstForbidden() {
  Function<String, String> badMapper = v -> Flux.just(v).hide()
                         .blockFirst();
  Function<String, String> badMapperTimeout = v -> Flux.just(v).hide()
                             .blockFirst(Duration.ofMillis(100));
  Mono<String> forbiddenSequence1 = Mono.just("data")
                     .publishOn(nonBlockingScheduler)
                     .map(badMapper);
  StepVerifier.create(forbiddenSequence1)
        .expectErrorSatisfies(e -> assertThat(e)
            .isInstanceOf(IllegalStateException.class)
            .hasMessageStartingWith("block()/blockFirst()/blockLast() are blocking, which is not supported in thread nonBlockingScheduler-"))
        .verify();
  Mono<String> forbiddenSequence2 = Mono.just("data")
                     .publishOn(nonBlockingScheduler)
                     .map(badMapperTimeout);
  StepVerifier.create(forbiddenSequence2)
        .expectErrorSatisfies(e -> assertThat(e)
            .isInstanceOf(IllegalStateException.class)
            .hasMessageStartingWith("block()/blockFirst()/blockLast() are blocking, which is not supported in thread nonBlockingScheduler-"))
        .verify();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void callableEvaluatedTheRightTime() {
  AtomicInteger count = new AtomicInteger();
  Mono<Integer> p = Mono.fromCallable(count::incrementAndGet)
             .publishOn(Schedulers.fromExecutorService(ForkJoinPool.commonPool()));
  Assert.assertEquals(0, count.get());
  AssertSubscriber<Integer> ts = AssertSubscriber.create();
  p.subscribe(ts);
  if (!ts.await(Duration.ofSeconds(5))
      .isTerminated()) {
    ts.cancel();
    Assert.fail("AssertSubscriber timed out");
  }
  Assert.assertEquals(1, count.get());
}

代码示例来源:origin: reactor/reactor-core

@Test
public void errorCallbackErrorWithParallel() {
  AssertSubscriber<Integer> assertSubscriber = new AssertSubscriber<>();
  Mono.just(1)
    .hide()
    .publishOn(parallel())
    .doOnNext(i -> {
      throw new IllegalArgumentException();
    })
    .doOnError(e -> {
      throw new IllegalStateException(e);
    })
    .subscribe(assertSubscriber);
  assertSubscriber.await()
          .assertError(IllegalStateException.class)
          .assertNotComplete();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void errorModeContinueInternalErrorMonoAsync() {
  Flux<Integer> test = Flux
      .just(0, 1)
      .hide()
      .concatMap(f ->  Mono.just(f).publishOn(Schedulers.parallel()).map(i -> 1/i))
      .onErrorContinue(OnNextFailureStrategyTest::drop);
  StepVerifier.create(test)
      .expectNoFusionSupport()
      .expectNext(1)
      .expectComplete()
      .verifyThenAssertThat()
      .hasDropped(0)
      .hasDroppedErrors(1);
}

代码示例来源:origin: reactor/reactor-core

@Test
  public void errorModeContinueInternalErrorMonoAsync() {
    Flux<Integer> test = Flux
        .just(0, 1)
        .hide()
        .flatMap(f ->  Mono.just(f).publishOn(Schedulers.parallel()).map(i -> 1/i))
        .onErrorContinue(OnNextFailureStrategyTest::drop);

    StepVerifier.create(test)
        .expectNoFusionSupport()
        .expectNext(1)
        .expectComplete()
        .verifyThenAssertThat()
        .hasDropped(0)
        .hasDroppedErrors(1);
  }
}

相关文章

Mono类方法