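This article collects code examples for the Java method reactor.core.publisher.Operators.serialize() and shows how Operators.serialize() is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven; they were extracted from selected projects and should serve as a useful reference. Details of the Operators.serialize() method are as follows: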
Package path: reactor.core.publisher.Operators
Class name: Operators
Method name: serialize
Description: Safely gate a Subscriber by making sure onNext signals are delivered sequentially (serialized). Serialization uses thread-stealing and a potentially unbounded queue that might starve a calling thread if races are too important and Subscriber is slower.
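The thread-stealing and unbounded-queue behaviour described above can be illustrated with a small sketch that uses only the JDK. This is not Reactor's implementation: the class SerializingGate and its demo method are hypothetical names introduced here, and the work-in-progress counter is just one common way to build such a gate. Every caller enqueues its item, and only the thread that finds the gate idle drains the queue, so it may end up delivering items produced by other threads.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

/** Hypothetical illustration class; not part of reactor-core. */
final class SerializingGate<T> {
    private final Consumer<? super T> downstream;
    private final Queue<T> queue = new ConcurrentLinkedQueue<>(); // unbounded
    private final AtomicInteger wip = new AtomicInteger();         // work-in-progress counter

    SerializingGate(Consumer<? super T> downstream) {
        this.downstream = downstream;
    }

    void onNext(T value) {
        queue.offer(value);
        // Only the thread that moves wip from 0 to 1 becomes the drainer.
        // Other threads return at once and leave their item in the queue.
        if (wip.getAndIncrement() != 0) {
            return;
        }
        int missed = 1;
        do {
            T next;
            while ((next = queue.poll()) != null) {
                downstream.accept(next); // may deliver items from other threads
            }
            // Subtract the work already accounted for; a non-zero result
            // means more items arrived during the drain, so loop again.
            missed = wip.addAndGet(-missed);
        } while (missed != 0);
    }

    /** Returns {delivered count, number of overlapping deliveries observed}. */
    static int[] demo(int threads, int perThread) {
        int[] delivered = {0};                        // plain counter: correct only if calls are serialized
        AtomicInteger inside = new AtomicInteger();   // callers currently inside the consumer
        AtomicInteger overlaps = new AtomicInteger(); // times two callers were inside at once

        SerializingGate<Integer> gate = new SerializingGate<>(v -> {
            if (inside.incrementAndGet() > 1) {
                overlaps.incrementAndGet();
            }
            delivered[0]++;
            inside.decrementAndGet();
        });

        Thread[] producers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            producers[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    gate.onNext(i);
                }
            });
            producers[t].start();
        }
        try {
            for (Thread p : producers) {
                p.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new int[] {delivered[0], overlaps.get()};
    }

    public static void main(String[] args) {
        int[] r = demo(4, 10_000);
        System.out.println("delivered=" + r[0] + ", overlaps=" + r[1]);
    }
}
```

The draining thread keeps working for as long as other threads keep adding items, which is the starvation risk the description mentions: a slow consumer combined with heavy contention can hold one caller inside onNext for a long time while the queue grows.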
Code example source: origin: reactor/reactor-core
TakeUntilMainSubscriber(CoreSubscriber<? super T> actual) {
    // Wrap the downstream subscriber in a serializing gate.
    this.actual = Operators.serialize(actual);
}
Code example source: origin: reactor/reactor-core
/**
 * Create a {@link FluxProcessor} that safely gates multi-threaded producer
 * {@link Subscriber#onNext(Object)}.
 *
 * @return a serializing {@link FluxProcessor}
 */
public final FluxProcessor<IN, OUT> serialize() {
    return new DelegateProcessor<>(this, Operators.serialize(this));
}
Code example source: origin: reactor/reactor-core
SkipUntilMainSubscriber(CoreSubscriber<? super T> actual) {
    this.actual = Operators.serialize(actual);
    this.ctx = actual.currentContext();
}
Code example source: origin: reactor/reactor-core
@Override
public void subscribe(CoreSubscriber<? super C> actual) {
    source.subscribe(new BufferTimeoutSubscriber<>(Operators.serialize(actual),
            batchSize,
            timespan,
            timer.createWorker(),
            bufferSupplier));
}
Code example source: origin: reactor/reactor-core
@Override
public void subscribe(CoreSubscriber<? super R> actual) {
    CoreSubscriber<R> serial = Operators.serialize(actual);
    WithLatestFromSubscriber<T, U, R> main =
            new WithLatestFromSubscriber<>(serial, combiner);
    WithLatestFromOtherSubscriber<U> secondary =
            new WithLatestFromOtherSubscriber<>(main);
    other.subscribe(secondary);
    source.subscribe(main);
}
Code example source: origin: reactor/reactor-core
static <T> void subscribe(CoreSubscriber<? super T> s,
        Function<? super Flux<Throwable>, ? extends Publisher<?>> whenSourceFactory,
        Publisher<? extends T> source) {
    RetryWhenOtherSubscriber other = new RetryWhenOtherSubscriber();
    // Both the error signaller and the downstream subscriber are serialized.
    Subscriber<Throwable> signaller = Operators.serialize(other.completionSignal);
    signaller.onSubscribe(Operators.emptySubscription());
    CoreSubscriber<T> serial = Operators.serialize(s);
    RetryWhenMainSubscriber<T> main =
            new RetryWhenMainSubscriber<>(serial, signaller, source);
    other.main = main;
    serial.onSubscribe(main);
    Publisher<?> p;
    try {
        p = Objects.requireNonNull(whenSourceFactory.apply(other),
                "The whenSourceFactory returned a null Publisher");
    }
    catch (Throwable e) {
        s.onError(Operators.onOperatorError(e, s.currentContext()));
        return;
    }
    p.subscribe(other);
    if (!main.cancelled) {
        source.subscribe(main);
    }
}
Code example source: origin: reactor/reactor-core
FluxRepeatWhen.RepeatWhenOtherSubscriber other =
        new FluxRepeatWhen.RepeatWhenOtherSubscriber();
Subscriber<Long> signaller = Operators.serialize(other.completionSignal);
CoreSubscriber<T> serial = Operators.serialize(actual);
Code example source: origin: reactor/reactor-core
@Override
public void subscribe(CoreSubscriber<? super T> actual) {
    RepeatWhenOtherSubscriber other = new RepeatWhenOtherSubscriber();
    Subscriber<Long> signaller = Operators.serialize(other.completionSignal);
    signaller.onSubscribe(Operators.emptySubscription());
    CoreSubscriber<T> serial = Operators.serialize(actual);
    RepeatWhenMainSubscriber<T> main =
            new RepeatWhenMainSubscriber<>(serial, signaller, source);
    other.main = main;
    serial.onSubscribe(main);
    Publisher<?> p;
    try {
        p = Objects.requireNonNull(whenSourceFactory.apply(other),
                "The whenSourceFactory returned a null Publisher");
    }
    catch (Throwable e) {
        actual.onError(Operators.onOperatorError(e, actual.currentContext()));
        return;
    }
    p.subscribe(other);
    if (!main.cancelled) {
        source.subscribe(main);
    }
}
Code example source: origin: reactor/reactor-core
@Override
public void subscribe(CoreSubscriber<? super C> actual) {
    C buffer;
    try {
        buffer = Objects.requireNonNull(bufferSupplier.get(),
                "The bufferSupplier returned a null buffer");
    }
    catch (Throwable e) {
        Operators.error(actual, Operators.onOperatorError(e, actual.currentContext()));
        return;
    }
    // The serializing wrapper is skipped when the source is a FluxInterval.
    BufferBoundaryMain<T, U, C> parent =
            new BufferBoundaryMain<>(
                    source instanceof FluxInterval ?
                            actual : Operators.serialize(actual),
                    buffer, bufferSupplier);
    actual.onSubscribe(parent);
    other.subscribe(parent.other);
    source.subscribe(parent);
}
Code example source: origin: reactor/reactor-core
@Override
public void subscribe(CoreSubscriber<? super T> actual) {
    CoreSubscriber<T> serial = Operators.serialize(actual);
    SampleMainSubscriber<T> main = new SampleMainSubscriber<>(serial);
    actual.onSubscribe(main);
    other.subscribe(new SampleOther<>(main));
    source.subscribe(main);
}
Code example source: origin: reactor/reactor-core
@Override
public void subscribe(CoreSubscriber<? super T> actual) {
    CoreSubscriber<T> serial = Operators.serialize(actual);
    TimeoutMainSubscriber<T, V> main =
            new TimeoutMainSubscriber<>(serial, itemTimeout, other, timeoutDescription);
    serial.onSubscribe(main);
    TimeoutTimeoutSubscriber ts = new TimeoutTimeoutSubscriber(main, 0L);
    main.setTimeout(ts);
    firstTimeout.subscribe(ts);
    source.subscribe(main);
}
Code example source: origin: reactor/reactor-core
@Override
@SuppressWarnings("unchecked")
public void subscribe(CoreSubscriber<? super T> actual) {
    CoreSubscriber<T> serial = Operators.serialize(actual);
    FluxTimeout.TimeoutMainSubscriber<T, V> main =
            new FluxTimeout.TimeoutMainSubscriber<>(serial, NEVER, other,
                    addNameToTimeoutDescription(source, timeoutDescription));
    serial.onSubscribe(main);
    FluxTimeout.TimeoutTimeoutSubscriber ts =
            new FluxTimeout.TimeoutTimeoutSubscriber(main, 0L);
    main.setTimeout(ts);
    firstTimeout.subscribe(ts);
    source.subscribe(main);
}