Usage and code examples of the reactor.core.publisher.Mono.transform() method

x33g5p2x, reposted on 2022-01-24 in Other

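This article collects code examples for the Java method reactor.core.publisher.Mono.transform() and shows how Mono.transform() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should serve as useful reference material. The details of Mono.transform() are as follows:
Package: reactor.core.publisher
Class name: Mono
Method name: transform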

Introduction to Mono.transform

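Transform this Mono in order to generate a target Mono. Unlike compose(Function) (renamed transformDeferred(Function) in Reactor 3.3), the provided function is executed as part of assembly, that is, once when the pipeline is built rather than once per subscriber.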

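// mono is assumed to be a Mono<Integer>; Schedulers.boundedElastic() requires Reactor 3.3 or later.
Function<Mono<Integer>, Mono<Integer>> applySchedulers = mono -> mono.subscribeOn(Schedulers.boundedElastic())
    .publishOn(Schedulers.parallel());
mono.transform(applySchedulers).map(v -> v * v).subscribe();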
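
The difference between assembly-time and deferred application can be modeled without Reactor. The sketch below is a simplified stand-in, not the Reactor API: java.util.function.Supplier plays the role of a lazy Mono, calling get() plays the role of subscribing, and the class AssemblyVsDeferred and its helper methods are hypothetical names chosen for illustration. It counts how often the supplied function runs when each pipeline is "subscribed" twice.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.Supplier;

// Simplified model only: Supplier stands in for Mono, get() stands in for subscribe().
final class AssemblyVsDeferred {

    // Assembly-time style (like transform): the function runs once, when the pipeline is built.
    static <T, R> Supplier<R> transform(Supplier<T> source,
                                        Function<Supplier<T>, Supplier<R>> fn) {
        return fn.apply(source);
    }

    // Deferred style (like compose / transformDeferred): the function runs on every get().
    static <T, R> Supplier<R> transformDeferred(Supplier<T> source,
                                                Function<Supplier<T>, Supplier<R>> fn) {
        return () -> fn.apply(source).get();
    }

    // Builds both pipelines, "subscribes" to each twice, and returns
    // { invocations of the assembly-time function, invocations of the deferred function }.
    static int[] countInvocations() {
        AtomicInteger eager = new AtomicInteger();
        AtomicInteger deferred = new AtomicInteger();
        Supplier<Integer> source = () -> 3;

        Supplier<Integer> a = transform(source, s -> {
            eager.incrementAndGet();
            return () -> s.get() * s.get();
        });
        Supplier<Integer> b = transformDeferred(source, s -> {
            deferred.incrementAndGet();
            return () -> s.get() * s.get();
        });

        a.get();
        a.get();
        b.get();
        b.get();
        return new int[] { eager.get(), deferred.get() };
    }

    public static void main(String[] args) {
        int[] counts = countInvocations();
        System.out.println("assembly-time: " + counts[0] + ", deferred: " + counts[1]);
    }
}
```

In this model the assembly-time function runs once and the deferred one runs twice, which is why per-subscriber state belongs in the deferred variant while stateless, reusable operator chains fit transform.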


Code examples

Code example source: resilience4j/resilience4j

@Test
public void shouldEmitEvent() {
  StepVerifier.create(
      Mono.just("Event 1")
          .transform(BulkheadOperator.of(bulkhead))
          .transform(RateLimiterOperator.of(rateLimiter))
          .transform(CircuitBreakerOperator.of(circuitBreaker))
  ).expectNext("Event 1")
      .verifyComplete();
}
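
Chaining several transform calls amounts to composing functions: each call wraps the result of the previous one, so the first operator sits closest to the source and the last one is outermost. The following is a minimal sketch of that ordering using only the JDK. Supplier again stands in for Mono, and the tag helper and the ChainedTransforms class are hypothetical; they are not the resilience4j operators and only record the wrapping order.

```java
import java.util.function.Function;
import java.util.function.Supplier;

// Simplified model only: shows the wrapping order of chained decorators.
final class ChainedTransforms {

    // Hypothetical decorator: wraps a source and labels whatever value it produces.
    static Function<Supplier<String>, Supplier<String>> tag(String label) {
        return source -> () -> label + "(" + source.get() + ")";
    }

    // Equivalent in spirit to source.transform(a).transform(b).transform(c).
    static String run() {
        Supplier<String> source = () -> "Event 1";
        Function<Supplier<String>, Supplier<String>> chain =
                tag("bulkhead")
                        .andThen(tag("rateLimiter"))
                        .andThen(tag("circuitBreaker"));
        return chain.apply(source).get();
    }

    public static void main(String[] args) {
        // prints circuitBreaker(rateLimiter(bulkhead(Event 1)))
        System.out.println(run());
    }
}
```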

Code example source: resilience4j/resilience4j

@Test
public void shouldRecordSuccessWhenUsingToFuture() {
  try {
    Mono.just("Event")
        .transform(CircuitBreakerOperator.of(circuitBreaker))
        .toFuture()
        .get();
    assertSingleSuccessfulCall();
  } catch (InterruptedException | ExecutionException e) {
    fail();
  }
}

Code example source: resilience4j/resilience4j

// Excerpt: result, recoveryFunction, and rateLimiter are defined in the surrounding code.
if (result != null) {
  RateLimiterOperator operator = RateLimiterOperator.of(rateLimiter, Schedulers.immediate());
  result = recoveryFunction.onErrorResume(result.transform(operator));
}

Code example source: resilience4j/resilience4j

@Test
public void shouldEmitEvent() {
  StepVerifier.create(
      Mono.just("Event")
          .transform(BulkheadOperator.of(bulkhead)))
      .expectNext("Event")
      .verifyComplete();
  assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(1);
}

Code example source: resilience4j/resilience4j

@Test
public void shouldEmptyMonoShouldBeSuccessful() {
  StepVerifier.create(
      Mono.empty()
          .transform(CircuitBreakerOperator.of(circuitBreaker)))
      .verifyComplete();
  assertSingleSuccessfulCall();
}

Code example source: resilience4j/resilience4j

@Test
public void shouldPropagateError() {
  StepVerifier.create(
      Mono.error(new IOException("BAM!"))
          .transform(BulkheadOperator.of(bulkhead)))
      .expectSubscription()
      .expectError(IOException.class)
      .verify(Duration.ofSeconds(1));
  assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(1);
}

Code example source: resilience4j/resilience4j

@Test
public void shouldEmitErrorWithBulkheadFullException() {
  bulkhead.isCallPermitted();
  StepVerifier.create(
      Mono.just("Event")
          .transform(BulkheadOperator.of(bulkhead)))
      .expectSubscription()
      .expectError(BulkheadFullException.class)
      .verify(Duration.ofSeconds(1));
  assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(0);
}

Code example source: reactor/reactor-core

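@Test
public void transform() {
  // The Flux returned by the function is converted back to a Mono,
  // so only its first element (1) is emitted before completion.
  StepVerifier.create(Mono.just(1).transform(m -> Flux.just(1, 2, 3)))
      .expectNext(1)
      .verifyComplete();
}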

Code example source: resilience4j/resilience4j

@Test
public void shouldPropagateError() {
  StepVerifier.create(
      Mono.error(new IOException("BAM!"))
          .transform(CircuitBreakerOperator.of(circuitBreaker)))
      .expectError(IOException.class)
      .verify(Duration.ofSeconds(1));
  assertSingleFailedCall();
}

Code example source: resilience4j/resilience4j

@Test
public void shouldEmitErrorWithCircuitBreakerOpenException() {
  circuitBreaker.transitionToOpenState();
  StepVerifier.create(
      Mono.just("Event")
          .transform(CircuitBreakerOperator.of(circuitBreaker)))
      .expectError(CircuitBreakerOpenException.class)
      .verify(Duration.ofSeconds(1));
  assertNoRegisteredCall();
}

Code example source: resilience4j/resilience4j

@Test
public void shouldEmitEvent() {
  StepVerifier.create(
      Mono.just("Event")
          .transform(CircuitBreakerOperator.of(circuitBreaker)))
      .expectNext("Event")
      .verifyComplete();
  assertSingleSuccessfulCall();
}

Code example source: resilience4j/resilience4j

@Test
public void shouldEmitBulkheadFullExceptionEvenWhenErrorNotOnSubscribe() {
  bulkhead.isCallPermitted();
  StepVerifier.create(
      Mono.error(new IOException("BAM!")).delayElement(Duration.ofMillis(1))
          .transform(BulkheadOperator.of(bulkhead, Schedulers.immediate())))
      .expectSubscription()
      .expectError(BulkheadFullException.class)
      .verify(Duration.ofSeconds(1));
  assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(0);
}

Code example source: resilience4j/resilience4j

@Test
public void shouldEmitCircuitBreakerOpenExceptionEvenWhenErrorNotOnSubscribe() {
  circuitBreaker.transitionToForcedOpenState();
  StepVerifier.create(
      Mono.error(new IOException("BAM!")).delayElement(Duration.ofMillis(1))
          .transform(CircuitBreakerOperator.of(circuitBreaker)))
      .expectError(CircuitBreakerOpenException.class)
      .verify(Duration.ofSeconds(1));
  assertNoRegisteredCall();
}

Code example source: resilience4j/resilience4j

@Test
public void shouldEmitCircuitBreakerOpenExceptionEvenWhenErrorDuringSubscribe() {
  circuitBreaker.transitionToForcedOpenState();
  StepVerifier.create(
      Mono.error(new IOException("BAM!"))
          .transform(CircuitBreakerOperator.of(circuitBreaker)))
      .expectError(CircuitBreakerOpenException.class)
      .verify(Duration.ofSeconds(1));
  assertNoRegisteredCall();
}

Code example source: resilience4j/resilience4j

@Test
public void shouldEmitBulkheadFullExceptionEvenWhenErrorDuringSubscribe() {
  bulkhead.isCallPermitted();
  StepVerifier.create(
      Mono.error(new IOException("BAM!"))
          .transform(BulkheadOperator.of(bulkhead, Schedulers.immediate())))
      .expectSubscription()
      .expectError(BulkheadFullException.class)
      .verify(Duration.ofSeconds(1));
  assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(0);
}

Code example source: resilience4j/resilience4j

@Test
public void shouldEmitEvent() {
  StepVerifier.create(
      Mono.just("Event")
          .transform(RateLimiterOperator.of(rateLimiter)))
      .expectNext("Event")
      .verifyComplete();
  assertSinglePermitUsed();
}

Code example source: resilience4j/resilience4j

@Test
public void shouldPropagateError() {
  StepVerifier.create(
      Mono.error(new IOException("BAM!"))
          .transform(RateLimiterOperator.of(rateLimiter)))
      .expectSubscription()
      .expectError(IOException.class)
      .verify(Duration.ofSeconds(1));
  assertSinglePermitUsed();
}

Code example source: resilience4j/resilience4j

@Test
public void shouldEmitErrorWithBulkheadFullException() {
  saturateRateLimiter();
  StepVerifier.create(
      Mono.just("Event")
          .transform(RateLimiterOperator.of(rateLimiter)))
      .expectSubscription()
      .expectError(RequestNotPermitted.class)
      .verify(Duration.ofSeconds(1));
  assertNoPermitLeft();
}

Code example source: resilience4j/resilience4j

@Test
public void shouldEmitRequestNotPermittedExceptionEvenWhenErrorNotOnSubscribe() {
  saturateRateLimiter();
  StepVerifier.create(
      Mono.error(new IOException("BAM!")).delayElement(Duration.ofMillis(1))
          .transform(RateLimiterOperator.of(rateLimiter, Schedulers.immediate())))
      .expectError(RequestNotPermitted.class)
      .verify(Duration.ofSeconds(1));
  assertNoPermitLeft();
}

Code example source: resilience4j/resilience4j

@Test
public void shouldEmitRequestNotPermittedExceptionEvenWhenErrorDuringSubscribe() {
  saturateRateLimiter();
  StepVerifier.create(
      Mono.error(new IOException("BAM!"))
          .transform(RateLimiterOperator.of(rateLimiter, Schedulers.immediate())))
      .expectError(RequestNotPermitted.class)
      .verify(Duration.ofSeconds(1));
  assertNoPermitLeft();
}
