本文整理了Java中reactor.core.publisher.Operators.onDiscardMultiple()
方法的一些代码示例,展示了Operators.onDiscardMultiple()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Operators.onDiscardMultiple()
方法的具体详情如下:
包路径:reactor.core.publisher.Operators
类名称:Operators
方法名:onDiscardMultiple
[英]Invoke a (local or global) hook that processes elements that get discarded en masse. This includes elements that are buffered but subsequently discarded due to cancellation or error.
[中]调用一个(本地或全局)钩子来处理被整体丢弃的元素。这包括缓冲但随后由于取消或错误而丢弃的元素。
代码示例来源:origin: reactor/reactor-core
@Override
public void cancel() {
s.cancel();
Operators.onDiscardMultiple(buffer, this.ctx);
}
代码示例来源:origin: reactor/reactor-core
@Override
public void cancel() {
s.cancel();
Operators.onDiscardMultiple(buffer, this.ctx);
}
代码示例来源:origin: reactor/reactor-core
@Override
public void clear() {
for(C b: this) {
Operators.onDiscardMultiple(b, this.ctx);
}
super.clear();
}
代码示例来源:origin: reactor/reactor-core
@Override
public void cancel() {
Operators.terminate(S, this);
Operators.onDiscardMultiple(buffer, this.ctx);
}
代码示例来源:origin: reactor/reactor-core
@Override
public void clear() {
Operators.onDiscardMultiple(this, ctx);
super.clear();
}
}
代码示例来源:origin: reactor/reactor-core
@Override
public void onError(Throwable throwable) {
if (TERMINATED.compareAndSet(this, NOT_TERMINATED, TERMINATED_WITH_ERROR)) {
timer.dispose();
synchronized (this) {
C v = values;
if(v != null) {
Operators.onDiscardMultiple(v, this.ctx);
v.clear();
values = null;
}
}
actual.onError(throwable);
}
}
代码示例来源:origin: reactor/reactor-core
@Override
public void cancel() {
if (TERMINATED.compareAndSet(this, NOT_TERMINATED, TERMINATED_WITH_CANCEL)) {
timer.dispose();
Subscription s = this.subscription;
if (s != null) {
this.subscription = null;
s.cancel();
}
C v = values;
if (v != null) {
Operators.onDiscardMultiple(v, this.ctx);
v.clear();
}
}
}
}
代码示例来源:origin: reactor/reactor-core
@Override
public void onError(Throwable t) {
C c = collection;
if(c == null){
return;
}
collection = null;
Operators.onDiscardMultiple(c, currentContext());
actual.onError(t);
}
代码示例来源:origin: reactor/reactor-core
@Override
public void cancel() {
Operators.terminate(S, this);
Operators.onDiscardMultiple(buffer, this.ctx);
other.cancel();
}
代码示例来源:origin: reactor/reactor-core
@Override
public void onError(Throwable t) {
if (done) {
Operators.onErrorDropped(t, this.ctx);
return;
}
done = true;
actual.onError(t);
Operators.onDiscardMultiple(buffer, this.ctx);
}
代码示例来源:origin: reactor/reactor-core
@Override
public void onError(Throwable t) {
if (done) {
Operators.onErrorDropped(t, this.ctx);
return;
}
done = true;
Operators.onDiscardMultiple(buffer, this.ctx);
buffer = null;
actual.onError(t);
}
代码示例来源:origin: reactor/reactor-core
@Override
public void onError(Throwable t) {
if (done) {
Operators.onErrorDropped(t, this.ctx);
return;
}
done = true;
C b = buffer;
buffer = null;
actual.onError(t);
Operators.onDiscardMultiple(b, this.ctx);
}
代码示例来源:origin: reactor/reactor-core
@Override
public void cancel() {
//specific discard of the collection
Operators.onDiscardMultiple(collection, currentContext());
super.cancel();
s.cancel();
}
}
代码示例来源:origin: reactor/reactor-core
@Override
public void cancel() {
if (Operators.terminate(S, this)) {
cancelled = true;
subscribers.dispose();
Map<Long, BUFFER> bufs;
synchronized (this) {
bufs = buffers;
buffers = null;
}
//first discard buffers that have been queued if they're not being drained...
if (WINDOWS.getAndIncrement(this) == 0) {
Operators.onDiscardQueueWithClear(queue, this.ctx, BUFFER::stream);
}
//...then discard unclosed buffers
if (bufs != null && !bufs.isEmpty()) {
for (BUFFER buffer : bufs.values()) {
Operators.onDiscardMultiple(buffer, this.ctx);
}
}
}
}
代码示例来源:origin: reactor/reactor-core
@Override
public void onError(Throwable t) {
if (Exceptions.addThrowable(ERRORS, this, t)) {
subscribers.dispose();
Map<Long, BUFFER> bufs;
synchronized (this) {
bufs = buffers;
buffers = null;
}
done = true;
drain();
if (bufs != null) {
for (BUFFER b : bufs.values()) {
Operators.onDiscardMultiple(b, this.ctx);
}
}
}
else {
Operators.onErrorDropped(t, this.ctx);
}
}
代码示例来源:origin: reactor/reactor-core
@Override
public void onError(Throwable t) {
if(Operators.terminate(S, this)) {
C b;
synchronized (this) {
b = buffer;
buffer = null;
}
other.cancel();
actual.onError(t);
Operators.onDiscardMultiple(b, this.ctx);
return;
}
Operators.onErrorDropped(t, this.ctx);
}
代码示例来源:origin: reactor/reactor-core
Operators.onDiscardMultiple(v, this.ctx);
代码示例来源:origin: reactor/reactor-core
void boundaryError(Disposable boundary, Throwable ex) {
Operators.terminate(S, this);
subscribers.remove(boundary);
if (Exceptions.addThrowable(ERRORS, this, ex)) {
subscribers.dispose();
Map<Long, BUFFER> bufs;
synchronized (this) {
bufs = buffers;
buffers = null;
}
done = true;
drain();
if (bufs != null) {
for (BUFFER buffer : bufs.values()) {
Operators.onDiscardMultiple(buffer, this.ctx);
}
}
}
else {
Operators.onErrorDropped(ex, this.ctx);
}
}
代码示例来源:origin: reactor/reactor-core
boolean emit(C b) {
long r = requested;
if (r != 0L) {
actual.onNext(b);
if (r != Long.MAX_VALUE) {
REQUESTED.decrementAndGet(this);
}
return true;
}
else {
actual.onError(Operators.onOperatorError(this, Exceptions
.failWithOverflow(), b, this.ctx));
Operators.onDiscardMultiple(b, this.ctx);
return false;
}
}
}
代码示例来源:origin: reactor/reactor-core
void otherError(Throwable t){
Subscription s = S.getAndSet(this, Operators.cancelledSubscription());
if(s != Operators.cancelledSubscription()) {
C b;
synchronized (this) {
b = buffer;
buffer = null;
}
if(s != null){
s.cancel();
}
actual.onError(t);
Operators.onDiscardMultiple(b, this.ctx);
return;
}
Operators.onErrorDropped(t, this.ctx);
}
内容来源于网络,如有侵权,请联系作者删除!