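This article collects code examples for the Java method reactor.core.publisher.Mono.onAssembly() and shows how Mono.onAssembly() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should serve as a useful reference. Details of the Mono.onAssembly() method are as follows: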
Package path: reactor.core.publisher.Mono
Class name: Mono
Method name: onAssembly
Description: To be used by custom operators: invokes the assembly Hooks pointcut given a Mono, potentially returning a new Mono. This is useful, for example, to activate cross-cutting concerns at assembly time, e.g. a generalized #checkpoint().
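To make the idea of an assembly-time pointcut concrete, here is a minimal sketch that uses only the JDK. It is a toy model, not Reactor's implementation: MiniMono, assemblyHook, just, map, and demo are hypothetical names invented for this illustration. The assumption is simply that every operator passes the object it builds through one shared function, which by default returns it unchanged and which a hook may replace with a decorated instance. In Reactor itself such hooks are registered through the Hooks class.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.UnaryOperator;

class AssemblyHookSketch {

    /** Hypothetical stand-in for Mono: lazily yields one value when block() is called. */
    interface MiniMono<T> {
        T block();
    }

    // Hypothetical global hook; identity by default, so onAssembly returns its argument as-is.
    static volatile UnaryOperator<MiniMono<?>> assemblyHook = UnaryOperator.identity();

    /** Every operator passes its result through here before handing it to the caller. */
    @SuppressWarnings("unchecked")
    static <T> MiniMono<T> onAssembly(MiniMono<T> source) {
        return (MiniMono<T>) assemblyHook.apply(source);
    }

    /** Toy factory operator: wraps a constant value. */
    static <T> MiniMono<T> just(T value) {
        MiniMono<T> source = () -> value;
        return onAssembly(source);
    }

    /** Toy transforming operator: applies mapper to the upstream value. */
    static <T, R> MiniMono<R> map(MiniMono<T> source, Function<? super T, ? extends R> mapper) {
        MiniMono<R> mapped = () -> mapper.apply(source.block());
        return onAssembly(mapped);
    }

    /** Builds a two-operator chain with a hook installed and returns what the hook observed. */
    static List<String> demo() {
        List<String> events = new ArrayList<>();
        assemblyHook = mono -> {
            events.add("assembled"); // runs while the chain is being built
            // The hook may return a new instance; this decorator records each block() call.
            MiniMono<Object> wrapped = () -> {
                events.add("blocked");
                return mono.block();
            };
            return wrapped;
        };
        try {
            MiniMono<Integer> chain = map(just("reactor"), String::length);
            events.add("value=" + chain.block());
        } finally {
            assemblyHook = UnaryOperator.identity(); // reset so later code is unaffected
        }
        return events;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Running main prints [assembled, assembled, blocked, blocked, value=7]: the hook fires once per operator while the chain is built, before any value is requested, which is the distinction between assembly time and subscription time that the description above refers to.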
Code example source: resilience4j/resilience4j
// Excerpt from an enclosing class that is not shown.
public static <T> Mono<T> onAssembly(Mono<T> source) {
    return Mono.onAssembly(source);
}
Code example source: reactor/reactor-core
/**
 * Create a {@link Mono} emitting the {@link Context} available on subscribe.
 * If no Context is available, the mono will simply emit the
 * {@link Context#empty() empty Context}.
 *
 * @return a new {@link Mono} emitting current context
 * @see #subscribe(CoreSubscriber)
 */
public static Mono<Context> subscriberContext() {
    return onAssembly(MonoCurrentContext.INSTANCE);
}
Code example source: reactor/reactor-core
/**
 * Counts the number of values in this {@link Flux}.
 * The count will be emitted when onComplete is observed.
 *
 * <p>
 * <img class="marble" src="doc-files/marbles/count.svg" alt="">
 *
 * @return a new {@link Mono} of {@link Long} count
 */
public final Mono<Long> count() {
    return Mono.onAssembly(new MonoCount<>(this));
}
Code example source: reactor/reactor-core
/**
 * Turn this {@link Mono} into a hot source and cache last emitted signals for further {@link Subscriber}.
 * Completion and Error will also be replayed.
 * <p>
 * <img class="marble" src="doc-files/marbles/cacheForMono.svg" alt="">
 *
 * @return a replaying {@link Mono}
 */
public final Mono<T> cache() {
    return onAssembly(new MonoProcessor<>(this));
}
Code example source: reactor/reactor-core
/**
 * Hides the identity of this {@link Mono} instance.
 *
 * <p>The main purpose of this operator is to prevent certain identity-based
 * optimizations from happening, mostly for diagnostic purposes.
 *
 * @return a new {@link Mono} preventing {@link Publisher} / {@link Subscription} based Reactor optimizations
 */
public final Mono<T> hide() {
    return onAssembly(new MonoHide<>(this));
}
Code example source: reactor/reactor-core
/**
 * Emit a single boolean true if this {@link Mono} has an element.
 *
 * <p>
 * <img class="marble" src="doc-files/marbles/hasElementForMono.svg" alt="">
 *
 * @return a new {@link Mono} with <code>true</code> if a value is emitted and <code>false</code>
 * otherwise
 */
public final Mono<Boolean> hasElement() {
    return onAssembly(new MonoHasElement<>(this));
}
Code example source: reactor/reactor-core
/**
 * Prepare a {@link Mono} which shares this {@link Flux} sequence and dispatches the
 * first observed item to subscribers in a backpressure-aware manner.
 * This will effectively turn any type of sequence into a hot sequence when the first
 * {@link Subscriber} subscribes.
 * <p>
 * <img class="marble" src="doc-files/marbles/publishNext.svg" alt="">
 *
 * @return a new {@link Mono}
 */
public final Mono<T> publishNext() {
    return Mono.onAssembly(new MonoProcessor<>(this));
}
Code example source: reactor/reactor-core
/**
 * Ignores onNext signals (dropping them) and only propagates termination events.
 *
 * <p>
 * <img class="marble" src="doc-files/marbles/ignoreElementsForFlux.svg" alt="">
 * <p>
 *
 * @reactor.discard This operator discards the upstream's elements.
 *
 * @return a new empty {@link Mono} representing the completion of this {@link Flux}.
 */
public final Mono<T> ignoreElements() {
    return Mono.onAssembly(new MonoIgnoreElements<>(this));
}
Code example source: reactor/reactor-core
/**
 * Pick the first available result coming from any of the given monos and populate a new {@literal Mono}.
 *
 * <p>
 * <img class="marble" src="doc-files/marbles/firstForMono.svg" alt="">
 * <p>
 * @param monos The monos to use.
 * @param <T> The type of the function result.
 *
 * @return a {@link Mono}.
 */
public static <T> Mono<T> first(Iterable<? extends Mono<? extends T>> monos) {
    return onAssembly(new MonoFirst<>(monos));
}
Code example source: reactor/reactor-core
/**
 * Fallback to an alternative {@link Mono} if this mono is completed without data.
 *
 * <p>
 * <img class="marble" src="doc-files/marbles/switchIfEmptyForMono.svg" alt="">
 *
 * @param alternate the alternate mono if this mono is empty
 *
 * @return a {@link Mono} falling back upon source completing without elements
 * @see Flux#switchIfEmpty
 */
public final Mono<T> switchIfEmpty(Mono<? extends T> alternate) {
    return onAssembly(new MonoSwitchIfEmpty<>(this, alternate));
}
Code example source: reactor/reactor-core
/**
 * Create a {@link Mono} that terminates with the specified error immediately after
 * being subscribed to.
 * <p>
 * <img class="marble" src="doc-files/marbles/error.svg" alt="">
 * <p>
 * @param error the onError signal
 * @param <T> the reified {@link Subscriber} type
 *
 * @return a failing {@link Mono}
 */
public static <T> Mono<T> error(Throwable error) {
    return onAssembly(new MonoError<>(error));
}
Code example source: reactor/reactor-core
/**
 * Ignores onNext signal (dropping it) and only propagates termination events.
 *
 * <p>
 * <img class="marble" src="doc-files/marbles/ignoreElementForMono.svg" alt="">
 * <p>
 *
 * @reactor.discard This operator discards the source element.
 *
 * @return a new empty {@link Mono} representing the completion of this {@link Mono}.
 */
public final Mono<T> ignoreElement() {
    return onAssembly(new MonoIgnoreElement<>(this));
}
Code example source: reactor/reactor-core
/**
 * Re-subscribes to this {@link Mono} sequence if it signals any error
 * that matches the given {@link Predicate}, otherwise pushes the error downstream.
 *
 * <p>
 * <img class="marble" src="doc-files/marbles/retryWithPredicateForMono.svg" alt="">
 *
 * @param retryMatcher the predicate to evaluate if retry should occur based on a given error signal
 *
 * @return a {@link Mono} that retries on onError if the predicate matches.
 */
public final Mono<T> retry(Predicate<? super Throwable> retryMatcher) {
    return onAssembly(new MonoRetryPredicate<>(this, retryMatcher));
}
Code example source: reactor/reactor-core
/**
 * Emit a single boolean true if this {@link Flux} sequence has at least one element.
 * <p>
 * The implementation uses short-circuit logic and completes with true on onNext.
 *
 * <p>
 * <img class="marble" src="doc-files/marbles/hasElements.svg" alt="">
 *
 * @return a new {@link Mono} with <code>true</code> if any value is emitted and <code>false</code>
 * otherwise
 */
public final Mono<Boolean> hasElements() {
    return Mono.onAssembly(new MonoHasElements<>(this));
}
Code example source: reactor/reactor-core
/**
 * Create a {@link Mono} provider that will {@link Supplier#get supply} a target {@link Mono} to subscribe to for
 * each {@link Subscriber} downstream.
 *
 * <p>
 * <img class="marble" src="doc-files/marbles/deferForMono.svg" alt="">
 * <p>
 * @param supplier a {@link Mono} factory
 * @param <T> the element type of the returned Mono instance
 * @return a new {@link Mono} factory
 */
public static <T> Mono<T> defer(Supplier<? extends Mono<? extends T>> supplier) {
    return onAssembly(new MonoDefer<>(supplier));
}
Code example source: reactor/reactor-core
/**
 * Share a {@link Mono} for the duration of a function that may transform it and
 * consume it as many times as necessary without causing multiple subscriptions
 * to the upstream.
 *
 * @param transform the transformation function
 * @param <R> the output value type
 *
 * @return a new {@link Mono}
 */
public final <R> Mono<R> publish(Function<? super Mono<T>, ? extends Mono<? extends R>> transform) {
    return onAssembly(new MonoPublishMulticast<>(this, transform));
}
Code example source: reactor/reactor-core
/**
 * Create a {@link Mono}, producing its value using the provided {@link Supplier}. If
 * the Supplier resolves to {@code null}, the resulting Mono completes empty.
 *
 * <p>
 * <img class="marble" src="doc-files/marbles/fromSupplier.svg" alt="">
 * <p>
 * @param supplier {@link Supplier} that will produce the value
 * @param <T> type of the expected value
 *
 * @return A {@link Mono}.
 */
public static <T> Mono<T> fromSupplier(Supplier<? extends T> supplier) {
    return onAssembly(new MonoSupplier<>(supplier));
}
Code example source: reactor/reactor-core
/**
 * Create a {@link Mono} that completes empty once the provided {@link Runnable} has
 * been executed.
 *
 * <p>
 * <img class="marble" src="doc-files/marbles/fromRunnable.svg" alt="">
 * <p>
 * @param runnable {@link Runnable} that will be executed before emitting the completion signal
 *
 * @param <T> The generic type of the upstream, which is preserved by this operator
 * @return A {@link Mono}.
 */
public static <T> Mono<T> fromRunnable(Runnable runnable) {
    return onAssembly(new MonoRunnable<>(runnable));
}
Code example source: reactor/reactor-core
/**
 * Prepare this {@link Mono} so that subscribers will cancel from it on a
 * specified {@link Scheduler}.
 *
 * <p>
 * <img class="marble" src="doc-files/marbles/cancelOnForMono.svg" alt="">
 *
 * @param scheduler the {@link Scheduler} to signal cancel on
 *
 * @return a scheduled cancel {@link Mono}
 */
public final Mono<T> cancelOn(Scheduler scheduler) {
    return onAssembly(new MonoCancelOn<>(this, scheduler));
}
Code example source: reactor/reactor-core
/**
 * Emit only the first item emitted by this {@link Flux}, into a new {@link Mono}.
 * <p>
 * <img class="marble" src="doc-files/marbles/next.svg" alt="">
 *
 * @return a new {@link Mono} emitting the first value in this {@link Flux}
 */
public final Mono<T> next() {
    if (this instanceof Callable) {
        @SuppressWarnings("unchecked")
        Callable<T> m = (Callable<T>) this;
        return convertToMono(m);
    }
    return Mono.onAssembly(new MonoNext<>(this));
}