本文整理了Java中reactor.core.publisher.Mono.as()
方法的一些代码示例,展示了Mono.as()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Mono.as()
方法的具体详情如下:
包路径:reactor.core.publisher.Mono
类名称:Mono
方法名:as
[英]Transform this Mono into a target type.
mono.as(Flux::from).subscribe()
[中]将此单声道转换为目标类型
mono.as(Flux::from).subscribe()
代码示例来源:origin: spring-projects/spring-framework
@Test
public void errorBeforeFirstItem() throws Exception {
IllegalStateException error = new IllegalStateException("boo");
Mono<Void> completion = Mono.<String>error(error).as(this::sendOperator);
Signal<Void> signal = completion.materialize().block();
assertNotNull(signal);
assertSame("Unexpected signal: " + signal, error, signal.getThrowable());
}
代码示例来源:origin: reactor/reactor-core
@Test
public void monoDirectPlainFuseable() {
StepVerifier.create(Mono.just(1).as(TestPubFuseable::new))
.expectNext(1)
.verifyComplete();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void monoDirectIdentity() {
StepVerifier.create(Mono.just(1).as(Mono::fromDirect))
.expectNext(1)
.verifyComplete();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void asJust() {
StepVerifier.create(Mono.just(1).as(Flux::from))
.expectNext(1)
.verifyComplete();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void monoDirectPlain() {
StepVerifier.create(Mono.just(1).as(TestPub::new))
.expectNext(1)
.verifyComplete();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void longEmptyEmitsEmptyWindowsRegularly() {
StepVerifier.withVirtualTime(() -> Mono.delay(Duration.ofMillis(350))
.ignoreElement()
.as(Flux::from)
.windowTimeout(1000, Duration.ofMillis(100))
.concatMap(Flux::collectList)
)
.thenAwait(Duration.ofMinutes(1))
.assertNext(l -> assertThat(l).isEmpty())
.assertNext(l -> assertThat(l).isEmpty())
.assertNext(l -> assertThat(l).isEmpty())
.assertNext(l -> assertThat(l).isEmpty())
.verifyComplete();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void contextFromFirstSubscriberCached() {
AtomicInteger contextFillCount = new AtomicInteger();
VirtualTimeScheduler vts = VirtualTimeScheduler.create();
Mono<Context> cached = Mono.subscriberContext()
.as(m -> new MonoCacheTime<>(m, Duration.ofMillis(500), vts))
.subscriberContext(ctx -> ctx.put("a", "GOOD" + contextFillCount.incrementAndGet()));
//at first pass, the context is captured
String cacheMiss = cached.map(x -> x.getOrDefault("a", "BAD")).block();
assertThat(cacheMiss).as("cacheMiss").isEqualTo("GOOD1");
assertThat(contextFillCount).as("cacheMiss").hasValue(1);
//at second subscribe, the Context fill attempt is still done, but ultimately ignored since Mono.subscriberContext() result is cached
String cacheHit = cached.map(x -> x.getOrDefault("a", "BAD")).block();
assertThat(cacheHit).as("cacheHit").isEqualTo("GOOD1"); //value from the cache
assertThat(contextFillCount).as("cacheHit").hasValue(2); //function was still invoked
vts.advanceTimeBy(Duration.ofMillis(501));
//at third subscribe, after the expiration delay, function is called for the 3rd time, but this time the resulting context is cached
String cacheExpired = cached.map(x -> x.getOrDefault("a", "BAD")).block();
assertThat(cacheExpired).as("cacheExpired").isEqualTo("GOOD3");
assertThat(contextFillCount).as("cacheExpired").hasValue(3);
//at fourth subscribe, function is called but ignored, the cached context is visible
String cachePostExpired = cached.map(x -> x.getOrDefault("a", "BAD")).block();
assertThat(cachePostExpired).as("cachePostExpired").isEqualTo("GOOD3");
assertThat(contextFillCount).as("cachePostExpired").hasValue(4);
vts.dispose();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void resourceSupplierCanAccessContext() {
Mono.usingWhen(Mono.subscriberContext()
.map(ctx -> ctx.get(String.class)),
Mono::just,
Mono::just,
Mono::just,
Mono::just)
.subscriberContext(Context.of(String.class, "contextual"))
.as(StepVerifier::create)
.expectNext("contextual")
.verifyComplete();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void verifyFusionError() {
assertThatExceptionOfType(AssertionError.class)
.isThrownBy(() -> Mono.just("foo")
.hide()
.as(StepVerifier::create)
.expectFusion()
.expectNext("foo")
.expectComplete()
.verify())
.withMessage("expectation failed (expected fuseable source but actual " +
"Subscription is not: 3)");
}
代码示例来源:origin: rsocket/rsocket-java
@DisplayName("makes 1 requestResponse request")
@Test
default void requestResponse1() {
getClient()
.requestResponse(createTestPayload(1))
.map(Payload::getDataUtf8)
.as(StepVerifier::create)
.expectNextCount(1)
.expectComplete()
.verify(getTimeout());
}
代码示例来源:origin: rsocket/rsocket-java
@DisplayName("creates instance")
@Test
void constructor() {
channel.map(CloseableChannel::new).as(StepVerifier::create).expectNextCount(1).verifyComplete();
}
代码示例来源:origin: rsocket/rsocket-java
@DisplayName("starts server")
@Test
void start() {
InetSocketAddress address = InetSocketAddress.createUnresolved("localhost", 0);
TcpServerTransport serverTransport = TcpServerTransport.create(address);
serverTransport
.start(duplexConnection -> Mono.empty())
.as(StepVerifier::create)
.expectNextCount(1)
.verifyComplete();
}
代码示例来源:origin: rsocket/rsocket-java
@DisplayName("starts server")
@Test
void start() {
InetSocketAddress address = InetSocketAddress.createUnresolved("localhost", 0);
WebsocketServerTransport serverTransport = WebsocketServerTransport.create(address);
serverTransport
.start(duplexConnection -> Mono.empty())
.as(StepVerifier::create)
.expectNextCount(1)
.verifyComplete();
}
代码示例来源:origin: rsocket/rsocket-java
@DisplayName("requestResponse gathers metrics")
@Test
void requestResponse() {
Payload payload = DefaultPayload.create("test-metadata", "test-data");
when(delegate.requestResponse(payload)).thenReturn(Mono.empty());
new MicrometerRSocket(delegate, meterRegistry, Tag.of("test-key", "test-value"))
.requestResponse(payload)
.as(StepVerifier::create)
.verifyComplete();
assertThat(findTimer("request.response", SignalType.ON_COMPLETE).count()).isEqualTo(1);
}
代码示例来源:origin: rsocket/rsocket-java
@DisplayName("returns the server by name")
@Test
void findServer() {
LocalServerTransport serverTransport = LocalServerTransport.createEphemeral();
serverTransport
.start(duplexConnection -> Mono.empty())
.as(StepVerifier::create)
.expectNextCount(1)
.verifyComplete();
assertThat(LocalServerTransport.findServer(serverTransport.getName())).isNotNull();
}
代码示例来源:origin: rsocket/rsocket-java
@DisplayName("connects to server")
@Test
void connect() {
InetSocketAddress address = InetSocketAddress.createUnresolved("localhost", 0);
TcpServerTransport serverTransport = TcpServerTransport.create(address);
serverTransport
.start(duplexConnection -> Mono.empty())
.flatMap(context -> TcpClientTransport.create(context.address()).connect())
.as(StepVerifier::create)
.expectNextCount(1)
.verifyComplete();
}
代码示例来源:origin: rsocket/rsocket-java
@DisplayName("metadataPush gathers metrics")
@Test
void metadataPush() {
Payload payload = DefaultPayload.create("test-metadata", "test-data");
when(delegate.metadataPush(payload)).thenReturn(Mono.empty());
new MicrometerRSocket(delegate, meterRegistry, Tag.of("test-key", "test-value"))
.metadataPush(payload)
.as(StepVerifier::create)
.verifyComplete();
assertThat(findCounter("metadata.push", SignalType.ON_COMPLETE).count()).isEqualTo(1);
}
代码示例来源:origin: rsocket/rsocket-java
@DisplayName("generates error if server not started")
@Test
void connectNoServer() {
LocalClientTransport.create("test-name")
.connect()
.as(StepVerifier::create)
.verifyErrorMessage("Could not find server: test-name");
}
代码示例来源:origin: rsocket/rsocket-java
@DisplayName("connects to server")
@Test
void connect() {
LocalServerTransport serverTransport = LocalServerTransport.createEphemeral();
serverTransport
.start(duplexConnection -> Mono.empty())
.flatMap(closeable -> LocalClientTransport.create(serverTransport.getName()).connect())
.as(StepVerifier::create)
.expectNextCount(1)
.verifyComplete();
}
代码示例来源:origin: rsocket/rsocket-java
@DisplayName("create generates error if server not started")
@Test
void connectNoServer() {
TcpClientTransport.create(8000).connect().as(StepVerifier::create).verifyError();
}
内容来源于网络,如有侵权,请联系作者删除!