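This article collects code examples for the Java method reactor.core.publisher.Mono.subscribe() and shows how Mono.subscribe() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should serve as a useful reference. Details of the Mono.subscribe() method are as follows: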
Fully qualified name: reactor.core.publisher.Mono
Class name: Mono
Method name: subscribe
Subscribe to this Mono and request unbounded demand.
This version doesn't specify any consumption behavior for the events from the chain, especially no error handling, so other variants should usually be preferred.
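To illustrate why the callback-taking variants are usually preferred, here is a minimal sketch using two overloads from Reactor's public API: subscribe(Consumer, Consumer, Runnable) and subscribe(Consumer, Consumer). The class name SubscribeVariants and the recorded event strings are invented for this illustration, and the sketch assumes reactor-core is on the classpath.

```java
import java.util.ArrayList;
import java.util.List;

import reactor.core.publisher.Mono;

// Hypothetical demo class; not taken from any of the projects quoted in this article.
public class SubscribeVariants {

    // Subscribes to a successful Mono and to a failing Mono, recording each signal.
    static List<String> run() {
        List<String> events = new ArrayList<>();

        // Value, error, and completion callbacks are all supplied.
        Mono.just("hello").subscribe(
                value -> events.add("value: " + value),
                error -> events.add("error: " + error.getMessage()),
                () -> events.add("completed"));

        // The error callback receives the failure instead of it going unhandled.
        Mono.<String>error(new IllegalStateException("boom")).subscribe(
                value -> events.add("value: " + value),
                error -> events.add("error: " + error.getMessage()));

        return events;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Both source Monos emit synchronously, so the callbacks run on the calling thread before run() returns. With an asynchronous source the list would need to be thread-safe and the caller would have to wait for the terminal signal.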
Code example source: spring-projects/spring-framework
@OnWebSocketConnect
public void onWebSocketConnect(Session session) {
    this.delegateSession = this.sessionFactory.apply(session);
    // The delegate session is passed as the Subscriber of the Mono returned by the handler.
    this.delegateHandler.handle(this.delegateSession).subscribe(this.delegateSession);
}
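Code example source: resilience4j/resilience4j
@Override
public void subscribe(CoreSubscriber<? super T> actual) {
    // Wrap the downstream subscriber in a CircuitBreakerSubscriber before subscribing to the source.
    source.subscribe(new CircuitBreakerSubscriber<>(circuitBreaker, actual, true));
}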
Code example source: resilience4j/resilience4j
@Override
public void subscribe(CoreSubscriber<? super T> actual) {
    // Signals are published on the given scheduler, then passed through the BulkheadSubscriber.
    source.publishOn(scheduler)
        .subscribe(new BulkheadSubscriber<>(bulkhead, actual));
}
Code example source: resilience4j/resilience4j
@Override
public void subscribe(CoreSubscriber<? super T> actual) {
    // Signals are published on the given scheduler, then passed through the RateLimiterSubscriber.
    source.publishOn(scheduler)
        .subscribe(new RateLimiterSubscriber<>(rateLimiter, actual));
}
Code example source: spring-projects/spring-framework
@Override
public void handleRequest(HttpServerExchange exchange) {
    UndertowServerHttpRequest request = null;
    try {
        request = new UndertowServerHttpRequest(exchange, getDataBufferFactory());
    }
    catch (URISyntaxException ex) {
        if (logger.isWarnEnabled()) {
            // Log at the same level as the guard above (the snippet as extracted called logger.debug here).
            logger.warn("Failed to get request URI: " + ex.getMessage());
        }
        exchange.setStatusCode(400);
        return;
    }
    ServerHttpResponse response = new UndertowServerHttpResponse(exchange, getDataBufferFactory(), request);
    if (request.getMethod() == HttpMethod.HEAD) {
        response = new HttpHeadResponseDecorator(response);
    }
    HandlerResultSubscriber resultSubscriber = new HandlerResultSubscriber(exchange, request);
    this.httpHandler.handle(request, response).subscribe(resultSubscriber);
}
Code example source: spring-projects/spring-framework
@Override
public void onConnect(WebSocketHttpExchange httpExchange, WebSocketChannel channel) {
    UndertowWebSocketSession session = createSession(channel);
    UndertowWebSocketHandlerAdapter adapter = new UndertowWebSocketHandlerAdapter(session);
    channel.getReceiveSetter().set(adapter);
    channel.resumeReceives();
    this.handler.handle(session).subscribe(session);
}
Code example source: spring-projects/spring-framework
this.httpHandler.handle(httpRequest, httpResponse).subscribe(subscriber);
Code example source: spring-projects/spring-framework
@Override
public Mono<ClientHttpResponse> connect(HttpMethod httpMethod, URI uri,
        Function<? super ClientHttpRequest, Mono<Void>> requestCallback) {
    MonoProcessor<ClientHttpResponse> result = MonoProcessor.create();
    MockClientHttpRequest mockClientRequest = new MockClientHttpRequest(httpMethod, uri);
    MockServerHttpResponse mockServerResponse = new MockServerHttpResponse();
    mockClientRequest.setWriteHandler(requestBody -> {
        log("Invoking HttpHandler for ", httpMethod, uri);
        ServerHttpRequest mockServerRequest = adaptRequest(mockClientRequest, requestBody);
        ServerHttpResponse responseToUse = prepareResponse(mockServerResponse, mockServerRequest);
        // Values are ignored; errors are forwarded to the result processor.
        this.handler.handle(mockServerRequest, responseToUse).subscribe(aVoid -> {}, result::onError);
        return Mono.empty();
    });
    mockServerResponse.setWriteHandler(responseBody ->
            Mono.fromRunnable(() -> {
                log("Creating client response for ", httpMethod, uri);
                result.onNext(adaptResponse(mockServerResponse, responseBody));
            }));
    log("Writing client request for ", httpMethod, uri);
    requestCallback.apply(mockClientRequest).subscribe(aVoid -> {}, result::onError);
    return result;
}
Code example source: spring-projects/spring-framework
private void handleChannel(URI url, WebSocketHandler handler, MonoProcessor<Void> completion,
        DefaultNegotiation negotiation, WebSocketChannel channel) {
    HandshakeInfo info = createHandshakeInfo(url, negotiation);
    UndertowWebSocketSession session = new UndertowWebSocketSession(channel, info, this.bufferFactory, completion);
    UndertowWebSocketHandlerAdapter adapter = new UndertowWebSocketHandlerAdapter(session);
    channel.getReceiveSetter().set(adapter);
    channel.resumeReceives();
    handler.handle(session).subscribe(session);
}
Code example source: spring-projects/spring-framework
@Override
public void onOpen(Session session, EndpointConfig config) {
    this.delegateSession = this.sessionFactory.apply(session);
    Assert.state(this.delegateSession != null, "No delegate session");
    session.addMessageHandler(String.class, message -> {
        WebSocketMessage webSocketMessage = toMessage(message);
        this.delegateSession.handleMessage(webSocketMessage.getType(), webSocketMessage);
    });
    session.addMessageHandler(ByteBuffer.class, message -> {
        WebSocketMessage webSocketMessage = toMessage(message);
        this.delegateSession.handleMessage(webSocketMessage.getType(), webSocketMessage);
    });
    session.addMessageHandler(PongMessage.class, message -> {
        WebSocketMessage webSocketMessage = toMessage(message);
        this.delegateSession.handleMessage(webSocketMessage.getType(), webSocketMessage);
    });
    this.delegateHandler.handle(this.delegateSession).subscribe(this.delegateSession);
}
Code example source: reactor/reactor-core
@Test
public void normal() {
    AssertSubscriber<ArrayList<Integer>> ts = AssertSubscriber.create();
    Flux.range(1, 10).collect(ArrayList<Integer>::new, (a, b) -> a.add(b)).subscribe(ts);
    ts.assertValues(new ArrayList<>(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)))
        .assertNoError()
        .assertComplete();
}
Code example source: spring-projects/spring-framework
@Test // SPR-16402
public void singleSubscriberWithResource() throws IOException {
    UnicastProcessor<Resource> processor = UnicastProcessor.create();
    Resource logo = new ClassPathResource("/org/springframework/http/converter/logo.jpg");
    Mono.just(logo).subscribe(processor);
    MultipartBodyBuilder bodyBuilder = new MultipartBodyBuilder();
    bodyBuilder.asyncPart("logo", processor, Resource.class);
    Mono<MultiValueMap<String, HttpEntity<?>>> result = Mono.just(bodyBuilder.build());
    Map<String, Object> hints = Collections.emptyMap();
    this.writer.write(result, null, MediaType.MULTIPART_FORM_DATA, this.response, hints).block();
    MultiValueMap<String, Part> requestParts = parse(hints);
    assertEquals(1, requestParts.size());
    Part part = requestParts.getFirst("logo");
    assertEquals("logo", part.name());
    assertTrue(part instanceof FilePart);
    assertEquals("logo.jpg", ((FilePart) part).filename());
    assertEquals(MediaType.IMAGE_JPEG, part.headers().getContentType());
    assertEquals(logo.getFile().length(), part.headers().getContentLength());
}
Code example source: reactor/reactor-core
@Test
public void empty() {
    AssertSubscriber<Integer> ts = AssertSubscriber.create();
    Mono.<Integer>empty().defaultIfEmpty(10).subscribe(ts);
    ts.assertValues(10)
        .assertComplete()
        .assertNoError();
}
Code example source: reactor/reactor-core
@Test
public void noErrorHookThrowsCallbackNotImplemented() {
    RuntimeException boom = new IllegalArgumentException("boom");
    // subscribe(Consumer) has no error callback, so the error surfaces as ErrorCallbackNotImplemented.
    Assertions.assertThatExceptionOfType(RuntimeException.class)
        .isThrownBy(() -> Mono.error(boom).subscribe(v -> {}))
        .withCause(boom)
        .hasToString("reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.IllegalArgumentException: boom");
}
Code example source: reactor/reactor-core
@Test
public void neverTriggered() {
    AssertSubscriber<Integer> ts = AssertSubscriber.create();
    Mono.just(1)
        .delaySubscription(Mono.never())
        .subscribe(ts);
    ts.assertNoValues()
        .assertNoError()
        .assertNotComplete();
}
Code example source: reactor/reactor-core
@Test
public void someMatch() {
    AssertSubscriber<Boolean> ts = AssertSubscriber.create();
    Flux.range(1, 10).all(v -> v < 6).subscribe(ts);
    ts.assertValues(false)
        .assertComplete()
        .assertNoError();
}
Code example source: reactor/reactor-core
@Test
public void emptyBackpressured() {
    // Start with zero demand: nothing is delivered until request(n) is called.
    AssertSubscriber<Integer> ts = AssertSubscriber.create(0);
    Mono.<Integer>empty().defaultIfEmpty(10).subscribe(ts);
    ts.assertNoValues()
        .assertNoError()
        .assertNotComplete();
    ts.request(2);
    ts.assertValues(10)
        .assertComplete()
        .assertNoError();
}
Code example source: reactor/reactor-core
@Test
public void none() {
    AssertSubscriber<Boolean> ts = AssertSubscriber.create();
    Flux.range(1, 10).any(v -> false).subscribe(ts);
    ts.assertValues(false)
        .assertComplete()
        .assertNoError();
}
Code example source: reactor/reactor-core
@Test
public void normalBackpressured() {
    AssertSubscriber<Long> ts = AssertSubscriber.create(0);
    Flux.range(1, 10).count().subscribe(ts);
    ts.assertNoValues()
        .assertNotComplete()
        .assertNoError();
    ts.request(2);
    ts.assertValues(10L)
        .assertComplete()
        .assertNoError();
}
Code example source: reactor/reactor-core
@Test
public void normalBackpressured() {
    AssertSubscriber<Integer> ts = AssertSubscriber.create(0);
    Flux.just(1).single().subscribe(ts);
    ts.assertNoValues()
        .assertNoError()
        .assertNotComplete();
    ts.request(1);
    ts.assertValues(1)
        .assertNoError()
        .assertComplete();
}