reactor.core.publisher.Operators.onDiscardMultiple()方法的使用及代码示例

x33g5p2x  于2022-01-25 转载在 其他  
字(5.2k)|赞(0)|评价(0)|浏览(135)

本文整理了Java中reactor.core.publisher.Operators.onDiscardMultiple()方法的一些代码示例,展示了Operators.onDiscardMultiple()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。Operators.onDiscardMultiple()方法的具体详情如下:
包路径:reactor.core.publisher.Operators
类名称:Operators
方法名:onDiscardMultiple

Operators.onDiscardMultiple介绍

[英]Invoke a (local or global) hook that processes elements that get discarded en masse. This includes elements that are buffered but subsequently discarded due to cancellation or error.
[中]调用一个(本地或全局)钩子来处理被整体丢弃的元素。这包括缓冲但随后由于取消或错误而丢弃的元素。

代码示例

代码示例来源:origin: reactor/reactor-core

/**
 * Cancels {@code s} and then hands {@code buffer} to the bulk-discard hook.
 * <p>
 * {@code Operators.onDiscardMultiple} invokes the (local or global) hook that
 * processes elements discarded en masse; it is called here with this
 * instance's {@code ctx}.
 */
@Override
public void cancel() {
  // NOTE(review): s is presumably the upstream Subscription; confirm in the enclosing class.
  s.cancel();
  // Runs after s.cancel(), so the hook is invoked once cancellation has been requested.
  Operators.onDiscardMultiple(buffer, this.ctx);
}

代码示例来源:origin: reactor/reactor-core

/**
 * Handles cancellation: first cancels {@code s}, then reports the contents
 * of {@code buffer} to the discard hook via
 * {@code Operators.onDiscardMultiple}, using this instance's {@code ctx}.
 */
@Override
public void cancel() {
  // NOTE(review): s looks like the subscription this operator holds; verify against the class fields.
  s.cancel();
  // Buffered elements will not be emitted after cancel, so they are passed to the discard hook.
  Operators.onDiscardMultiple(buffer, this.ctx);
}

代码示例来源:origin: reactor/reactor-core

/**
 * Passes every element obtained by iterating this container to the
 * bulk-discard hook, then delegates to {@code super.clear()}.
 */
@Override
public void clear() {
  java.util.Iterator<? extends C> remaining = iterator();
  while (remaining.hasNext()) {
    C chunk = remaining.next();
    // Each iterated element is itself handed to the "discard multiple" hook.
    Operators.onDiscardMultiple(chunk, this.ctx);
  }
  super.clear();
}

代码示例来源:origin: reactor/reactor-core

/**
 * Handles cancellation: calls {@code Operators.terminate(S, this)} and then
 * passes {@code buffer} to the bulk-discard hook with this instance's
 * {@code ctx}.
 */
@Override
public void cancel() {
  // NOTE(review): terminate(S, this) presumably swaps in a cancelled marker through the S
  // field updater and cancels the previous subscription; confirm in Operators.
  Operators.terminate(S, this);
  Operators.onDiscardMultiple(buffer, this.ctx);
}

代码示例来源:origin: reactor/reactor-core

/**
 * Reports this container's current contents to the bulk-discard hook and
 * then delegates to {@code super.clear()}.
 */
@Override
  public void clear() {
    // The hook must see the elements before super.clear() runs.
    Operators.onDiscardMultiple(this, ctx);
    super.clear();
  }
}

代码示例来源:origin: reactor/reactor-core

/**
 * Handles an error signal.
 * <p>
 * Proceeds only if the state can be moved from {@code NOT_TERMINATED} to
 * {@code TERMINATED_WITH_ERROR}. In that case the timer is disposed, any
 * pending {@code values} are passed to the discard hook, cleared and
 * dropped while holding this instance's monitor, and the error is then
 * forwarded to {@code actual}.
 *
 * @param throwable the error to propagate
 */
