本文整理了Java中reactor.core.publisher.Mono.thenMany()
方法的一些代码示例,展示了Mono.thenMany()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Mono.thenMany()
方法的具体详情如下:
包路径:reactor.core.publisher.Mono
类名称:Mono
方法名:thenMany
[英]Let this Mono complete then play another Publisher.
In other words ignore element from this mono and transform the completion signal into a Flux that will emit elements from the provided Publisher.
[中]让这个单声道完成,然后播放另一个出版商。
换句话说,忽略来自这个mono的元素,并将完成信号转换成一个通量,该通量将从提供的发布者发出元素。
代码示例来源:origin: spring-projects/spring-framework
private static <T> Supplier<Flux<T>> skipBodyAsFlux(ReactiveHttpInputMessage message) {
return message instanceof ClientHttpResponse ?
() -> consumeAndCancel(message).thenMany(Mono.empty()) : Flux::empty;
}
代码示例来源:origin: spring-projects/spring-framework
private static <T> Flux<T> unsupportedErrorHandler(
ReactiveHttpInputMessage message, UnsupportedMediaTypeException ex) {
Flux<T> result;
if (message.getHeaders().getContentType() == null) {
// Maybe it's okay there is no content type, if there is no content..
result = message.getBody().map(buffer -> {
DataBufferUtils.release(buffer);
throw ex;
});
}
else {
result = message instanceof ClientHttpResponse ?
consumeAndCancel(message).thenMany(Flux.error(ex)) : Flux.error(ex);
}
return result;
}
代码示例来源:origin: spring-projects/spring-framework
Flux<DataBuffer> partContent = partContentReady.thenMany(Flux.defer(outputMessage::getBody));
代码示例来源:origin: spring-projects/spring-framework
@Test
public void echo() throws Exception {
int count = 100;
Flux<String> input = Flux.range(1, count).map(index -> "msg-" + index);
ReplayProcessor<Object> output = ReplayProcessor.create(count);
this.client.execute(getUrl("/echo"), session -> session
.send(input.map(session::textMessage))
.thenMany(session.receive().take(count).map(WebSocketMessage::getPayloadAsText))
.subscribeWith(output)
.then())
.block(TIMEOUT);
assertEquals(input.collectList().block(TIMEOUT), output.collectList().block(TIMEOUT));
}
代码示例来源:origin: org.springframework/spring-web
Flux<DataBuffer> partContent = partContentReady.thenMany(Flux.defer(outputMessage::getBody));
代码示例来源:origin: reactor/reactor-core
@Test
public void windowWithTimeoutStartsTimerOnSubscription() {
StepVerifier.withVirtualTime(() ->
Mono.delay(Duration.ofMillis(300))
.thenMany(Flux.range(1, 3))
.delayElements(Duration.ofMillis(150))
.concatWith(Flux.range(4, 10).delaySubscription(Duration.ofMillis(500)))
.windowTimeout(10, Duration.ofMillis(500))
.flatMap(Flux::collectList)
)
.expectSubscription()
.thenAwait(Duration.ofSeconds(100))
.assertNext(l -> assertThat(l).containsExactly(1))
.assertNext(l -> assertThat(l).containsExactly(2, 3))
.assertNext(l -> assertThat(l).containsExactly(4, 5, 6, 7, 8, 9, 10, 11, 12, 13))
.assertNext(l -> assertThat(l).isEmpty())
.verifyComplete();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void testThenManySameType() {
Flux<String> test = Mono.just("A")
.thenMany(Flux.just("C", "D"));
AssertSubscriber<String> ts = AssertSubscriber.create();
test.subscribe(ts);
ts.assertValues("C", "D");
ts.assertComplete();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void testThenManyDifferentType() {
Flux<String> test = Mono.just(1)
.thenMany(Flux.just("C", "D"));
AssertSubscriber<String> ts = AssertSubscriber.create();
test.subscribe(ts);
ts.assertValues("C", "D");
ts.assertComplete();
}
}
代码示例来源:origin: spring-cloud/spring-cloud-gateway
@Test
public void echo() throws Exception {
int count = 100;
Flux<String> input = Flux.range(1, count).map(index -> "msg-" + index);
ReplayProcessor<Object> output = ReplayProcessor.create(count);
client.execute(getUrl("/echo"),
session -> {
logger.debug("Starting to send messages");
return session
.send(input.doOnNext(s -> logger.debug("outbound " + s)).map(s -> session.textMessage(s)))
.thenMany(session.receive().take(count).map(WebSocketMessage::getPayloadAsText))
.subscribeWith(output)
.doOnNext(s -> logger.debug("inbound " + s))
.then()
.doOnSuccessOrError((aVoid, ex) ->
logger.debug("Done with " + (ex != null ? ex.getMessage() : "success")));
})
.block(Duration.ofMillis(5000));
assertEquals(input.collectList().block(Duration.ofMillis(5000)),
output.collectList().block(Duration.ofMillis(5000)));
}
代码示例来源:origin: spring-cloud/spring-cloud-gateway
@Test
public void echoForHttp() throws Exception {
int count = 100;
Flux<String> input = Flux.range(1, count).map(index -> "msg-" + index);
ReplayProcessor<Object> output = ReplayProcessor.create(count);
client.execute(getHttpUrl("/echoForHttp"),
session -> {
logger.debug("Starting to send messages");
return session
.send(input.doOnNext(s -> logger.debug("outbound " + s)).map(s -> session.textMessage(s)))
.thenMany(session.receive().take(count).map(WebSocketMessage::getPayloadAsText))
.subscribeWith(output)
.doOnNext(s -> logger.debug("inbound " + s))
.then()
.doOnSuccessOrError((aVoid, ex) ->
logger.debug("Done with " + (ex != null ? ex.getMessage() : "success")));
})
.block(Duration.ofMillis(5000));
assertEquals(input.collectList().block(Duration.ofMillis(5000)),
output.collectList().block(Duration.ofMillis(5000)));
}
代码示例来源:origin: reactor/reactor-core
@Test
public void testMonoThenManySupplier() {
AssertSubscriber<String> ts = AssertSubscriber.create();
Flux<String> test = Mono.just(1).thenMany(Flux.defer(() -> Flux.just("A", "B")));
test.subscribe(ts);
ts.assertValues("A", "B");
ts.assertComplete();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void testThenManyFusion() {
Flux<Integer> test = Mono.just("A")
.thenMany(Flux.just("C", "D"))
.thenMany(Flux.just(1, 2));
Assert.assertTrue(test instanceof FluxConcatArray);
FluxConcatArray<Integer> s = (FluxConcatArray<Integer>) test;
Assert.assertTrue(s.array.length == 3);
AssertSubscriber<Integer> ts = AssertSubscriber.create();
test.subscribe(ts);
ts.assertValues(1, 2);
ts.assertComplete();
}
代码示例来源:origin: com.aol.cyclops/cyclops-reactor
/**
* @param other
* @return
* @see reactor.core.publisher.Mono#thenMany(org.reactivestreams.Publisher)
*/
public final <V> Flux<V> thenMany(Publisher<V> other) {
return boxed.thenMany(other);
}
/**
代码示例来源:origin: rsocket/rsocket-java
private Flux<Payload> handleChannel(Flux<Payload> request) {
return lifecycle
.active()
.thenMany(
Flux.defer(
() -> {
代码示例来源:origin: rsocket/rsocket-java
private Flux<Payload> handleRequestStream(final Payload payload) {
return lifecycle
.active()
.thenMany(
Flux.defer(
() -> {
代码示例来源:origin: io.netifi.proteus/client
@Override
public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
return _notify()
.thenMany(source.requestChannel(payloads));
}
代码示例来源:origin: LuoLiangDSGA/spring-learning
@PostConstruct
public void loadData() {
factory.getReactiveConnection().serverCommands().flushAll()
.thenMany(Flux.just("Thor", "Hulk", "Tony")
.map(name -> new User(UUID.randomUUID().toString().substring(0, 5), name, "123456"))
.flatMap(user -> redisOperations.opsForValue().set(user.getId(), user))
).thenMany(redisOperations.keys("*")
.flatMap(redisOperations.opsForValue()::get))
.subscribe(System.out::println);
}
}
代码示例来源:origin: hantsy/spring-reactive-sample
private void initPosts() {
this.posts.deleteAll()
.thenMany(
Flux.just("Post one", "Post two")
.flatMap(title -> this.posts.save(Post.builder().id(UUID.randomUUID().toString()).title(title).content("content of " + title).build()))
);
//.subscribe(null, null, () -> log.info("done posts initialization..."));
}
代码示例来源:origin: hantsy/spring-reactive-sample
private void initPosts() {
this.posts.deleteAll()
.thenMany(
Flux.just("Post one", "Post two")
.flatMap(title -> this.posts.save(Post.builder().id(UUID.randomUUID().toString()).title(title).content("content of " + title).build()))
);
//.subscribe(null, null, () -> log.info("done posts initialization..."));
}
代码示例来源:origin: reactor/reactor-kafka
private Mono<Void> transaction(Publisher<? extends ProducerRecord<K, V>> transactionRecords) {
return transactionManager()
.begin()
.thenMany(sendProducerRecords(transactionRecords))
.concatWith(transactionManager().commit())
.onErrorResume(e -> transactionManager().abort().then(Mono.error(e)))
.publishOn(senderOptions.scheduler())
.then();
}
内容来源于网络,如有侵权,请联系作者删除!