Usage and code examples of the reactor.core.publisher.Operators.onRejectedExecution() method

Reposted by x33g5p2x on 2022-01-25 under "Other"

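This article collects code examples for the Java method reactor.core.publisher.Operators.onRejectedExecution() and shows how Operators.onRejectedExecution() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven and were extracted from selected projects, so they should serve as a useful reference. Details of the Operators.onRejectedExecution() method:
Package: reactor.core.publisher
Class name: Operators
Method name: onRejectedExecution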

Introduction to Operators.onRejectedExecution

Returns a wrapped RejectedExecutionException that the operator can throw. This exception denotes that an execution was rejected by a reactor.core.scheduler.Scheduler, notably when the scheduler was already disposed.

Wrapping is done by calling both Exceptions#failWithRejected(Throwable) and #onOperatorError(Subscription,Throwable,Object,Context) (with the passed Subscription).
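The situation this method deals with, a task being rejected because the resource that should run it has already been shut down, can be reproduced with plain JDK classes. The sketch below is not Reactor code and does not call Reactor's API: `RejectionDemo`, `wrapRejected`, and `scheduleOnShutDownExecutor` are hypothetical names, a `java.util.concurrent.ExecutorService` stands in for a Scheduler, and the wrapping is only an approximation of what the description says (the message "Scheduler unavailable" is borrowed from the tests quoted later in this article).

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;

// Hypothetical demo class: plain JDK only, not part of Reactor.
final class RejectionDemo {

    // Approximates the wrapping described above: the original failure becomes
    // the cause of a new RejectedExecutionException, and an optional extra
    // Throwable is attached as a suppressed exception.
    static RejectedExecutionException wrapRejected(Throwable original, Throwable suppressed) {
        RejectedExecutionException wrapped =
                new RejectedExecutionException("Scheduler unavailable", original);
        if (suppressed != null) {
            wrapped.addSuppressed(suppressed);
        }
        return wrapped;
    }

    // Tries to run a task on an executor that is already shut down and
    // returns the wrapped rejection, or null if the task was accepted.
    static RejectedExecutionException scheduleOnShutDownExecutor() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.shutdown(); // stand-in for a disposed Scheduler
        try {
            executor.execute(() -> { });
            return null;
        }
        catch (RejectedExecutionException ree) {
            return wrapRejected(ree, null);
        }
    }

    public static void main(String[] args) {
        RejectedExecutionException wrapped = scheduleOnShutDownExecutor();
        System.out.println(wrapped.getMessage());
        System.out.println(wrapped.getCause().getClass().getName());
    }
}
```

In the Reactor examples that follow, the same try/catch shape appears around `scheduler.schedule(...)` or `worker.schedule(...)`, with the caught exception handed to `Operators.onRejectedExecution(...)` and the result either thrown or passed to `actual.onError(...)`.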

Code examples

Code example source: reactor/reactor-core

/**
 * Return a wrapped {@link RejectedExecutionException} which can be thrown by the
 * operator. This exception denotes that an execution was rejected by a
 * {@link reactor.core.scheduler.Scheduler}, notably when it was already disposed.
 * <p>
 * Wrapping is done by calling both {@link Exceptions#bubble(Throwable)} and
 * {@link #onOperatorError(Subscription, Throwable, Object, Context)}.
 *
 * @param original the original execution error
 * @param context a context that might hold a local error consumer
 *
 */
public static RuntimeException onRejectedExecution(Throwable original, Context context) {
  return onRejectedExecution(original, null, null, null, context);
}

Code example source: reactor/reactor-core

@Override
public void cancel() {
  if (CANCELLED.compareAndSet(this, 0, 1)) {
    try {
      scheduler.schedule(this);
    }
    catch (RejectedExecutionException ree) {
      throw Operators.onRejectedExecution(ree, actual.currentContext());
    }
  }
}

Code example source: reactor/reactor-core

void requestUpstream(final long n, final Subscription s) {
  if (!requestOnSeparateThread || Thread.currentThread() == THREAD.get(this)) {
    s.request(n);
  }
  else {
    try {
      worker.schedule(() -> s.request(n));
    }
    catch (RejectedExecutionException ree) {
      if(!worker.isDisposed()) {
        //FIXME should not throw but if we implement strict
        // serialization like in StrictSubscriber, onNext will carry an
        // extra cost
        throw Operators.onRejectedExecution(ree, this, null, null,
            actual.currentContext());
      }
    }
  }
}

Code example source: reactor/reactor-core

void trySchedule(long n, Subscription s) {
  if (Thread.currentThread() == THREAD.get(this)) {
    s.request(n);
  }
  else {
    try {
      worker.schedule(() -> s.request(n));
    }
    catch (RejectedExecutionException ree) {
      if (!worker.isDisposed()) {
        actual.onError(Operators.onRejectedExecution(ree,
            this,
            null,
            null,
            actual.currentContext()));
      }
    }
  }
}

Code example source: reactor/reactor-core

@Test
public void onRejectedExecutionWithoutDataSignalDelegatesToErrorLocal() {
  BiFunction<Throwable, Object, Throwable> localHook = (e, v) ->
      new IllegalStateException("boom_" + v, e);
  Context c = Context.of(Hooks.KEY_ON_OPERATOR_ERROR, localHook);
  IllegalArgumentException failure = new IllegalArgumentException("foo");
  final Throwable throwable = Operators.onRejectedExecution(failure, null, null, null, c);
  assertThat(throwable).isInstanceOf(IllegalStateException.class)
             .hasMessage("boom_null")
             .hasNoSuppressedExceptions();
  assertThat(throwable.getCause()).isInstanceOf(RejectedExecutionException.class)
                  .hasMessage("Scheduler unavailable")
                  .hasCause(failure);
}

Code example source: reactor/reactor-core

@Test
public void onRejectedExecutionWithDataSignalDelegatesToErrorLocal() {
  BiFunction<Throwable, Object, Throwable> localHook = (e, v) ->
      new IllegalStateException("boom_" + v, e);
  Context c = Context.of(Hooks.KEY_ON_OPERATOR_ERROR, localHook);
  IllegalArgumentException failure = new IllegalArgumentException("foo");
  final Throwable throwable = Operators.onRejectedExecution(failure, null,
      null, "bar", c);
  assertThat(throwable).isInstanceOf(IllegalStateException.class)
             .hasMessage("boom_bar")
             .hasNoSuppressedExceptions();
  assertThat(throwable.getCause()).isInstanceOf(RejectedExecutionException.class)
                  .hasMessage("Scheduler unavailable")
                  .hasCause(failure);
}

Code example source: reactor/reactor-core

Disposable newPeriod() {
  try {
    return worker.schedulePeriodically(new ConsumerIndexHolder(producerIndex,
        this), timespan, timespan, TimeUnit.MILLISECONDS);
  }
  catch (Exception e) {
    actual.onError(Operators.onRejectedExecution(e, s, null, null, actual.currentContext()));
    return Disposables.disposed();
  }
}

Code example source: reactor/reactor-core

@Test
public void onRejectedExecutionLocalTakesPrecedenceOverOnOperatorError() {
  BiFunction<Throwable, Object, Throwable> localOperatorErrorHook = (e, v) ->
      new IllegalStateException("boom_" + v, e);
  BiFunction<Throwable, Object, Throwable> localReeHook = (e, v) ->
      new IllegalStateException("rejected_" + v, e);
  Context c = Context.of(
      Hooks.KEY_ON_OPERATOR_ERROR, localOperatorErrorHook,
      Hooks.KEY_ON_REJECTED_EXECUTION, localReeHook);
  IllegalArgumentException failure = new IllegalArgumentException("foo");
  final Throwable throwable = Operators.onRejectedExecution(failure, null,
      null, "bar", c);
  assertThat(throwable).isInstanceOf(IllegalStateException.class)
             .hasMessage("rejected_bar")
             .hasNoSuppressedExceptions();
  assertThat(throwable.getCause()).isInstanceOf(RejectedExecutionException.class)
                  .hasMessage("Scheduler unavailable")
                  .hasCause(failure);
}

Code example source: reactor/reactor-core

@Test
public void testOnRejectedWithOutsideRee() {
  RejectedExecutionException original = new RejectedExecutionException("outside");
  Exception suppressed = new Exception("suppressed");
  RuntimeException test = Operators.onRejectedExecution(original,
      null, suppressed, null, Context.empty());
  assertThat(test)
      .isNotSameAs(original)
      .isInstanceOf(RejectedExecutionException.class)
      .hasMessage("Scheduler unavailable")
      .hasCause(original)
      .hasSuppressedException(suppressed);
}

Code example source: reactor/reactor-core

@Override
public void onNext(T t) {
  if (done) {
    Operators.onNextDropped(t, actual.currentContext());
    return;
  }
  this.done = true;
  try {
    this.task = scheduler.schedule(() -> complete(t), delay, unit);
  }
  catch (RejectedExecutionException ree) {
    throw Operators.onRejectedExecution(ree, this, null, t,
        actual.currentContext());
  }
}

Code example source: reactor/reactor-core

@Test
public void testOnRejectedWithReactorRee() {
  Exception originalCause = new Exception("boom");
  RejectedExecutionException original = Exceptions.failWithRejected(originalCause);
  Exception suppressed = new Exception("suppressed");
  RuntimeException test = Operators.onRejectedExecution(original,
      null, suppressed, null, Context.empty());
  assertThat(test)
      .isSameAs(original)
      .hasSuppressedException(suppressed);
}

Code example source: reactor/reactor-core

@Override
public void subscribe(CoreSubscriber<? super Long> actual) {
  MonoDelayRunnable r = new MonoDelayRunnable(actual);
  actual.onSubscribe(r);
  try {
    r.setCancel(timedScheduler.schedule(r, delay, unit));
  }
  catch (RejectedExecutionException ree) {
    if(r.cancel != OperatorDisposables.DISPOSED) {
      actual.onError(Operators.onRejectedExecution(ree, r, null, null,
          actual.currentContext()));
    }
  }
}

Code example source: reactor/reactor-core

@Override
public void subscribe(CoreSubscriber<? super T> actual) {
  FluxSubscribeOnCallable.CallableSubscribeOnSubscription<T> parent =
      new FluxSubscribeOnCallable.CallableSubscribeOnSubscription<>(actual, callable, scheduler);
  actual.onSubscribe(parent);
  try {
    parent.setMainFuture(scheduler.schedule(parent));
  }
  catch (RejectedExecutionException ree) {
    if(parent.state != FluxSubscribeOnCallable.CallableSubscribeOnSubscription.HAS_CANCELLED) {
      actual.onError(Operators.onRejectedExecution(ree, actual.currentContext()));
    }
  }
}

Code example source: reactor/reactor-core

@Override
public void subscribe(CoreSubscriber<? super T> actual) {
  CallableSubscribeOnSubscription<T> parent =
      new CallableSubscribeOnSubscription<>(actual, callable, scheduler);
  actual.onSubscribe(parent);
  try {
    Disposable f = scheduler.schedule(parent);
    parent.setMainFuture(f);
  }
  catch (RejectedExecutionException ree) {
    if(parent.state != CallableSubscribeOnSubscription.HAS_CANCELLED) {
      actual.onError(Operators.onRejectedExecution(ree, actual.currentContext()));
    }
  }
}

Code example source: reactor/reactor-core

@Override
public void subscribe(CoreSubscriber<? super Long> actual) {
  Worker w = timedScheduler.createWorker();
  IntervalRunnable r = new IntervalRunnable(actual, w);
  actual.onSubscribe(r);
  try {
    w.schedulePeriodically(r, initialDelay, period, unit);
  }
  catch (RejectedExecutionException ree) {
    if (!r.cancelled) {
      actual.onError(Operators.onRejectedExecution(ree, r, null, null,
          actual.currentContext()));
    }
  }
}

Code example source: reactor/reactor-core

void trySchedule(
    @Nullable Subscription subscription,
    @Nullable Throwable suppressed,
    @Nullable Object dataSignal) {
  if (future != null) {
    return;
  }
  try {
    future = this.scheduler.schedule(this);
  }
  catch (RejectedExecutionException ree) {
    actual.onError(Operators.onRejectedExecution(ree, subscription,
        suppressed, dataSignal, actual.currentContext()));
  }
}

Code example source: reactor/reactor-core

@Override
public void subscribe(CoreSubscriber<? super T> actual) {
  Scheduler.Worker worker = scheduler.createWorker();
  SubscribeOnSubscriber<T> parent = new SubscribeOnSubscriber<>(source,
      actual, worker);
  actual.onSubscribe(parent);
  try {
    worker.schedule(parent);
  }
  catch (RejectedExecutionException ree) {
    if (parent.s != Operators.cancelledSubscription()) {
      actual.onError(Operators.onRejectedExecution(ree, parent, null, null,
          actual.currentContext()));
    }
  }
}

Code example source: reactor/reactor-core

@Override
public void subscribe(CoreSubscriber<? super T> actual) {
  T v = value;
  if (v == null) {
    ScheduledEmpty parent = new ScheduledEmpty(actual);
    actual.onSubscribe(parent);
    try {
      parent.setFuture(scheduler.schedule(parent));
    }
    catch (RejectedExecutionException ree) {
      if (parent.future != OperatorDisposables.DISPOSED) {
        actual.onError(Operators.onRejectedExecution(ree,
            actual.currentContext()));
      }
    }
  }
  else {
    actual.onSubscribe(new ScheduledScalar<>(actual, v, scheduler));
  }
}

Code example source: reactor/reactor-core

void trySchedule(
    @Nullable Subscription subscription,
    @Nullable Throwable suppressed,
    @Nullable Object dataSignal) {
  if (WIP.getAndIncrement(this) != 0) {
    return;
  }
  try {
    worker.schedule(this);
  }
  catch (RejectedExecutionException ree) {
    Operators.onDiscardQueueWithClear(queue, actual.currentContext(), null);
    actual.onError(Operators.onRejectedExecution(ree, subscription, suppressed, dataSignal,
        actual.currentContext()));
  }
}