@Override
public void onError(Throwable throwable) {
  if (!TERMINATED.compareAndSet(this, NOT_TERMINATED, TERMINATED_WITH_ERROR)) {
    // State was no longer NOT_TERMINATED: nothing to do.
    return;
  }
  timer.dispose();
  synchronized (this) {
    C pending = values;
    if (pending != null) {
      Operators.onDiscardMultiple(pending, this.ctx);
      pending.clear();
      values = null;
    }
  }
  actual.onError(throwable);
}

代码示例来源:origin: reactor/reactor-core

/**
 * Handles cancellation.
 * <p>
 * Proceeds only if the state can be moved from {@code NOT_TERMINATED} to
 * {@code TERMINATED_WITH_CANCEL}. It then disposes the timer, cancels and
 * forgets the stored subscription if there is one, and finally passes any
 * pending {@code values} to the discard hook before clearing them.
 */
@Override
public void cancel() {
  if (!TERMINATED.compareAndSet(this, NOT_TERMINATED, TERMINATED_WITH_CANCEL)) {
    // State was no longer NOT_TERMINATED: nothing to do.
    return;
  }
  timer.dispose();
  Subscription sub = this.subscription;
  if (sub != null) {
    this.subscription = null;
    sub.cancel();
  }
  C pending = values;
  if (pending != null) {
    Operators.onDiscardMultiple(pending, this.ctx);
    pending.clear();
  }
}
}

代码示例来源:origin: reactor/reactor-core

/**
 * Handles an error signal.
 * <p>
 * If a collection is still held, it is released, its contents are passed to
 * the bulk-discard hook, and the error is forwarded to {@code actual}.
 * If the collection has already been released, the error is not silently
 * swallowed: it is routed to {@code Operators.onErrorDropped} so the
 * dropped-error hook can observe it.
 *
 * @param t the error to propagate
 */
@Override
public void onError(Throwable t) {
  C c = collection;
  if (c == null) {
    // Nothing left to terminate: report the late error rather than losing it.
    Operators.onErrorDropped(t, currentContext());
    return;
  }
  collection = null;
  // Discard what was accumulated before signalling the error downstream.
  Operators.onDiscardMultiple(c, currentContext());
  actual.onError(t);
}

代码示例来源:origin: reactor/reactor-core

/**
 * Handles cancellation in three steps: calls
 * {@code Operators.terminate(S, this)}, passes {@code buffer} to the
 * bulk-discard hook with this instance's {@code ctx}, and cancels
 * {@code other}.
 */
@Override
public void cancel() {
  // NOTE(review): terminate(S, this) presumably cancels the main subscription tracked via S; confirm.
  Operators.terminate(S, this);
  Operators.onDiscardMultiple(buffer, this.ctx);
  // NOTE(review): other appears to be a secondary (companion) subscriber; verify in the class.
  other.cancel();
}

代码示例来源:origin: reactor/reactor-core

/**
 * Handles an error signal.
 * <p>
 * The first error marks this instance as {@code done}, is forwarded to
 * {@code actual}, and the current {@code buffer} is then passed to the
 * bulk-discard hook. Errors arriving once {@code done} is set go to
 * {@code Operators.onErrorDropped}.
 *
 * @param t the error to propagate
 */
@Override
public void onError(Throwable t) {
  if (!done) {
    done = true;
    actual.onError(t);
    // Discard happens after the downstream has been notified.
    Operators.onDiscardMultiple(buffer, this.ctx);
  }
  else {
    Operators.onErrorDropped(t, this.ctx);
  }
}

代码示例来源:origin: reactor/reactor-core

/**
 * Handles an error signal.
 * <p>
 * The first error marks this instance as {@code done}, passes the current
 * {@code buffer} to the bulk-discard hook, releases the buffer reference,
 * and only then forwards the error to {@code actual}. Errors arriving once
 * {@code done} is set go to {@code Operators.onErrorDropped}.
 *
 * @param t the error to propagate
 */
@Override
public void onError(Throwable t) {
  if (!done) {
    done = true;
    // Discard and release the buffer before notifying the downstream.
    Operators.onDiscardMultiple(buffer, this.ctx);
    buffer = null;
    actual.onError(t);
  }
  else {
    Operators.onErrorDropped(t, this.ctx);
  }
}

代码示例来源:origin: reactor/reactor-core

