Usage and code examples of the reactor.core.publisher.Mono.onAssembly() method

Reposted by x33g5p2x on 2022-01-24, category: Other

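This article collects code examples for the Java method reactor.core.publisher.Mono.onAssembly() and shows how Mono.onAssembly() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven; they were extracted from selected projects and should serve as a useful reference. The details of Mono.onAssembly() are as follows:
Package path: reactor.core.publisher.Mono
Class name: Mono
Method name: onAssembly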

Introduction to Mono.onAssembly

To be used by custom operators: invokes the assembly Hooks pointcut given a Mono, potentially returning a new Mono. This is useful, for example, to activate cross-cutting concerns at assembly time, e.g. a generalized checkpoint().
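To make the idea of an assembly-time pointcut concrete, here is a minimal sketch in plain Java. It is a simplified model and not Reactor's actual implementation: the names AssemblyHookSketch, Source, Traced, and hook are hypothetical and exist only for illustration. The sketch shows operator factories funnelling their result through one static method, which may hand back a decorated instance when a hook is installed.

```java
import java.util.function.Function;

// Simplified, hypothetical model of an assembly-time hook; NOT Reactor's real code.
final class AssemblyHookSketch {

    /** Stand-in for a publisher: it only carries a descriptive name. */
    static class Source {
        final String name;
        Source(String name) { this.name = name; }
    }

    /** Decorated source that the hook may return instead of the original. */
    static final class Traced extends Source {
        final Source delegate;
        Traced(Source delegate) {
            super(delegate.name + "[traced]");
            this.delegate = delegate;
        }
    }

    // Global hook (hypothetical field); null means no hook is installed.
    static volatile Function<Source, Source> hook;

    /** The pointcut: returns the source unchanged, or whatever the hook produces. */
    static Source onAssembly(Source source) {
        Function<Source, Source> h = hook;
        return h == null ? source : h.apply(source);
    }

    // Two toy "operators" that, like the listings in this article, end with onAssembly(...).
    static Source just(String value) { return onAssembly(new Source("just(" + value + ")")); }
    static Source map(Source upstream) { return onAssembly(new Source(upstream.name + ".map")); }

    public static void main(String[] args) {
        System.out.println(map(just("a")).name); // prints "just(a).map"
        hook = Traced::new;                      // install a decorating hook
        System.out.println(map(just("a")).name); // prints "just(a)[traced].map[traced]"
        hook = null;                             // uninstall the hook
    }
}
```

The decoration happens while the chain is being built, before anything subscribes, which is what "at assembly time" refers to in the description above.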

Code examples

Code example source: resilience4j/resilience4j

// The enclosing class is elided in the original listing.
public static <T> Mono<T> onAssembly(Mono<T> source) {
  return Mono.onAssembly(source);
}
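Whether the pointcut does anything observable depends on which hooks are installed. The following sketch assumes reactor-core is on the classpath and uses Hooks.onEachOperator and Hooks.resetOnEachOperator to count how many operators pass through the pointcut while a small chain is built. The class name AssemblyCounterDemo and the hook key are hypothetical, chosen for this illustration only.

```java
import java.util.concurrent.atomic.AtomicInteger;

import reactor.core.publisher.Hooks;
import reactor.core.publisher.Mono;

final class AssemblyCounterDemo {

    // Hypothetical key; it only has to be unique among the installed hooks.
    static final String HOOK_KEY = "assemblyCounterDemo";

    /** Builds a small chain without subscribing and reports how often the hook fired. */
    static int countAssembledOperators() {
        AtomicInteger assembled = new AtomicInteger();
        // The hook receives each publisher at assembly time and returns it unchanged.
        Hooks.onEachOperator(HOOK_KEY, publisher -> {
            assembled.incrementAndGet();
            return publisher;
        });
        try {
            // Assembly only: nothing subscribes to this chain.
            Mono.just("a").map(String::toUpperCase).filter(s -> !s.isEmpty());
            return assembled.get();
        } finally {
            // Remove the hook so that later code is not affected.
            Hooks.resetOnEachOperator(HOOK_KEY);
        }
    }

    public static void main(String[] args) {
        System.out.println("operators assembled: " + countAssembledOperators());
    }
}
```

The counter increases even though the chain is never subscribed to, which matches the description of the hook as an assembly-time concern.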

Code example source: reactor/reactor-core

/**
 * Create a {@link Mono} emitting the {@link Context} available on subscribe.
 * If no Context is available, the mono will simply emit the
 * {@link Context#empty() empty Context}.
 *
 * @return a new {@link Mono} emitting current context
 * @see #subscribe(CoreSubscriber)
 */
public static Mono<Context> subscriberContext() {
  return onAssembly(MonoCurrentContext.INSTANCE);
}

Code example source: reactor/reactor-core

/**
 * Counts the number of values in this {@link Flux}.
 * The count will be emitted when onComplete is observed.
 *
 * <p>
 * <img class="marble" src="doc-files/marbles/count.svg" alt="">
 *
 * @return a new {@link Mono} of {@link Long} count
 */
public final Mono<Long> count() {
  return Mono.onAssembly(new MonoCount<>(this));
}

Code example source: reactor/reactor-core

/**
 * Turn this {@link Mono} into a hot source and cache last emitted signals for further {@link Subscriber}.
 * Completion and Error will also be replayed.
 * <p>
 * <img class="marble" src="doc-files/marbles/cacheForMono.svg" alt="">
 *
 * @return a replaying {@link Mono}
 */
public final Mono<T> cache() {
  return onAssembly(new MonoProcessor<>(this));
}

Code example source: reactor/reactor-core

/**
 * Hides the identity of this {@link Mono} instance.
 * 
 * <p>The main purpose of this operator is to prevent certain identity-based
 * optimizations from happening, mostly for diagnostic purposes.
 *
 * @return a new {@link Mono} preventing {@link Publisher} / {@link Subscription} based Reactor optimizations
 */
public final Mono<T> hide() {
  return onAssembly(new MonoHide<>(this));
}

Code example source: reactor/reactor-core

/**
 * Emit a single boolean true if this {@link Mono} has an element.
 *
 * <p>
 * <img class="marble" src="doc-files/marbles/hasElementForMono.svg" alt="">
 *
 * @return a new {@link Mono} with <code>true</code> if a value is emitted and <code>false</code>
 * otherwise
 */
public final Mono<Boolean> hasElement() {
  return onAssembly(new MonoHasElement<>(this));
}

Code example source: reactor/reactor-core

/**
 * Prepare a {@link Mono} which shares this {@link Flux} sequence and dispatches the
 * first observed item to subscribers in a backpressure-aware manner.
 * This will effectively turn any type of sequence into a hot sequence when the first
 * {@link Subscriber} subscribes.
 * <p>
 * <img class="marble" src="doc-files/marbles/publishNext.svg" alt="">
 *
 * @return a new {@link Mono}
 */
public final Mono<T> publishNext() {
  return Mono.onAssembly(new MonoProcessor<>(this));
}

Code example source: reactor/reactor-core

/**
 * Ignores onNext signals (dropping them) and only propagate termination events.
 *
 * <p>
 * <img class="marble" src="doc-files/marbles/ignoreElementsForFlux.svg" alt="">
 * <p>
 *
 * @reactor.discard This operator discards the upstream's elements.
 *
 * @return a new empty {@link Mono} representing the completion of this {@link Flux}.
 */
public final Mono<T> ignoreElements() {
  return Mono.onAssembly(new MonoIgnoreElements<>(this));
}

Code example source: reactor/reactor-core

/**
 * Pick the first available result coming from any of the given monos and populate a new {@literal Mono}.
 *
 * <p>
 * <img class="marble" src="doc-files/marbles/firstForMono.svg" alt="">
 * <p>
 * @param monos The monos to use.
 * @param <T> The type of the function result.
 *
 * @return a {@link Mono}.
 */
public static <T> Mono<T> first(Iterable<? extends Mono<? extends T>> monos) {
  return onAssembly(new MonoFirst<>(monos));
}

Code example source: reactor/reactor-core

/**
 * Fallback to an alternative {@link Mono} if this mono is completed without data
 *
 * <p>
 * <img class="marble" src="doc-files/marbles/switchIfEmptyForMono.svg" alt="">
 *
 * @param alternate the alternate mono if this mono is empty
 *
 * @return a {@link Mono} falling back upon source completing without elements
 * @see Flux#switchIfEmpty
 */
public final Mono<T> switchIfEmpty(Mono<? extends T> alternate) {
  return onAssembly(new MonoSwitchIfEmpty<>(this, alternate));
}

Code example source: reactor/reactor-core

/**
 * Create a {@link Mono} that terminates with the specified error immediately after
 * being subscribed to.
 * <p>
 * <img class="marble" src="doc-files/marbles/error.svg" alt="">
 * <p>
 * @param error the onError signal
 * @param <T> the reified {@link Subscriber} type
 *
 * @return a failing {@link Mono}
 */
public static <T> Mono<T> error(Throwable error) {
  return onAssembly(new MonoError<>(error));
}

Code example source: reactor/reactor-core

/**
 * Ignores onNext signal (dropping it) and only propagates termination events.
 *
 * <p>
 * <img class="marble" src="doc-files/marbles/ignoreElementForMono.svg" alt="">
 * <p>
 *
 * @reactor.discard This operator discards the source element.
 *
 * @return a new empty {@link Mono} representing the completion of this {@link Mono}.
 */
public final Mono<T> ignoreElement() {
  return onAssembly(new MonoIgnoreElement<>(this));
}

Code example source: reactor/reactor-core

/**
 * Re-subscribes to this {@link Mono} sequence if it signals any error
 * that matches the given {@link Predicate}, otherwise push the error downstream.
 *
 * <p>
 * <img class="marble" src="doc-files/marbles/retryWithPredicateForMono.svg" alt="">
 *
 * @param retryMatcher the predicate to evaluate if retry should occur based on a given error signal
 *
 * @return a {@link Mono} that retries on onError if the predicate matches.
 */
public final Mono<T> retry(Predicate<? super Throwable> retryMatcher) {
  return onAssembly(new MonoRetryPredicate<>(this, retryMatcher));
}

Code example source: reactor/reactor-core

/**
 * Emit a single boolean true if this {@link Flux} sequence has at least one element.
 * <p>
 * The implementation uses short-circuit logic and completes with true on onNext.
 *
 * <p>
 * <img class="marble" src="doc-files/marbles/hasElements.svg" alt="">
 *
 * @return a new {@link Mono} with <code>true</code> if any value is emitted and <code>false</code>
 * otherwise
 */
public final Mono<Boolean> hasElements() {
  return Mono.onAssembly(new MonoHasElements<>(this));
}

Code example source: reactor/reactor-core

/**
 * Create a {@link Mono} provider that will {@link Supplier#get supply} a target {@link Mono} to subscribe to for
 * each {@link Subscriber} downstream.
 *
 * <p>
 * <img class="marble" src="doc-files/marbles/deferForMono.svg" alt="">
 * <p>
 * @param supplier a {@link Mono} factory
 * @param <T> the element type of the returned Mono instance
 * @return a new {@link Mono} factory
 */
public static <T> Mono<T> defer(Supplier<? extends Mono<? extends T>> supplier) {
  return onAssembly(new MonoDefer<>(supplier));
}

Code example source: reactor/reactor-core

/**
 * Share a {@link Mono} for the duration of a function that may transform it and
 * consume it as many times as necessary without causing multiple subscriptions
 * to the upstream.
 *
 * @param transform the transformation function
 * @param <R> the output value type
 *
 * @return a new {@link Mono}
 */
public final <R> Mono<R> publish(
    Function<? super Mono<T>, ? extends Mono<? extends R>> transform) {
  return onAssembly(new MonoPublishMulticast<>(this, transform));
}

Code example source: reactor/reactor-core

/**
 * Create a {@link Mono}, producing its value using the provided {@link Supplier}. If
 * the Supplier resolves to {@code null}, the resulting Mono completes empty.
 *
 * <p>
 * <img class="marble" src="doc-files/marbles/fromSupplier.svg" alt="">
 * <p>
 * @param supplier {@link Supplier} that will produce the value
 * @param <T> type of the expected value
 *
 * @return A {@link Mono}.
 */
public static <T> Mono<T> fromSupplier(Supplier<? extends T> supplier) {
  return onAssembly(new MonoSupplier<>(supplier));
}

Code example source: reactor/reactor-core

/**
 * Create a {@link Mono} that completes empty once the provided {@link Runnable} has
 * been executed.
 *
 * <p>
 * <img class="marble" src="doc-files/marbles/fromRunnable.svg" alt="">
 * <p>
 * @param runnable {@link Runnable} that will be executed before emitting the completion signal
 *
 * @param <T> The generic type of the upstream, which is preserved by this operator
 * @return A {@link Mono}.
 */
public static <T> Mono<T> fromRunnable(Runnable runnable) {
  return onAssembly(new MonoRunnable<>(runnable));
}

Code example source: reactor/reactor-core

/**
 * Prepare this {@link Mono} so that subscribers will cancel from it on a
 * specified {@link Scheduler}.
 *
 * <p>
 * <img class="marble" src="doc-files/marbles/cancelOnForMono.svg" alt="">
 *
 * @param scheduler the {@link Scheduler} to signal cancel on
 *
 * @return a scheduled cancel {@link Mono}
 */
public final Mono<T> cancelOn(Scheduler scheduler) {
  return onAssembly(new MonoCancelOn<>(this, scheduler));
}

Code example source: reactor/reactor-core

/**
 * Emit only the first item emitted by this {@link Flux}, into a new {@link Mono}.
 * <p>
 * <img class="marble" src="doc-files/marbles/next.svg" alt="">
 *
 * @return a new {@link Mono} emitting the first value in this {@link Flux}
 */
public final Mono<T> next() {
  if(this instanceof Callable){
    @SuppressWarnings("unchecked")
    Callable<T> m = (Callable<T>)this;
    return convertToMono(m);
  }
  return Mono.onAssembly(new MonoNext<>(this));
}
