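This article collects code examples for the Java method reactor.core.publisher.Mono.publishOn() and shows how Mono.publishOn() is used in practice. The examples were extracted from selected projects found on platforms such as GitHub, Stack Overflow, and Maven, so they serve as practical reference material. Details of Mono.publishOn() are as follows: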
Package path: reactor.core.publisher.Mono
Class name: Mono
Method name: publishOn
Run onNext, onComplete and onError on a supplied Scheduler Worker.
This operator influences the threading context where the rest of the operators in the chain below it will execute, up to a new occurrence of publishOn.
Typically used for fast publisher, slow consumer(s) scenarios.
mono.publishOn(Schedulers.single()).subscribe()
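The threading rule described above can be observed directly. The following is a minimal sketch, assuming reactor-core is on the classpath; the class name PublishOnThreadDemo and the scheduler name "demo-publish" are made up for illustration. It records the thread name in a map placed before publishOn and again in a map placed after it.

```java
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

final class PublishOnThreadDemo {

    // Returns {thread seen upstream of publishOn, thread seen downstream of publishOn}.
    static String[] observeThreads() {
        // Hypothetical scheduler name, chosen only so the thread is easy to recognise.
        Scheduler scheduler = Schedulers.newSingle("demo-publish");
        try {
            return Mono.just("data")
                    // Upstream of publishOn: runs on the thread that subscribes (here, the caller of block()).
                    .map(v -> Thread.currentThread().getName())
                    .publishOn(scheduler)
                    // Downstream of publishOn: runs on a worker of the supplied scheduler.
                    .map(before -> new String[] {before, Thread.currentThread().getName()})
                    .block();
        } finally {
            // Release the scheduler's thread so the JVM can exit.
            scheduler.dispose();
        }
    }

    public static void main(String[] args) {
        String[] names = observeThreads();
        System.out.println("upstream of publishOn:   " + names[0]);
        System.out.println("downstream of publishOn: " + names[1]);
    }
}
```

Only the operators below publishOn move to the scheduler's thread; the first map still runs on the subscribing thread.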
Code example source: origin: resilience4j/resilience4j
@Override
public void subscribe(CoreSubscriber<? super T> actual) {
    source.publishOn(scheduler)
        .subscribe(new RateLimiterSubscriber<>(rateLimiter, actual));
}
Code example source: origin: resilience4j/resilience4j
@Override
public void subscribe(CoreSubscriber<? super T> actual) {
    source.publishOn(scheduler)
        .subscribe(new BulkheadSubscriber<>(bulkhead, actual));
}
Code example source: origin: spring-projects/spring-security
@Override
public Mono<Authentication> authenticate(Authentication token) {
    return Mono.just(token)
        .publishOn(Schedulers.elastic())
        .flatMap( t -> {
            try {
                return Mono.just(authenticationManager.authenticate(t));
            } catch(Throwable error) {
                return Mono.error(error);
            }
        })
        .filter( a -> a.isAuthenticated());
}
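The example above moves a blocking authenticate call off the calling thread with publishOn(Schedulers.elastic()). In reactor-core 3.4 and later, Schedulers.elastic() is deprecated in favor of Schedulers.boundedElastic(), so a present-day variant might look like the sketch below. The BlockingCallOffload class and its blockingLookup method are hypothetical stand-ins, not Spring Security code, and map replaces the flatMap with try/catch on the assumption that letting the operator turn a thrown exception into an error signal is acceptable.

```java
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

final class BlockingCallOffload {

    // Hypothetical stand-in for a blocking call such as authenticationManager.authenticate(t).
    static String blockingLookup(String token) {
        if (token.isEmpty()) {
            throw new IllegalArgumentException("empty token");
        }
        return "user-for-" + token;
    }

    static Mono<String> authenticate(String token) {
        return Mono.just(token)
                // Signals below this point are delivered on a boundedElastic worker,
                // a scheduler intended for blocking work.
                .publishOn(Schedulers.boundedElastic())
                // An exception thrown here is propagated downstream as an onError signal.
                .map(BlockingCallOffload::blockingLookup);
    }

    public static void main(String[] args) {
        System.out.println(authenticate("abc").block());
    }
}
```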
Code example source: origin: org.springframework.security/spring-security-core
@Override
public Mono<Authentication> authenticate(Authentication token) {
    return Mono.just(token)
        .publishOn(Schedulers.elastic())
        .flatMap( t -> {
            try {
                return Mono.just(authenticationManager.authenticate(t));
            } catch(Throwable error) {
                return Mono.error(error);
            }
        })
        .filter( a -> a.isAuthenticated());
}
Code example source: origin: spring-projects/spring-security
@Override
public Mono<Authentication> authenticate(Authentication authentication) {
    final String username = authentication.getName();
    final String presentedPassword = (String) authentication.getCredentials();
    return this.userDetailsService.findByUsername(username)
        .publishOn(this.scheduler)
        .filter(u -> this.passwordEncoder.matches(presentedPassword, u.getPassword()))
        .switchIfEmpty(Mono.defer(() -> Mono.error(new BadCredentialsException("Invalid Credentials"))))
        .flatMap(u -> {
            boolean upgradeEncoding = this.userDetailsPasswordService != null
                && this.passwordEncoder.upgradeEncoding(u.getPassword());
            if (upgradeEncoding) {
                String newPassword = this.passwordEncoder.encode(presentedPassword);
                return this.userDetailsPasswordService.updatePassword(u, newPassword);
            }
            return Mono.just(u);
        })
        .doOnNext(this.postAuthenticationChecks::check)
        .map(u -> new UsernamePasswordAuthenticationToken(u, u.getPassword(), u.getAuthorities()) );
}
Code example source: origin: org.springframework.security/spring-security-core
@Override
public Mono<Authentication> authenticate(Authentication authentication) {
    final String username = authentication.getName();
    final String presentedPassword = (String) authentication.getCredentials();
    return this.userDetailsService.findByUsername(username)
        .publishOn(this.scheduler)
        .filter(u -> this.passwordEncoder.matches(presentedPassword, u.getPassword()))
        .switchIfEmpty(Mono.defer(() -> Mono.error(new BadCredentialsException("Invalid Credentials"))))
        .flatMap(u -> {
            boolean upgradeEncoding = this.userDetailsPasswordService != null
                && this.passwordEncoder.upgradeEncoding(u.getPassword());
            if (upgradeEncoding) {
                String newPassword = this.passwordEncoder.encode(presentedPassword);
                return this.userDetailsPasswordService.updatePassword(u, newPassword);
            }
            return Mono.just(u);
        })
        .map(u -> new UsernamePasswordAuthenticationToken(u, u.getPassword(), u.getAuthorities()) );
}
Code example source: origin: reactor/reactor-core
.publishOn(fromExecutor(executor))
.block();
Assert.fail("Bubbling RejectedExecutionException expected");
Code example source: origin: reactor/reactor-core
@Test
public void monoBlockForbidden() {
    Function<String, String> badMapper = v -> Mono.just(v).hide()
        .block();
    Function<String, String> badMapperTimeout = v -> Mono.just(v).hide()
        .block(Duration.ofMillis(100));
    Mono<String> forbiddenSequence1 = Mono.just("data")
        .publishOn(nonBlockingScheduler)
        .map(badMapper);
    StepVerifier.create(forbiddenSequence1)
        .expectErrorSatisfies(e -> assertThat(e)
            .isInstanceOf(IllegalStateException.class)
            .hasMessageStartingWith("block()/blockFirst()/blockLast() are blocking, which is not supported in thread nonBlockingScheduler-"))
        .verify();
    Mono<String> forbiddenSequence2 = Mono.just("data")
        .publishOn(nonBlockingScheduler)
        .map(badMapperTimeout);
    StepVerifier.create(forbiddenSequence2)
        .expectErrorSatisfies(e -> assertThat(e)
            .isInstanceOf(IllegalStateException.class)
            .hasMessageStartingWith("block()/blockFirst()/blockLast() are blocking, which is not supported in thread nonBlockingScheduler-"))
        .verify();
}
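The test above relies on Reactor rejecting block() on threads marked as non-blocking. A minimal sketch of the same behavior follows, assuming reactor-core is available and using Schedulers.parallel() in place of the test's nonBlockingScheduler field; the class and method names are illustrative only.

```java
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

final class BlockOnNonBlockingThread {

    // Returns the message of the error raised when block() is called on a parallel() thread.
    static String provoke() {
        try {
            Mono.just("data")
                    // Move downstream operators onto a parallel() worker, which is a non-blocking thread.
                    .publishOn(Schedulers.parallel())
                    // hide() keeps the inner Mono from short-circuiting block() as a known scalar.
                    .map(v -> Mono.just(v).hide().block())
                    // The outer block() runs on the calling thread, where blocking is allowed.
                    .block();
            return "no error";
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(provoke());
    }
}
```

The inner block() fails inside map, the operator converts the exception into an error signal, and the outer block() rethrows it on the calling thread.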
Code example source: origin: reactor/reactor-core
@Test
public void error() {
    StepVerifier.create(Mono.error(new RuntimeException("forced failure"))
            .publishOn(Schedulers.single()))
        .verifyErrorMessage("forced failure");
}
Code example source: origin: spring-projects/spring-security
private Mono<OAuth2AuthorizedClient> authorizeWithRefreshToken(ClientRequest request, ExchangeFunction next,
        OAuth2AuthorizedClient authorizedClient) {
    ClientRegistration clientRegistration = authorizedClient
        .getClientRegistration();
    String tokenUri = clientRegistration
        .getProviderDetails().getTokenUri();
    ClientRequest refreshRequest = ClientRequest.create(HttpMethod.POST, URI.create(tokenUri))
        .header(HttpHeaders.ACCEPT, MediaType.APPLICATION_JSON_VALUE)
        .headers(headers -> headers.setBasicAuth(clientRegistration.getClientId(), clientRegistration.getClientSecret()))
        .body(refreshTokenBody(authorizedClient.getRefreshToken().getTokenValue()))
        .build();
    return next.exchange(refreshRequest)
        .flatMap(response -> response.body(oauth2AccessTokenResponse()))
        .map(accessTokenResponse -> new OAuth2AuthorizedClient(authorizedClient.getClientRegistration(), authorizedClient.getPrincipalName(), accessTokenResponse.getAccessToken(), accessTokenResponse.getRefreshToken()))
        .map(result -> {
            Authentication principal = (Authentication) request.attribute(
                AUTHENTICATION_ATTR_NAME).orElse(new PrincipalNameAuthentication(authorizedClient.getPrincipalName()));
            HttpServletRequest httpRequest = (HttpServletRequest) request.attributes().get(
                HTTP_SERVLET_REQUEST_ATTR_NAME);
            HttpServletResponse httpResponse = (HttpServletResponse) request.attributes().get(
                HTTP_SERVLET_RESPONSE_ATTR_NAME);
            this.authorizedClientRepository.saveAuthorizedClient(result, principal, httpRequest, httpResponse);
            return result;
        })
        .publishOn(Schedulers.elastic());
}
Code example source: origin: reactor/reactor-core
@Test
public void monoBlockOptionalForbidden() {
    Function<String, Optional<String>> badMapper = v -> Mono.just(v).hide()
        .blockOptional();
    Function<String, Optional<String>> badMapperTimeout = v -> Mono.just(v).hide()
        .blockOptional(Duration.ofMillis(100));
    Mono<Optional<String>> forbiddenSequence1 = Mono.just("data")
        .publishOn(nonBlockingScheduler)
        .map(badMapper);
    StepVerifier.create(forbiddenSequence1)
        .expectErrorSatisfies(e -> assertThat(e)
            .isInstanceOf(IllegalStateException.class)
            .hasMessageStartingWith("blockOptional() is blocking, which is not supported in thread nonBlockingScheduler-"))
        .verify();
    Mono<Optional<String>> forbiddenSequence2 = Mono.just("data")
        .publishOn(nonBlockingScheduler)
        .map(badMapperTimeout);
    StepVerifier.create(forbiddenSequence2)
        .expectErrorSatisfies(e -> assertThat(e)
            .isInstanceOf(IllegalStateException.class)
            .hasMessageStartingWith("blockOptional() is blocking, which is not supported in thread nonBlockingScheduler-"))
        .verify();
}
Code example source: origin: reactor/reactor-core
@Test
public void errorHide() {
    StepVerifier.create(Mono.error(new RuntimeException("forced failure"))
            .hide()
            .publishOn(Schedulers.single()))
        .verifyErrorMessage("forced failure");
}
Code example source: origin: reactor/reactor-core
@Test
public void createStreamFromMonoCreate2() {
    StepVerifier.create(Mono.create(MonoSink::success)
            .publishOn(Schedulers.parallel()))
        .verifyComplete();
}
Code example source: origin: reactor/reactor-core
@Test
public void errorCallbackErrorWithParallel() {
    AssertSubscriber<Integer> assertSubscriber = new AssertSubscriber<>();
    Mono.just(1)
        .publishOn(parallel())
        .doOnNext(i -> {
            throw new IllegalArgumentException();
        })
        .doOnError(e -> {
            throw new IllegalStateException(e);
        })
        .subscribe(assertSubscriber);
    assertSubscriber
        .await()
        .assertError(IllegalStateException.class)
        .assertNotComplete();
}
Code example source: origin: reactor/reactor-core
@Test
public void fluxBlockLastForbidden() {
    Function<String, String> badMapper = v -> Flux.just(v).hide()
        .blockLast();
    Function<String, String> badMapperTimeout = v -> Flux.just(v).hide()
        .blockLast(Duration.ofMillis(100));
    Mono<String> forbiddenSequence1 = Mono.just("data")
        .publishOn(nonBlockingScheduler)
        .map(badMapper);
    StepVerifier.create(forbiddenSequence1)
        .expectErrorSatisfies(e -> assertThat(e)
            .isInstanceOf(IllegalStateException.class)
            .hasMessageStartingWith("block()/blockFirst()/blockLast() are blocking, which is not supported in thread nonBlockingScheduler-"))
        .verify();
    Mono<String> forbiddenSequence2 = Mono.just("data")
        .publishOn(nonBlockingScheduler)
        .map(badMapperTimeout);
    StepVerifier.create(forbiddenSequence2)
        .expectErrorSatisfies(e -> assertThat(e)
            .isInstanceOf(IllegalStateException.class)
            .hasMessageStartingWith("block()/blockFirst()/blockLast() are blocking, which is not supported in thread nonBlockingScheduler-"))
        .verify();
}
Code example source: origin: reactor/reactor-core
@Test
public void fluxBlockFirstForbidden() {
    Function<String, String> badMapper = v -> Flux.just(v).hide()
        .blockFirst();
    Function<String, String> badMapperTimeout = v -> Flux.just(v).hide()
        .blockFirst(Duration.ofMillis(100));
    Mono<String> forbiddenSequence1 = Mono.just("data")
        .publishOn(nonBlockingScheduler)
        .map(badMapper);
    StepVerifier.create(forbiddenSequence1)
        .expectErrorSatisfies(e -> assertThat(e)
            .isInstanceOf(IllegalStateException.class)
            .hasMessageStartingWith("block()/blockFirst()/blockLast() are blocking, which is not supported in thread nonBlockingScheduler-"))
        .verify();
    Mono<String> forbiddenSequence2 = Mono.just("data")
        .publishOn(nonBlockingScheduler)
        .map(badMapperTimeout);
    StepVerifier.create(forbiddenSequence2)
        .expectErrorSatisfies(e -> assertThat(e)
            .isInstanceOf(IllegalStateException.class)
            .hasMessageStartingWith("block()/blockFirst()/blockLast() are blocking, which is not supported in thread nonBlockingScheduler-"))
        .verify();
}
Code example source: origin: reactor/reactor-core
@Test
public void callableEvaluatedTheRightTime() {
    AtomicInteger count = new AtomicInteger();
    Mono<Integer> p = Mono.fromCallable(count::incrementAndGet)
        .publishOn(Schedulers.fromExecutorService(ForkJoinPool.commonPool()));
    Assert.assertEquals(0, count.get());
    AssertSubscriber<Integer> ts = AssertSubscriber.create();
    p.subscribe(ts);
    if (!ts.await(Duration.ofSeconds(5))
            .isTerminated()) {
        ts.cancel();
        Assert.fail("AssertSubscriber timed out");
    }
    Assert.assertEquals(1, count.get());
}
Code example source: origin: reactor/reactor-core
@Test
public void errorCallbackErrorWithParallel() {
    AssertSubscriber<Integer> assertSubscriber = new AssertSubscriber<>();
    Mono.just(1)
        .hide()
        .publishOn(parallel())
        .doOnNext(i -> {
            throw new IllegalArgumentException();
        })
        .doOnError(e -> {
            throw new IllegalStateException(e);
        })
        .subscribe(assertSubscriber);
    assertSubscriber.await()
        .assertError(IllegalStateException.class)
        .assertNotComplete();
}
Code example source: origin: reactor/reactor-core
@Test
public void errorModeContinueInternalErrorMonoAsync() {
    Flux<Integer> test = Flux
        .just(0, 1)
        .hide()
        .concatMap(f -> Mono.just(f).publishOn(Schedulers.parallel()).map(i -> 1/i))
        .onErrorContinue(OnNextFailureStrategyTest::drop);
    StepVerifier.create(test)
        .expectNoFusionSupport()
        .expectNext(1)
        .expectComplete()
        .verifyThenAssertThat()
        .hasDropped(0)
        .hasDroppedErrors(1);
}
Code example source: origin: reactor/reactor-core
@Test
public void errorModeContinueInternalErrorMonoAsync() {
    Flux<Integer> test = Flux
        .just(0, 1)
        .hide()
        .flatMap(f -> Mono.just(f).publishOn(Schedulers.parallel()).map(i -> 1/i))
        .onErrorContinue(OnNextFailureStrategyTest::drop);
    StepVerifier.create(test)
        .expectNoFusionSupport()
        .expectNext(1)
        .expectComplete()
        .verifyThenAssertThat()
        .hasDropped(0)
        .hasDroppedErrors(1);
}