/**
 * Handles an error signal.
 * <p>
 * The first error marks this instance as {@code done}, detaches the current
 * {@code buffer} from the field, forwards the error to {@code actual}, and
 * then passes the detached buffer to the bulk-discard hook. Errors arriving
 * once {@code done} is set go to {@code Operators.onErrorDropped}.
 *
 * @param t the error to propagate
 */
@Override
public void onError(Throwable t) {
  if (!done) {
    done = true;
    // Detach the buffer so the field no longer references it.
    C detached = buffer;
    buffer = null;
    actual.onError(t);
    Operators.onDiscardMultiple(detached, this.ctx);
  }
  else {
    Operators.onErrorDropped(t, this.ctx);
  }
}

代码示例来源:origin: reactor/reactor-core

/**
 * Handles cancellation: passes {@code collection} to the bulk-discard hook
 * using {@code currentContext()}, then calls {@code super.cancel()}, and
 * finally cancels {@code s}.
 */
@Override
  public void cancel() {
    // Discard the collection's contents explicitly, before the superclass cancel logic runs.
    Operators.onDiscardMultiple(collection, currentContext());
    super.cancel();
    // NOTE(review): s is presumably the upstream Subscription; confirm in the enclosing class.
    s.cancel();
  }
}

代码示例来源:origin: reactor/reactor-core

/**
 * Handles cancellation.
 * <p>
 * Proceeds only when {@code Operators.terminate(S, this)} returns
 * {@code true}. It then flags the instance as cancelled, disposes
 * {@code subscribers}, detaches the {@code buffers} map under this
 * instance's monitor, discards queued buffers if they are not currently
 * being drained, and finally passes each detached, still-open buffer to the
 * bulk-discard hook.
 */
@Override
public void cancel() {
  if (!Operators.terminate(S, this)) {
    return;
  }
  cancelled = true;
  subscribers.dispose();

  Map<Long, BUFFER> open;
  synchronized (this) {
    open = buffers;
    buffers = null;
  }

  // Queued buffers first, but only if they are not being drained right now...
  if (WINDOWS.getAndIncrement(this) == 0) {
    Operators.onDiscardQueueWithClear(queue, this.ctx, BUFFER::stream);
  }

  // ...then the buffers that were never closed.
  if (open == null || open.isEmpty()) {
    return;
  }
  for (BUFFER pending : open.values()) {
    Operators.onDiscardMultiple(pending, this.ctx);
  }
}

代码示例来源:origin: reactor/reactor-core

/**
 * Handles an error signal.
 * <p>
 * If the error cannot be recorded through
 * {@code Exceptions.addThrowable(ERRORS, this, t)}, it is routed to
 * {@code Operators.onErrorDropped}. Otherwise {@code subscribers} is
 * disposed, the {@code buffers} map is detached under this instance's
 * monitor, {@code done} is set, {@code drain()} is invoked, and each
 * detached buffer is passed to the bulk-discard hook.
 *
 * @param t the error to propagate
 */
@Override
public void onError(Throwable t) {
  if (!Exceptions.addThrowable(ERRORS, this, t)) {
    Operators.onErrorDropped(t, this.ctx);
    return;
  }
  subscribers.dispose();

  Map<Long, BUFFER> open;
  synchronized (this) {
    open = buffers;
    buffers = null;
  }

  done = true;
  drain();

  if (open == null) {
    return;
  }
  for (BUFFER pending : open.values()) {
    Operators.onDiscardMultiple(pending, this.ctx);
  }
}

代码示例来源:origin: reactor/reactor-core

/**
 * Handles an error signal.
 * <p>
 * When {@code Operators.terminate(S, this)} returns {@code false} the error
 * is routed to {@code Operators.onErrorDropped}. Otherwise the current
 * {@code buffer} is detached under this instance's monitor, {@code other}
 * is cancelled, the error is forwarded to {@code actual}, and the detached
 * buffer is passed to the bulk-discard hook.
 *
 * @param t the error to propagate
 */
@Override
public void onError(Throwable t) {
  if (!Operators.terminate(S, this)) {
    Operators.onErrorDropped(t, this.ctx);
    return;
  }

  C detached;
  synchronized (this) {
    detached = buffer;
    buffer = null;
  }

  other.cancel();
  actual.onError(t);
  Operators.onDiscardMultiple(detached, this.ctx);
}

