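This article collects code examples for the Java method reactor.core.publisher.Operators.lift() and shows how Operators.lift() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven. They were extracted from selected projects and are intended as a practical reference. Details of Operators.lift() are as follows:
Package path: reactor.core.publisher.Operators
Class name: Operators
Method name: lift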
Create a function that can be used to support a custom operator via CoreSubscriber decoration. The function is compatible with Flux#transform(Function), Mono#transform(Function), Hooks#onEachOperator(Function) and Hooks#onLastOperator(Function), but requires that the original Publisher be Scannable.
This variant attempts to expose the Publisher as a Scannable for convenience of introspection. You should however avoid instanceof checks or any other processing that depends on identity of the Publisher, as it might get hidden if Scannable#isScanAvailable() returns false. Use #liftPublisher(BiFunction) instead for that kind of use case.
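The decoration idea described above can be sketched without Reactor on the classpath. The following is a minimal illustration built on the JDK's java.util.concurrent.Flow interfaces, not Reactor's implementation: the class LiftSketch and the helpers lift and range are hypothetical names introduced for this sketch, and range ignores backpressure for brevity. It shows a function that wraps whatever subscriber arrives from downstream in a decorating subscriber, then subscribes that wrapper to the original publisher.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.function.BiFunction;
import java.util.function.Function;

public class LiftSketch {

    // Hypothetical analogue of Operators.lift(): turns a subscriber decorator
    // into a function from publisher to publisher.
    static <T> Function<Flow.Publisher<T>, Flow.Publisher<T>> lift(
            BiFunction<Flow.Publisher<T>, Flow.Subscriber<? super T>, Flow.Subscriber<? super T>> decorator) {
        return source -> downstream -> source.subscribe(decorator.apply(source, downstream));
    }

    // Hypothetical synchronous source. It emits every item on the first
    // request and does not honor the requested amount (assumption for brevity).
    static Flow.Publisher<Integer> range(int start, int count) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private boolean done;

            @Override
            public void request(long n) {
                if (done) {
                    return;
                }
                done = true;
                for (int i = 0; i < count; i++) {
                    subscriber.onNext(start + i);
                }
                subscriber.onComplete();
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }

    static List<String> run() {
        List<String> log = new ArrayList<>();

        // The decorator records each item, then forwards every signal unchanged.
        Function<Flow.Publisher<Integer>, Flow.Publisher<Integer>> logging =
                lift((source, actual) -> new Flow.Subscriber<Integer>() {
                    @Override
                    public void onSubscribe(Flow.Subscription s) {
                        actual.onSubscribe(s);
                    }

                    @Override
                    public void onNext(Integer item) {
                        log.add("seen " + item);
                        actual.onNext(item);
                    }

                    @Override
                    public void onError(Throwable t) {
                        actual.onError(t);
                    }

                    @Override
                    public void onComplete() {
                        actual.onComplete();
                    }
                });

        // The final subscriber receives items only after the decorator saw them.
        logging.apply(range(1, 3)).subscribe(new Flow.Subscriber<Integer>() {
            @Override
            public void onSubscribe(Flow.Subscription s) {
                s.request(Long.MAX_VALUE);
            }

            @Override
            public void onNext(Integer item) {
                log.add("received " + item);
            }

            @Override
            public void onError(Throwable t) {
                log.add("error");
            }

            @Override
            public void onComplete() {
                log.add("done");
            }
        });
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

In Reactor itself the corresponding function is the one returned by Operators.lift(), and it is applied through Flux#transform, Mono#transform, or the Hooks methods named in the description above.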
Code example source: reactor/reactor-core
Operators.lift((sc, sub) -> {
    scannableRef.set(sc);
    return sub;
Code example source: reactor/reactor-core
@Test
public void eachOperatorTest() {
    Hooks.onEachOperator(Operators.lift((sc, sub) ->
            new CoreSubscriber<Object>(){
                @Override
Code example source: reactor/reactor-core
@Test
public void lastOperatorTest() {
    Hooks.onLastOperator(Operators.lift((sc, sub) ->
            new CoreSubscriber<Object>(){
                @Override
Code example source: reactor/reactor-core
Operators.lift(sc -> {
    scannableFilterRef.set(sc);
    return true;
Code example source: reactor/reactor-core
@Test
public void lastOperatorFilterTest() {
    Hooks.onLastOperator(Operators.lift(sc -> sc.tags()
                                              .anyMatch(t -> t.getT1()
                                                             .contains("metric")),
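The snippets above pass a filter as the first argument to lift, so the decorator is applied only to publishers that the predicate accepts. A minimal sketch of that behavior follows, again using the JDK's java.util.concurrent.Flow rather than Reactor. Everything named here is hypothetical and introduced only for illustration: FilteredLiftSketch, NamedRange (a stand-in for a Scannable publisher that carries a name), and this two-argument lift helper. NamedRange ignores backpressure for brevity.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Predicate;

public class FilteredLiftSketch {

    // Hypothetical stand-in for a Scannable source: emits 1..count and has a name.
    static final class NamedRange implements Flow.Publisher<Integer> {
        final String name;
        final int count;

        NamedRange(String name, int count) {
            this.name = name;
            this.count = count;
        }

        @Override
        public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private boolean done;

                @Override
                public void request(long n) {
                    if (done) {
                        return;
                    }
                    done = true;
                    for (int i = 1; i <= count; i++) {
                        subscriber.onNext(i);
                    }
                    subscriber.onComplete();
                }

                @Override
                public void cancel() {
                    done = true;
                }
            });
        }
    }

    // Hypothetical analogue of the filtered lift: decorate only when the filter matches.
    static Function<NamedRange, Flow.Publisher<Integer>> lift(
            Predicate<NamedRange> filter,
            BiFunction<NamedRange, Flow.Subscriber<? super Integer>, Flow.Subscriber<? super Integer>> decorator) {
        return source -> {
            if (!filter.test(source)) {
                return source; // filter rejected: the publisher is returned undecorated
            }
            Flow.Publisher<Integer> lifted =
                    downstream -> source.subscribe(decorator.apply(source, downstream));
            return lifted;
        };
    }

    // Subscribes with unbounded demand and collects every item.
    static List<Integer> collect(Flow.Publisher<Integer> publisher) {
        List<Integer> out = new ArrayList<>();
        publisher.subscribe(new Flow.Subscriber<Integer>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
            @Override public void onNext(Integer item) { out.add(item); }
            @Override public void onError(Throwable t) { }
            @Override public void onComplete() { }
        });
        return out;
    }

    static List<Integer> run(String name) {
        // The decorator multiplies each item by 10 so its effect is visible.
        Function<NamedRange, Flow.Publisher<Integer>> onlyMetric = lift(
                source -> source.name.contains("metric"),
                (source, actual) -> new Flow.Subscriber<Integer>() {
                    @Override public void onSubscribe(Flow.Subscription s) { actual.onSubscribe(s); }
                    @Override public void onNext(Integer item) { actual.onNext(item * 10); }
                    @Override public void onError(Throwable t) { actual.onError(t); }
                    @Override public void onComplete() { actual.onComplete(); }
                });
        return collect(onlyMetric.apply(new NamedRange(name, 3)));
    }

    public static void main(String[] args) {
        System.out.println(run("metric.requests")); // decorated
        System.out.println(run("plain"));           // left untouched
    }
}
```

Returning the source unchanged when the filter rejects it keeps unrelated publishers free of any wrapper, which matters when the function is installed globally through a hook.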
Code example source: org.mule.runtime/mule-core
private Function<? super Publisher<CoreEvent>, ? extends Publisher<CoreEvent>> doOnNextOrErrorWithContext(Consumer<Context> contextConsumer) {
    return lift((scannable, subscriber) -> new CoreSubscriber<CoreEvent>() {
        // Capture the downstream subscriber's Context once, at decoration time.
        private Context context = subscriber.currentContext();

        @Override
        public void onNext(CoreEvent event) {
            contextConsumer.accept(context);
            subscriber.onNext(event);
        }

        @Override
        public void onError(Throwable throwable) {
            contextConsumer.accept(context);
            subscriber.onError(throwable);
        }

        @Override
        public void onComplete() {
            subscriber.onComplete();
        }

        @Override
        public Context currentContext() {
            return context;
        }

        @Override
        public void onSubscribe(Subscription s) {
            subscriber.onSubscribe(s);
        }
    });
}
Code example source: io.rsocket.rpc/rsocket-rpc-core
public static <T>
    Function<Map<String, String>, Function<? super Publisher<T>, ? extends Publisher<T>>> trace(
        Tracer tracer, String name, Tag... tags) {
    return map ->
        Operators.lift(
            (scannable, subscriber) ->
                new SpanSubscriber<T>(
                    subscriber, subscriber.currentContext(), tracer, map, name, tags));
}
Code example source: org.springframework.security/spring-security-test
@Override
public void beforeTestMethod(TestContext testContext) throws Exception {
    SecurityContext securityContext = TestSecurityContextHolder.getContext();
    Hooks.onLastOperator(CONTEXT_OPERATOR_KEY, Operators.lift((s, sub) -> new SecuritySubContext<>(sub, securityContext)));
}
Code example source: io.rsocket.rpc/rsocket-rpc-core
public static <T>
    Function<SpanContext, Function<? super Publisher<T>, ? extends Publisher<T>>> traceAsChild(
        Tracer tracer, String name, Tag... tags) {
    return (spanContext) -> {
        if (spanContext == null) {
            return Operators.lift(
                (scannable, subscriber) ->
                    new SpanSubscriber<T>(
                        subscriber, subscriber.currentContext(), tracer, null, name, tags));
        } else {
            return Operators.lift(
                (scannable, subscriber) ->
                    new SpanSubscriber<T>(
                        subscriber,
                        subscriber.currentContext(),
                        tracer,
                        null,
                        spanContext,
                        name,
                        tags));
        }
    };
}
Code example source: apache/servicemix-bundles
@Override
public void beforeTestMethod(TestContext testContext) throws Exception {
    SecurityContext securityContext = TestSecurityContextHolder.getContext();
    Hooks.onLastOperator(CONTEXT_OPERATOR_KEY, Operators.lift((s, sub) -> new SecuritySubContext<>(sub, securityContext)));
}
Code example source: com.salesforce.servicelibs/reactor-grpc-stub
@Override
public Publisher<TResponse> getReactiveConsumerFromPublisher(ReactiveStreamObserverPublisherClient<TResponse> publisher) {
    return Flux.from(publisher).transform(Operators.lift(new BackpressureChunkingLifter<TResponse>()));
}
Code example source: com.salesforce.servicelibs/reactor-grpc-stub
/**
 * Implements a unary -> stream call as {@link Mono} -> {@link Flux}, where the server responds with a
 * stream of messages.
 */
public static <TRequest, TResponse> Flux<TResponse> oneToMany(
        Mono<TRequest> rxRequest,
        BiConsumer<TRequest, StreamObserver<TResponse>> delegate) {
    try {
        ReactorConsumerStreamObserver<TRequest, TResponse> consumerStreamObserver = new ReactorConsumerStreamObserver<>();
        rxRequest.subscribe(request -> delegate.accept(request, consumerStreamObserver));
        return ((Flux<TResponse>) consumerStreamObserver.getRxConsumer())
                .transform(Operators.lift(new SubscribeOnlyOnceLifter<TResponse>()));
    } catch (Throwable throwable) {
        return Flux.error(throwable);
    }
}
Code example source: com.salesforce.servicelibs/reactor-grpc-stub
/**
 * Implements a unary -> unary call using {@link Mono} -> {@link Mono}.
 */
public static <TRequest, TResponse> Mono<TResponse> oneToOne(
        Mono<TRequest> rxRequest,
        BiConsumer<TRequest, StreamObserver<TResponse>> delegate) {
    try {
        return Mono
                .<TResponse>create(emitter -> rxRequest.subscribe(
                    request -> delegate.accept(request, new StreamObserver<TResponse>() {
                        @Override
                        public void onNext(TResponse tResponse) {
                            emitter.success(tResponse);
                        }

                        @Override
                        public void onError(Throwable throwable) {
                            emitter.error(throwable);
                        }

                        @Override
                        public void onCompleted() {
                            // Do nothing
                        }
                    }),
                    emitter::error
                ))
                .transform(Operators.lift(new SubscribeOnlyOnceLifter<TResponse>()));
    } catch (Throwable throwable) {
        return Mono.error(throwable);
    }
}
Code example source: com.salesforce.servicelibs/reactor-grpc-stub
/**
 * Implements a bidirectional stream -> stream call as {@link Flux} -> {@link Flux}, where both the client
 * and the server independently stream to each other.
 */
public static <TRequest, TResponse> Flux<TResponse> manyToMany(
        Flux<TRequest> rxRequest,
        Function<StreamObserver<TResponse>, StreamObserver<TRequest>> delegate) {
    try {
        ReactorProducerConsumerStreamObserver<TRequest, TResponse> consumerStreamObserver = new ReactorProducerConsumerStreamObserver<>(rxRequest);
        delegate.apply(new CancellableStreamObserver<>(consumerStreamObserver, consumerStreamObserver::cancel));
        consumerStreamObserver.rxSubscribe();
        return ((Flux<TResponse>) consumerStreamObserver.getRxConsumer())
                .transform(Operators.lift(new SubscribeOnlyOnceLifter<TResponse>()));
    } catch (Throwable throwable) {
        return Flux.error(throwable);
    }
}
Code example source: com.salesforce.servicelibs/reactor-grpc-stub
/**
 * Implements a stream -> unary call as {@link Flux} -> {@link Mono}, where the client transmits a stream of
 * messages.
 */
public static <TRequest, TResponse> Mono<TResponse> manyToOne(
        Flux<TRequest> rxRequest,
        Function<StreamObserver<TResponse>, StreamObserver<TRequest>> delegate) {
    try {
        return Mono
                .<TResponse>create(emitter -> {
                    ReactiveProducerStreamObserver<TRequest, TResponse> reactiveProducerStreamObserver = new ReactiveProducerStreamObserver<>(
                            rxRequest,
                            emitter::success,
                            emitter::error,
                            Runnables.doNothing());
                    delegate.apply(
                            new CancellableStreamObserver<>(reactiveProducerStreamObserver,
                            reactiveProducerStreamObserver::cancel));
                    reactiveProducerStreamObserver.rxSubscribe();
                })
                .transform(Operators.lift(new SubscribeOnlyOnceLifter<TResponse>()));
    } catch (Throwable throwable) {
        return Mono.error(throwable);
    }
}
Code example source: com.salesforce.servicelibs/reactor-grpc-stub
Mono<TResponse> rxResponse = Preconditions.checkNotNull(delegate.apply(
        Flux.from(streamObserverPublisher)
                .transform(Operators.<TRequest, TRequest>lift(new BackpressureChunkingLifter<TRequest>()))));
rxResponse.subscribe(
    value -> {