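This article collects code examples for the Java method reactor.core.publisher.Mono.transform() and shows how Mono.transform() is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven; they were extracted from selected open-source projects and are intended as practical reference material. Details of the Mono.transform() method are as follows: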
Package path: reactor.core.publisher.Mono
Class name: Mono
Method name: transform
Description: Transforms this Mono in order to generate a target Mono. Unlike #compose(Function) (renamed transformDeferred(Function) in Reactor 3.3), the provided function is executed as part of assembly: it runs once, when the chain is built, rather than once per subscriber.
// Schedulers.boundedElastic() requires Reactor 3.3+; older releases offer Schedulers.elastic().
Function<Mono<Integer>, Mono<Integer>> applySchedulers = source -> source.subscribeOn(Schedulers.boundedElastic())
        .publishOn(Schedulers.parallel());
mono.transform(applySchedulers).map(v -> v * v).subscribe();
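The phrase "executed as part of assembly" is easier to see with a counter. The sketch below is a JDK-only model, not Reactor code: LazyValue is a hypothetical stand-in for a lazy source, and its transform and transformDeferred methods only imitate the documented behaviour (function applied once when the chain is built versus once per consumer). It assumes that reading the value with get() plays the role of subscribing.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.Supplier;

class AssemblyVsDeferred {

    /** Hypothetical stand-in for a lazy, re-readable source; NOT Reactor's Mono. */
    static final class LazyValue<T> {
        private final Supplier<T> source;

        LazyValue(Supplier<T> source) {
            this.source = source;
        }

        /** Plays the role of subscribing and waiting for the value. */
        T get() {
            return source.get();
        }

        <R> LazyValue<R> map(Function<T, R> mapper) {
            return new LazyValue<R>(() -> mapper.apply(source.get()));
        }

        /** Assembly-time variant: the function runs once, immediately. */
        <R> LazyValue<R> transform(Function<LazyValue<T>, LazyValue<R>> fn) {
            return fn.apply(this);
        }

        /** Deferred variant: the function runs again on every get(). */
        <R> LazyValue<R> transformDeferred(Function<LazyValue<T>, LazyValue<R>> fn) {
            return new LazyValue<R>(() -> fn.apply(this).get());
        }
    }

    /** Returns {calls made via transform, calls made via transformDeferred} after two reads each. */
    static int[] countFunctionCalls() {
        AtomicInteger eagerCalls = new AtomicInteger();
        AtomicInteger deferredCalls = new AtomicInteger();

        LazyValue<Integer> eager = new LazyValue<Integer>(() -> 3).transform(v -> {
            eagerCalls.incrementAndGet();
            return v.map(x -> x * x);
        });
        LazyValue<Integer> deferred = new LazyValue<Integer>(() -> 3).transformDeferred(v -> {
            deferredCalls.incrementAndGet();
            return v.map(x -> x * x);
        });

        // Read each pipeline twice, as two independent consumers would.
        eager.get();
        eager.get();
        deferred.get();
        deferred.get();
        return new int[] {eagerCalls.get(), deferredCalls.get()};
    }

    public static void main(String[] args) {
        int[] calls = countFunctionCalls();
        System.out.println("transform: " + calls[0] + ", transformDeferred: " + calls[1]);
    }
}
```

In this model the function passed to transform is invoked once regardless of how many times the result is read, while the deferred variant invokes it once per read. The deferred form is therefore the one to reach for when the function captures per-subscriber state.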
Code example source: resilience4j/resilience4j
@Test
public void shouldEmitEvent() {
    // Operators are stacked by chaining transform(); each call wraps the Mono returned by the previous one.
    StepVerifier.create(
        Mono.just("Event 1")
            .transform(BulkheadOperator.of(bulkhead))
            .transform(RateLimiterOperator.of(rateLimiter))
            .transform(CircuitBreakerOperator.of(circuitBreaker))
    ).expectNext("Event 1")
        .verifyComplete();
}
Code example source: resilience4j/resilience4j
@Test
public void shouldRecordSuccessWhenUsingToFuture() {
    try {
        Mono.just("Event")
            .transform(CircuitBreakerOperator.of(circuitBreaker))
            .toFuture()
            .get();
        assertSingleSuccessfulCall();
    } catch (InterruptedException | ExecutionException e) {
        fail();
    }
}
Code example source: resilience4j/resilience4j
// Excerpt: result, rateLimiter and recoveryFunction are defined in surrounding code that is not shown.
if (result != null) {
    RateLimiterOperator operator = RateLimiterOperator.of(rateLimiter, Schedulers.immediate());
    result = recoveryFunction.onErrorResume(result.transform(operator));
Code example source: resilience4j/resilience4j
@Test
public void shouldEmitEvent() {
    StepVerifier.create(
        Mono.just("Event")
            .transform(BulkheadOperator.of(bulkhead)))
        .expectNext("Event")
        .verifyComplete();
    assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(1);
}
Code example source: resilience4j/resilience4j
@Test
public void shouldEmptyMonoShouldBeSuccessful() {
    StepVerifier.create(
        Mono.empty()
            .transform(CircuitBreakerOperator.of(circuitBreaker)))
        .verifyComplete();
    assertSingleSuccessfulCall();
}
Code example source: resilience4j/resilience4j
@Test
public void shouldPropagateError() {
    StepVerifier.create(
        Mono.error(new IOException("BAM!"))
            .transform(BulkheadOperator.of(bulkhead)))
        .expectSubscription()
        .expectError(IOException.class)
        .verify(Duration.ofSeconds(1));
    assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(1);
}
Code example source: resilience4j/resilience4j
@Test
public void shouldEmitErrorWithBulkheadFullException() {
    // Acquire a permit up front; with the bulkhead then full, the Mono below is rejected.
    bulkhead.isCallPermitted();
    StepVerifier.create(
        Mono.just("Event")
            .transform(BulkheadOperator.of(bulkhead)))
        .expectSubscription()
        .expectError(BulkheadFullException.class)
        .verify(Duration.ofSeconds(1));
    assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(0);
}
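Code example source: reactor/reactor-core
@Test
public void transform() {
    // transform() returns a Mono, so only the first element of the Flux is emitted before completion.
    StepVerifier.create(Mono.just(1).transform(m -> Flux.just(1, 2, 3)))
        .expectNext(1)
        .verifyComplete();
}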
Code example source: resilience4j/resilience4j
@Test
public void shouldPropagateError() {
    StepVerifier.create(
        Mono.error(new IOException("BAM!"))
            .transform(CircuitBreakerOperator.of(circuitBreaker)))
        .expectError(IOException.class)
        .verify(Duration.ofSeconds(1));
    assertSingleFailedCall();
}
Code example source: resilience4j/resilience4j
@Test
public void shouldEmitErrorWithCircuitBreakerOpenException() {
    circuitBreaker.transitionToOpenState();
    StepVerifier.create(
        Mono.just("Event")
            .transform(CircuitBreakerOperator.of(circuitBreaker)))
        .expectError(CircuitBreakerOpenException.class)
        .verify(Duration.ofSeconds(1));
    assertNoRegisteredCall();
}
Code example source: resilience4j/resilience4j
@Test
public void shouldEmitEvent() {
    StepVerifier.create(
        Mono.just("Event")
            .transform(CircuitBreakerOperator.of(circuitBreaker)))
        .expectNext("Event")
        .verifyComplete();
    assertSingleSuccessfulCall();
}
Code example source: resilience4j/resilience4j
@Test
public void shouldEmitBulkheadFullExceptionEvenWhenErrorNotOnSubscribe() {
    bulkhead.isCallPermitted();
    StepVerifier.create(
        Mono.error(new IOException("BAM!")).delayElement(Duration.ofMillis(1))
            .transform(BulkheadOperator.of(bulkhead, Schedulers.immediate())))
        .expectSubscription()
        .expectError(BulkheadFullException.class)
        .verify(Duration.ofSeconds(1));
    assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(0);
}
Code example source: resilience4j/resilience4j
@Test
public void shouldEmitCircuitBreakerOpenExceptionEvenWhenErrorNotOnSubscribe() {
    circuitBreaker.transitionToForcedOpenState();
    StepVerifier.create(
        Mono.error(new IOException("BAM!")).delayElement(Duration.ofMillis(1))
            .transform(CircuitBreakerOperator.of(circuitBreaker)))
        .expectError(CircuitBreakerOpenException.class)
        .verify(Duration.ofSeconds(1));
    assertNoRegisteredCall();
}
Code example source: resilience4j/resilience4j
@Test
public void shouldEmitCircuitBreakerOpenExceptionEvenWhenErrorDuringSubscribe() {
    circuitBreaker.transitionToForcedOpenState();
    StepVerifier.create(
        Mono.error(new IOException("BAM!"))
            .transform(CircuitBreakerOperator.of(circuitBreaker)))
        .expectError(CircuitBreakerOpenException.class)
        .verify(Duration.ofSeconds(1));
    assertNoRegisteredCall();
}
Code example source: resilience4j/resilience4j
@Test
public void shouldEmitBulkheadFullExceptionEvenWhenErrorDuringSubscribe() {
    bulkhead.isCallPermitted();
    StepVerifier.create(
        Mono.error(new IOException("BAM!"))
            .transform(BulkheadOperator.of(bulkhead, Schedulers.immediate())))
        .expectSubscription()
        .expectError(BulkheadFullException.class)
        .verify(Duration.ofSeconds(1));
    assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(0);
}
Code example source: resilience4j/resilience4j
@Test
public void shouldEmitEvent() {
    StepVerifier.create(
        Mono.just("Event")
            .transform(RateLimiterOperator.of(rateLimiter)))
        .expectNext("Event")
        .verifyComplete();
    assertSinglePermitUsed();
}
Code example source: resilience4j/resilience4j
@Test
public void shouldPropagateError() {
    StepVerifier.create(
        Mono.error(new IOException("BAM!"))
            .transform(RateLimiterOperator.of(rateLimiter)))
        .expectSubscription()
        .expectError(IOException.class)
        .verify(Duration.ofSeconds(1));
    assertSinglePermitUsed();
}
Code example source: resilience4j/resilience4j
@Test
public void shouldEmitErrorWithBulkheadFullException() {
    // Despite the method name, this rate-limiter test expects RequestNotPermitted.
    saturateRateLimiter();
    StepVerifier.create(
        Mono.just("Event")
            .transform(RateLimiterOperator.of(rateLimiter)))
        .expectSubscription()
        .expectError(RequestNotPermitted.class)
        .verify(Duration.ofSeconds(1));
    assertNoPermitLeft();
}
Code example source: resilience4j/resilience4j
@Test
public void shouldEmitRequestNotPermittedExceptionEvenWhenErrorNotOnSubscribe() {
    saturateRateLimiter();
    StepVerifier.create(
        Mono.error(new IOException("BAM!")).delayElement(Duration.ofMillis(1))
            .transform(RateLimiterOperator.of(rateLimiter, Schedulers.immediate())))
        .expectError(RequestNotPermitted.class)
        .verify(Duration.ofSeconds(1));
    assertNoPermitLeft();
}
Code example source: resilience4j/resilience4j
@Test
public void shouldEmitRequestNotPermittedExceptionEvenWhenErrorDuringSubscribe() {
    saturateRateLimiter();
    StepVerifier.create(
        Mono.error(new IOException("BAM!"))
            .transform(RateLimiterOperator.of(rateLimiter, Schedulers.immediate())))
        .expectError(RequestNotPermitted.class)
        .verify(Duration.ofSeconds(1));
    assertNoPermitLeft();
}