reactor.core.publisher.Operators.onDiscard()方法的使用及代码示例

x33g5p2x  于2022-01-25 转载在 其他  
字(4.4k)|赞(0)|评价(0)|浏览(121)

本文整理了Java中reactor.core.publisher.Operators.onDiscard()方法的一些代码示例,展示了Operators.onDiscard()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Operators.onDiscard()方法的具体详情如下:
包路径:reactor.core.publisher.Operators
类名称:Operators
方法名:onDiscard

Operators.onDiscard介绍

[英]Invoke a (local or global) hook that processes elements that get discarded. This includes elements that are dropped (for malformed sources), but also filtered out (eg. not passing a filter() predicate).

For elements that are buffered or enqueued, but subsequently discarded due to cancellation or error, see #onDiscardMultiple(Stream,Context) and #onDiscardQueueWithClear(Queue,Context,Function).
[中]调用(本地或全局)钩子来处理被丢弃的元素。这包括删除的元素(对于格式错误的源),但也被过滤掉(例如,不传递filter()谓词)。
对于缓冲或排队但随后由于取消或错误而丢弃的元素,请参阅#onDiscardMultiple(流、上下文)和#onDiscardQueueWithClear(队列、上下文、函数)。

代码示例

代码示例来源:origin: reactor/reactor-core

@Override
public void cancel() {
  if (this.state <= HAS_REQUEST_NO_VALUE) {
    Operators.onDiscard(value, currentContext());
  }
  this.state = CANCELLED;
  value = null;
}

代码示例来源:origin: reactor/reactor-core

@Override
public void onNext(T t) {
  // deliberately ignored
  Operators.onDiscard(t, actual.currentContext()); //FIXME cache context
}

代码示例来源:origin: reactor/reactor-core

@Override
public void onNext(T t) {
  Object old = VALUE.getAndSet(this, t);
  if (old != null) {
    Operators.onDiscard(old, ctx);
  }
}

代码示例来源:origin: reactor/reactor-core

void clear() {
  int n = toFilter.length();
  for (int i = 0; i < n; i++) {
    T old = toFilter.getAndSet(i, null);
    Operators.onDiscard(old, ctx);
  }
  innerResult = null;
}

代码示例来源:origin: reactor/reactor-core

@Override
public void onNext(T t) {
  T old = value;
  value = t;
  Operators.onDiscard(old, actual.currentContext()); //FIXME cache context
}

代码示例来源:origin: reactor/reactor-core

@Override
public void onNext(Object t) {
  // ignored
  Operators.onDiscard(t, currentContext()); //FIXME cache Context
}

代码示例来源:origin: reactor/reactor-core

@Override
  public void cancel() {
    Operators.onDiscard(value, actual.currentContext());
  }
}

代码示例来源:origin: reactor/reactor-core

@Override
public FluxSink<T> next(T t) {
  T old = queue.getAndSet(t);
  Operators.onDiscard(old, ctx);
  drain();
  return this;
}

代码示例来源:origin: reactor/reactor-core

@Override
public void onNext(T t) {
  long r = remaining;
  if (r == 0L) {
    actual.onNext(t);
  }
  else {
    Operators.onDiscard(t, ctx);
    remaining = r - 1;
  }
}

代码示例来源:origin: reactor/reactor-core

@Override
public void onNext(T t) {
  if (gate) {
    actual.onNext(t);
  }
  else {
    Operators.onDiscard(t, ctx);
    main.request(1);
  }
}

代码示例来源:origin: reactor/reactor-core

@Override
void onCancel() {
  if (WIP.getAndIncrement(this) == 0) {
    T old = queue.getAndSet(null);
    Operators.onDiscard(old, ctx);
  }
}

代码示例来源:origin: reactor/reactor-core

@Override
public void onNext(T t) {
  Object toDiscard = VALUE.getAndSet(this, t);
  if (toDiscard != null) {
    Operators.onDiscard(toDiscard, ctx);
  }
  drain();
}

代码示例来源:origin: reactor/reactor-core

@Override
public void cancel() {
  if (once == 0) {
    Operators.onDiscard(value, actual.currentContext());
  }
  ONCE.lazySet(this, 2);
}

代码示例来源:origin: reactor/reactor-core

@Override
public void clear() {
  if (once == 0) {
    Operators.onDiscard(value, actual.currentContext());
  }
  ONCE.lazySet(this, 1);
}

代码示例来源:origin: reactor/reactor-core

void innerError(Throwable ex) {
  //if the inner subscriber (the filter one) errors, then we can
  //always propagate that error directly, as it means that the source Mono
  //was at least valued rather than in error.
  super.onError(ex);
  Operators.onDiscard(value, actual.currentContext());
}

代码示例来源:origin: reactor/reactor-core

@Override
public void cancel() {
  if (STATE.getAndSet(this, HAS_REQUEST_HAS_VALUE) != HAS_REQUEST_HAS_VALUE) {
    T old = value;
    value = null;
    Operators.onDiscard(old, actual.currentContext());
    disposeResource(true);
  }
}

代码示例来源:origin: reactor/reactor-core

@Override
public void cancel() {
  if (!cancelled) {
    cancelled = true;
    s.cancel();
    if (WIP.getAndIncrement(this) == 0) {
      Object toDiscard = VALUE.getAndSet(this, null);
      if (toDiscard != null) {
        Operators.onDiscard(toDiscard, ctx);
      }
    }
  }
}

代码示例来源:origin: reactor/reactor-core

@Override
public void onError(Throwable t) {
  if (done) {
    Operators.onErrorDropped(t, actual.currentContext());
    return;
  }
  done = true;
  Operators.onDiscard(value, actual.currentContext());
  value = null;
  actual.onError(t);
}

代码示例来源:origin: reactor/reactor-core

void innerResult(@Nullable Boolean item) {
  if (item != null && item) {
    //will reset the value with itself, but using parent's `value` saves a field
    complete(value);
  }
  else {
    super.onComplete();
    Operators.onDiscard(value, actual.currentContext());
  }
}

代码示例来源:origin: reactor/reactor-core

@Override
public void onError(Throwable t) {
  if (done) {
    Operators.onErrorDropped(t, actual.currentContext());
    return;
  }
  done = true;
  Operators.onDiscard(value, actual.currentContext());
  value = null;
  actual.onError(t);
}

相关文章