reactor.core.publisher.Mono.as()方法的使用及代码示例

x33g5p2x  于2022-01-24 转载在 其他  
字(8.3k)|赞(0)|评价(0)|浏览(686)

本文整理了Java中reactor.core.publisher.Mono.as()方法的一些代码示例,展示了Mono.as()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Mono.as()方法的具体详情如下:
包路径:reactor.core.publisher.Mono
类名称:Mono
方法名:as

Mono.as介绍

[英]Transform this Mono into a target type.

mono.as(Flux::from).subscribe()

[中]将此单声道转换为目标类型

mono.as(Flux::from).subscribe()

代码示例

代码示例来源:origin: spring-projects/spring-framework

@Test
public void errorBeforeFirstItem() throws Exception {
  IllegalStateException error = new IllegalStateException("boo");
  Mono<Void> completion = Mono.<String>error(error).as(this::sendOperator);
  Signal<Void> signal = completion.materialize().block();
  assertNotNull(signal);
  assertSame("Unexpected signal: " + signal, error, signal.getThrowable());
}

代码示例来源:origin: reactor/reactor-core

@Test
public void monoDirectPlainFuseable() {
  StepVerifier.create(Mono.just(1).as(TestPubFuseable::new))
        .expectNext(1)
        .verifyComplete();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void monoDirectIdentity() {
  StepVerifier.create(Mono.just(1).as(Mono::fromDirect))
        .expectNext(1)
        .verifyComplete();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void asJust() {
  StepVerifier.create(Mono.just(1).as(Flux::from))
        .expectNext(1)
        .verifyComplete();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void monoDirectPlain() {
  StepVerifier.create(Mono.just(1).as(TestPub::new))
        .expectNext(1)
        .verifyComplete();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void longEmptyEmitsEmptyWindowsRegularly() {
  StepVerifier.withVirtualTime(() -> Mono.delay(Duration.ofMillis(350))
                      .ignoreElement()
                      .as(Flux::from)
                      .windowTimeout(1000, Duration.ofMillis(100))
                      .concatMap(Flux::collectList)
  )
        .thenAwait(Duration.ofMinutes(1))
        .assertNext(l -> assertThat(l).isEmpty())
        .assertNext(l -> assertThat(l).isEmpty())
        .assertNext(l -> assertThat(l).isEmpty())
        .assertNext(l -> assertThat(l).isEmpty())
        .verifyComplete();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void contextFromFirstSubscriberCached() {
  AtomicInteger contextFillCount = new AtomicInteger();
  VirtualTimeScheduler vts = VirtualTimeScheduler.create();
  Mono<Context> cached = Mono.subscriberContext()
                .as(m -> new MonoCacheTime<>(m, Duration.ofMillis(500), vts))
                .subscriberContext(ctx -> ctx.put("a", "GOOD" + contextFillCount.incrementAndGet()));
  //at first pass, the context is captured
  String cacheMiss = cached.map(x -> x.getOrDefault("a", "BAD")).block();
  assertThat(cacheMiss).as("cacheMiss").isEqualTo("GOOD1");
  assertThat(contextFillCount).as("cacheMiss").hasValue(1);
  //at second subscribe, the Context fill attempt is still done, but ultimately ignored since Mono.subscriberContext() result is cached
  String cacheHit = cached.map(x -> x.getOrDefault("a", "BAD")).block();
  assertThat(cacheHit).as("cacheHit").isEqualTo("GOOD1"); //value from the cache
  assertThat(contextFillCount).as("cacheHit").hasValue(2); //function was still invoked
  vts.advanceTimeBy(Duration.ofMillis(501));
  //at third subscribe, after the expiration delay, function is called for the 3rd time, but this time the resulting context is cached
  String cacheExpired = cached.map(x -> x.getOrDefault("a", "BAD")).block();
  assertThat(cacheExpired).as("cacheExpired").isEqualTo("GOOD3");
  assertThat(contextFillCount).as("cacheExpired").hasValue(3);
  //at fourth subscribe, function is called but ignored, the cached context is visible
  String cachePostExpired = cached.map(x -> x.getOrDefault("a", "BAD")).block();
  assertThat(cachePostExpired).as("cachePostExpired").isEqualTo("GOOD3");
  assertThat(contextFillCount).as("cachePostExpired").hasValue(4);
  vts.dispose();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void resourceSupplierCanAccessContext() {
  Mono.usingWhen(Mono.subscriberContext()
            .map(ctx -> ctx.get(String.class)),
      Mono::just,
      Mono::just,
      Mono::just,
      Mono::just)
    .subscriberContext(Context.of(String.class, "contextual"))
    .as(StepVerifier::create)
    .expectNext("contextual")
    .verifyComplete();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void verifyFusionError() {
  assertThatExceptionOfType(AssertionError.class)
      .isThrownBy(() -> Mono.just("foo")
                 .hide()
                 .as(StepVerifier::create)
                 .expectFusion()
                 .expectNext("foo")
                 .expectComplete()
                 .verify())
      .withMessage("expectation failed (expected fuseable source but actual " +
          "Subscription is not: 3)");
}

代码示例来源:origin: rsocket/rsocket-java

@DisplayName("makes 1 requestResponse request")
@Test
default void requestResponse1() {
 getClient()
   .requestResponse(createTestPayload(1))
   .map(Payload::getDataUtf8)
   .as(StepVerifier::create)
   .expectNextCount(1)
   .expectComplete()
   .verify(getTimeout());
}

代码示例来源:origin: rsocket/rsocket-java

@DisplayName("creates instance")
@Test
void constructor() {
 channel.map(CloseableChannel::new).as(StepVerifier::create).expectNextCount(1).verifyComplete();
}

代码示例来源:origin: rsocket/rsocket-java

@DisplayName("starts server")
@Test
void start() {
 InetSocketAddress address = InetSocketAddress.createUnresolved("localhost", 0);
 TcpServerTransport serverTransport = TcpServerTransport.create(address);
 serverTransport
   .start(duplexConnection -> Mono.empty())
   .as(StepVerifier::create)
   .expectNextCount(1)
   .verifyComplete();
}

代码示例来源:origin: rsocket/rsocket-java

@DisplayName("starts server")
@Test
void start() {
 InetSocketAddress address = InetSocketAddress.createUnresolved("localhost", 0);
 WebsocketServerTransport serverTransport = WebsocketServerTransport.create(address);
 serverTransport
   .start(duplexConnection -> Mono.empty())
   .as(StepVerifier::create)
   .expectNextCount(1)
   .verifyComplete();
}

代码示例来源:origin: rsocket/rsocket-java

@DisplayName("requestResponse gathers metrics")
@Test
void requestResponse() {
 Payload payload = DefaultPayload.create("test-metadata", "test-data");
 when(delegate.requestResponse(payload)).thenReturn(Mono.empty());
 new MicrometerRSocket(delegate, meterRegistry, Tag.of("test-key", "test-value"))
   .requestResponse(payload)
   .as(StepVerifier::create)
   .verifyComplete();
 assertThat(findTimer("request.response", SignalType.ON_COMPLETE).count()).isEqualTo(1);
}

代码示例来源:origin: rsocket/rsocket-java

@DisplayName("returns the server by name")
@Test
void findServer() {
 LocalServerTransport serverTransport = LocalServerTransport.createEphemeral();
 serverTransport
   .start(duplexConnection -> Mono.empty())
   .as(StepVerifier::create)
   .expectNextCount(1)
   .verifyComplete();
 assertThat(LocalServerTransport.findServer(serverTransport.getName())).isNotNull();
}

代码示例来源:origin: rsocket/rsocket-java

@DisplayName("connects to server")
@Test
void connect() {
 InetSocketAddress address = InetSocketAddress.createUnresolved("localhost", 0);
 TcpServerTransport serverTransport = TcpServerTransport.create(address);
 serverTransport
   .start(duplexConnection -> Mono.empty())
   .flatMap(context -> TcpClientTransport.create(context.address()).connect())
   .as(StepVerifier::create)
   .expectNextCount(1)
   .verifyComplete();
}

代码示例来源:origin: rsocket/rsocket-java

@DisplayName("metadataPush gathers metrics")
@Test
void metadataPush() {
 Payload payload = DefaultPayload.create("test-metadata", "test-data");
 when(delegate.metadataPush(payload)).thenReturn(Mono.empty());
 new MicrometerRSocket(delegate, meterRegistry, Tag.of("test-key", "test-value"))
   .metadataPush(payload)
   .as(StepVerifier::create)
   .verifyComplete();
 assertThat(findCounter("metadata.push", SignalType.ON_COMPLETE).count()).isEqualTo(1);
}

代码示例来源:origin: rsocket/rsocket-java

@DisplayName("generates error if server not started")
@Test
void connectNoServer() {
 LocalClientTransport.create("test-name")
   .connect()
   .as(StepVerifier::create)
   .verifyErrorMessage("Could not find server: test-name");
}

代码示例来源:origin: rsocket/rsocket-java

@DisplayName("connects to server")
@Test
void connect() {
 LocalServerTransport serverTransport = LocalServerTransport.createEphemeral();
 serverTransport
   .start(duplexConnection -> Mono.empty())
   .flatMap(closeable -> LocalClientTransport.create(serverTransport.getName()).connect())
   .as(StepVerifier::create)
   .expectNextCount(1)
   .verifyComplete();
}

代码示例来源:origin: rsocket/rsocket-java

@DisplayName("create generates error if server not started")
@Test
void connectNoServer() {
 TcpClientTransport.create(8000).connect().as(StepVerifier::create).verifyError();
}

相关文章

Mono类方法