代码示例来源:origin: reactor/reactor-core

// Passes v to the (local or global) bulk-discard hook, resolved through this instance's ctx.
// NOTE(review): the surrounding method is not part of this excerpt; v is presumably a buffered collection.
Operators.onDiscardMultiple(v, this.ctx);

代码示例来源:origin: reactor/reactor-core

/**
 * Handles an error reported for {@code boundary}.
 * <p>
 * Always calls {@code Operators.terminate(S, this)} and removes
 * {@code boundary} from {@code subscribers}. If the error cannot be
 * recorded through {@code Exceptions.addThrowable(ERRORS, this, ex)} it is
 * routed to {@code Operators.onErrorDropped}. Otherwise {@code subscribers}
 * is disposed, the {@code buffers} map is detached under this instance's
 * monitor, {@code done} is set, {@code drain()} is invoked, and each
 * detached buffer is passed to the bulk-discard hook.
 *
 * @param boundary the disposable that reported the failure
 * @param ex the error that was reported
 */
void boundaryError(Disposable boundary, Throwable ex) {
  Operators.terminate(S, this);
  subscribers.remove(boundary);

  if (!Exceptions.addThrowable(ERRORS, this, ex)) {
    Operators.onErrorDropped(ex, this.ctx);
    return;
  }
  subscribers.dispose();

  Map<Long, BUFFER> open;
  synchronized (this) {
    open = buffers;
    buffers = null;
  }

  done = true;
  drain();

  if (open == null) {
    return;
  }
  for (BUFFER pending : open.values()) {
    Operators.onDiscardMultiple(pending, this.ctx);
  }
}

代码示例来源:origin: reactor/reactor-core

/**
 * Attempts to emit {@code b} to {@code actual}.
 * <p>
 * With no outstanding demand ({@code requested == 0}) an overflow error is
 * sent to {@code actual}, {@code b} is passed to the bulk-discard hook, and
 * {@code false} is returned. Otherwise {@code b} is emitted, the requested
 * counter is decremented unless it is {@code Long.MAX_VALUE}, and
 * {@code true} is returned.
 *
 * @param b the buffer to emit
 * @return {@code true} if {@code b} was emitted, {@code false} on overflow
 */
boolean emit(C b) {
  long demand = requested;
  if (demand == 0L) {
    // No demand: signal overflow downstream, then discard the buffer that could not be delivered.
    Throwable overflow = Operators.onOperatorError(this,
        Exceptions.failWithOverflow(), b, this.ctx);
    actual.onError(overflow);
    Operators.onDiscardMultiple(b, this.ctx);
    return false;
  }

  actual.onNext(b);
  // Long.MAX_VALUE is left untouched rather than counted down.
  if (demand != Long.MAX_VALUE) {
    REQUESTED.decrementAndGet(this);
  }
  return true;
}
}

代码示例来源:origin: reactor/reactor-core

/**
 * Handles an error coming from the "other" source.
 * <p>
 * Atomically replaces the subscription held through {@code S} with
 * {@code Operators.cancelledSubscription()}. If that marker was already in
 * place, the error is routed to {@code Operators.onErrorDropped}. Otherwise
 * the current {@code buffer} is detached under this instance's monitor, the
 * previous subscription (if any) is cancelled, the error is forwarded to
 * {@code actual}, and the detached buffer is passed to the bulk-discard
 * hook.
 *
 * @param t the error to propagate
 */
void otherError(Throwable t){
  Subscription previous = S.getAndSet(this, Operators.cancelledSubscription());
  if (previous == Operators.cancelledSubscription()) {
    // The cancelled marker was already set: only report the error as dropped.
    Operators.onErrorDropped(t, this.ctx);
    return;
  }

  C detached;
  synchronized (this) {
    detached = buffer;
    buffer = null;
  }

  if (previous != null) {
    previous.cancel();
  }
  actual.onError(t);
  Operators.onDiscardMultiple(detached, this.ctx);
}

相关文章