Usage and code examples for the reactor.core.publisher.Operators.lift() method

x33g5p2x, reposted on 2022-01-25 under "Other"

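This article collects code examples for the Java method reactor.core.publisher.Operators.lift() and shows how Operators.lift() is used in practice. The examples come mainly from GitHub, Stack Overflow, Maven, and similar platforms, and were extracted from selected projects, so they should serve as a useful reference. Details of the Operators.lift() method:
Package: reactor.core.publisher
Class name: Operators
Method name: lift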

Introduction to Operators.lift

Create a function that can be used to support a custom operator via CoreSubscriber decoration. The function is compatible with Flux#transform(Function), Mono#transform(Function), Hooks#onEachOperator(Function) and Hooks#onLastOperator(Function), but requires that the original Publisher be Scannable.

This variant attempts to expose the Publisher as a Scannable for convenience of introspection. You should however avoid instanceof checks or any other processing that depends on the identity of the Publisher, as it might get hidden if Scannable#isScanAvailable() returns false. Use #liftPublisher(BiFunction) instead for that kind of use case.
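
The decoration described above can be sketched as follows. This is a minimal illustration rather than code from any of the projects quoted in this article, and it assumes reactor-core 3.x is on the classpath. The class name LiftCountingExample, the SEEN counter, and the run() helper are hypothetical names introduced only for this sketch. The lifter wraps the downstream CoreSubscriber, counts each onNext signal, and forwards everything else unchanged.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Operators;
import reactor.util.context.Context;

// Hypothetical example class, not part of reactor-core.
final class LiftCountingExample {

    // Counts every element seen by the decorated subscriber.
    static final AtomicInteger SEEN = new AtomicInteger();

    // Explicit type witnesses keep type inference simple.
    static final Function<? super Publisher<String>, ? extends Publisher<String>> COUNTING =
            Operators.<String, String>lift((scannable, actual) -> new CoreSubscriber<String>() {
                @Override
                public void onSubscribe(Subscription s) {
                    actual.onSubscribe(s); // pass the subscription through untouched
                }

                @Override
                public void onNext(String value) {
                    SEEN.incrementAndGet(); // the "custom operator" behaviour
                    actual.onNext(value);
                }

                @Override
                public void onError(Throwable t) {
                    actual.onError(t);
                }

                @Override
                public void onComplete() {
                    actual.onComplete();
                }

                @Override
                public Context currentContext() {
                    return actual.currentContext(); // keep the downstream Context visible
                }
            });

    static List<String> run() {
        SEEN.set(0);
        return Flux.just("a", "b", "c")
                .transform(COUNTING)
                .collectList()
                .block();
    }
}
```

Delegating currentContext() to the downstream subscriber matters in this sketch: without it, operators upstream of the lifted one would see an empty Context.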

Code examples

Code example source: reactor/reactor-core

Operators.lift((sc, sub) -> {
  scannableRef.set(sc);
  return sub;

Code example source: reactor/reactor-core

@Test
public void eachOperatorTest() {
  Hooks.onEachOperator(Operators.lift((sc, sub) ->
      new CoreSubscriber<Object>(){
        @Override

Code example source: reactor/reactor-core

@Test
public void lastOperatorTest() {
  Hooks.onLastOperator(Operators.lift((sc, sub) ->
      new CoreSubscriber<Object>(){
        @Override

Code example source: reactor/reactor-core

Operators.lift(sc -> {
      scannableFilterRef.set(sc);
      return true;

Code example source: reactor/reactor-core

@Test
public void lastOperatorFilterTest() {
  Hooks.onLastOperator(Operators.lift(sc -> sc.tags()
                        .anyMatch(t -> t.getT1()
                                .contains("metric")),
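
The two-argument overload used in the excerpt above takes a Predicate<Scannable> that decides whether a given publisher is lifted at all. The following sketch shows one way this might look outside a hook, assuming reactor-core 3.x. The class LiftFilterExample, the LIFTED counter, and the tag value "demo" are hypothetical. The lifter returns the downstream subscriber unchanged and only records that it was invoked.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Operators;

// Hypothetical example class, not part of reactor-core.
final class LiftFilterExample {

    // Incremented each time the lifter is invoked for a subscription.
    static final AtomicInteger LIFTED = new AtomicInteger();

    // Only publishers carrying a tag whose key contains "metric" are lifted.
    static final Function<? super Publisher<Integer>, ? extends Publisher<Integer>> METRIC_ONLY =
            Operators.<Integer, Integer>lift(
                    sc -> sc.tags().anyMatch(t -> t.getT1().contains("metric")),
                    (sc, actual) -> {
                        LIFTED.incrementAndGet();
                        return actual; // no decoration, just observe
                    });

    // The tag makes the filter match, so the lifter runs on subscribe.
    static List<Integer> tagged() {
        return Flux.just(1, 2, 3)
                .tag("metric", "demo")
                .transform(METRIC_ONLY)
                .collectList()
                .block();
    }

    // No tag: the filter rejects the publisher and it is returned as-is.
    static List<Integer> untagged() {
        return Flux.just(1, 2, 3)
                .transform(METRIC_ONLY)
                .collectList()
                .block();
    }
}
```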

Code example source: org.mule.runtime/mule-core

private Function<? super Publisher<CoreEvent>, ? extends Publisher<CoreEvent>> doOnNextOrErrorWithContext(Consumer<Context> contextConsumer) {
 return lift((scannable, subscriber) -> new CoreSubscriber<CoreEvent>() {
  private Context context = subscriber.currentContext();
  @Override
  public void onNext(CoreEvent event) {
   contextConsumer.accept(context);
   subscriber.onNext(event);
  }
  @Override
  public void onError(Throwable throwable) {
   contextConsumer.accept(context);
   subscriber.onError(throwable);
  }
  @Override
  public void onComplete() {
   subscriber.onComplete();
  }
  @Override
  public Context currentContext() {
   return context;
  }
  @Override
  public void onSubscribe(Subscription s) {
   subscriber.onSubscribe(s);
  }
 });
}

Code example source: io.rsocket.rpc/rsocket-rpc-core

public static <T>
  Function<Map<String, String>, Function<? super Publisher<T>, ? extends Publisher<T>>> trace(
    Tracer tracer, String name, Tag... tags) {
 return map ->
   Operators.lift(
     (scannable, subscriber) ->
       new SpanSubscriber<T>(
         subscriber, subscriber.currentContext(), tracer, map, name, tags));
}

Code example source: org.springframework.security/spring-security-test

@Override
public void beforeTestMethod(TestContext testContext) throws Exception {
  SecurityContext securityContext = TestSecurityContextHolder.getContext();
  Hooks.onLastOperator(CONTEXT_OPERATOR_KEY, Operators.lift((s, sub) -> new SecuritySubContext<>(sub, securityContext)));
}
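
Registering a lifter under a key, as in the excerpt above, installs it globally until the key is reset. A minimal sketch of that lifecycle follows, assuming reactor-core 3.x. The class LiftHookExample, the key "liftHookExample", and the SUBSCRIPTIONS counter are hypothetical. The hook is removed in a finally block so it does not leak into unrelated code.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Hooks;
import reactor.core.publisher.Operators;

// Hypothetical example class, not part of reactor-core.
final class LiftHookExample {

    static final String HOOK_KEY = "liftHookExample"; // hypothetical hook key

    // Incremented whenever the hook's lifter is invoked.
    static final AtomicInteger SUBSCRIPTIONS = new AtomicInteger();

    static List<Integer> runWithHook() {
        // Hooks are global: the lifter applies to the last operator of any
        // chain subscribed while the key is registered.
        Hooks.onLastOperator(HOOK_KEY, Operators.<Object, Object>lift((sc, actual) -> {
            SUBSCRIPTIONS.incrementAndGet();
            return actual; // leave the subscriber undecorated
        }));
        try {
            return Flux.just(1, 2, 3)
                    .map(i -> i * 2)
                    .collectList()
                    .block();
        } finally {
            // Always unregister so later pipelines are unaffected.
            Hooks.resetOnLastOperator(HOOK_KEY);
        }
    }
}
```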

Code example source: io.rsocket.rpc/rsocket-rpc-core

public static <T>
   Function<SpanContext, Function<? super Publisher<T>, ? extends Publisher<T>>> traceAsChild(
     Tracer tracer, String name, Tag... tags) {
  return (spanContext) -> {
   if (spanContext == null) {
    return Operators.lift(
      (scannable, subscriber) ->
        new SpanSubscriber<T>(
          subscriber, subscriber.currentContext(), tracer, null, name, tags));
   } else {
    return Operators.lift(
      (scannable, subscriber) ->
        new SpanSubscriber<T>(
          subscriber,
          subscriber.currentContext(),
          tracer,
          null,
          spanContext,
          name,
          tags));
   }
  };
 }
}

Code example source: com.salesforce.servicelibs/reactor-grpc-stub

@Override
public Publisher<TResponse> getReactiveConsumerFromPublisher(ReactiveStreamObserverPublisherClient<TResponse> publisher) {
  return Flux.from(publisher).transform(Operators.lift(new BackpressureChunkingLifter<TResponse>()));
}

Code example source: com.salesforce.servicelibs/reactor-grpc-stub

/**
 * Implements a unary -> stream call as {@link Mono} -> {@link Flux}, where the server responds with a
 * stream of messages.
 */
public static <TRequest, TResponse> Flux<TResponse> oneToMany(
    Mono<TRequest> rxRequest,
    BiConsumer<TRequest, StreamObserver<TResponse>> delegate) {
  try {
    ReactorConsumerStreamObserver<TRequest, TResponse> consumerStreamObserver = new ReactorConsumerStreamObserver<>();
    rxRequest.subscribe(request -> delegate.accept(request, consumerStreamObserver));
    return ((Flux<TResponse>) consumerStreamObserver.getRxConsumer())
        .transform(Operators.lift(new SubscribeOnlyOnceLifter<TResponse>()));
  } catch (Throwable throwable) {
    return Flux.error(throwable);
  }
}

Code example source: com.salesforce.servicelibs/reactor-grpc-stub

/**
 * Implements a unary -> unary call using {@link Mono} -> {@link Mono}.
 */
public static <TRequest, TResponse> Mono<TResponse> oneToOne(
    Mono<TRequest> rxRequest,
    BiConsumer<TRequest, StreamObserver<TResponse>> delegate) {
  try {
    return Mono
        .<TResponse>create(emitter -> rxRequest.subscribe(
          request -> delegate.accept(request, new StreamObserver<TResponse>() {
            @Override
            public void onNext(TResponse tResponse) {
              emitter.success(tResponse);
            }
            @Override
            public void onError(Throwable throwable) {
              emitter.error(throwable);
            }
            @Override
            public void onCompleted() {
              // Do nothing
            }
          }),
          emitter::error
        ))
        .transform(Operators.lift(new SubscribeOnlyOnceLifter<TResponse>()));
  } catch (Throwable throwable) {
    return Mono.error(throwable);
  }
}

Code example source: com.salesforce.servicelibs/reactor-grpc-stub

/**
   * Implements a bidirectional stream -> stream call as {@link Flux} -> {@link Flux}, where both the client
   * and the server independently stream to each other.
   */
  public static <TRequest, TResponse> Flux<TResponse> manyToMany(
      Flux<TRequest> rxRequest,
      Function<StreamObserver<TResponse>, StreamObserver<TRequest>> delegate) {
    try {
      ReactorProducerConsumerStreamObserver<TRequest, TResponse> consumerStreamObserver = new ReactorProducerConsumerStreamObserver<>(rxRequest);
      delegate.apply(new CancellableStreamObserver<>(consumerStreamObserver, consumerStreamObserver::cancel));
      consumerStreamObserver.rxSubscribe();
      return ((Flux<TResponse>) consumerStreamObserver.getRxConsumer())
          .transform(Operators.lift(new SubscribeOnlyOnceLifter<TResponse>()));
    } catch (Throwable throwable) {
      return Flux.error(throwable);
    }
  }
}

Code example source: com.salesforce.servicelibs/reactor-grpc-stub

/**
 * Implements a stream -> unary call as {@link Flux} -> {@link Mono}, where the client transmits a stream of
 * messages.
 */
public static <TRequest, TResponse> Mono<TResponse> manyToOne(
    Flux<TRequest> rxRequest,
    Function<StreamObserver<TResponse>, StreamObserver<TRequest>> delegate) {
  try {
    return Mono
        .<TResponse>create(emitter -> {
          ReactiveProducerStreamObserver<TRequest, TResponse> reactiveProducerStreamObserver = new ReactiveProducerStreamObserver<>(
              rxRequest,
              emitter::success,
              emitter::error,
              Runnables.doNothing());
          delegate.apply(
              new CancellableStreamObserver<>(reactiveProducerStreamObserver,
              reactiveProducerStreamObserver::cancel));
          reactiveProducerStreamObserver.rxSubscribe();
        })
        .transform(Operators.lift(new SubscribeOnlyOnceLifter<TResponse>()));
  } catch (Throwable throwable) {
    return Mono.error(throwable);
  }
}

Code example source: com.salesforce.servicelibs/reactor-grpc-stub

Mono<TResponse> rxResponse = Preconditions.checkNotNull(delegate.apply(
    Flux.from(streamObserverPublisher)
        .transform(Operators.<TRequest, TRequest>lift(new BackpressureChunkingLifter<TRequest>()))));
rxResponse.subscribe(
  value -> {
