Usage and code examples of the reactor.core.publisher.Hooks.resetOnNextDropped() method

x33g5p2x, reposted on 2022-01-20 under Other

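This article collects code examples for the Java method reactor.core.publisher.Hooks.resetOnNextDropped() and shows how Hooks.resetOnNextDropped() is used in practice. The examples were extracted from selected projects on platforms such as GitHub, Stack Overflow, and Maven, and are intended as a practical reference. Details of Hooks.resetOnNextDropped():
Package: reactor.core.publisher
Class name: Hooks
Method name: resetOnNextDropped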

Introduction to Hooks.resetOnNextDropped

Resets the global strategy for dropped data to its default, which the method's documentation describes as throwing via reactor.core.Exceptions#failWithCancel().
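
Most of the examples on this page install a hook with Hooks.onNextDropped(...) and then call Hooks.resetOnNextDropped() in a finally block or an @After method, so that a globally installed hook does not leak into later tests. The sketch below illustrates that install/reset pattern with a plain-Java stand-in. DropHookModel, drop, defaultHandled, and captureDropped are hypothetical names introduced only for this illustration; this is not Reactor's implementation, and the model simply replaces the hook rather than reproducing how Reactor stores its hooks.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical stand-in for a global "dropped value" hook; NOT Reactor's code. */
final class DropHookModel {
    // Globally installed hook, or null when the default strategy applies.
    private static volatile Consumer<Object> hook;

    // Values that fell through to the default strategy (recorded so the sketch is observable).
    static final List<Object> defaultHandled = new ArrayList<>();

    // Assumed simplification: installing a hook replaces any previous one.
    static void onNextDropped(Consumer<Object> consumer) {
        hook = consumer;
    }

    // Removes the custom hook so the default strategy applies again.
    static void resetOnNextDropped() {
        hook = null;
    }

    // Routes a dropped value to the installed hook, or to the default strategy.
    static void drop(Object value) {
        Consumer<Object> current = hook;
        if (current != null) {
            current.accept(value);
        } else {
            defaultHandled.add(value);
        }
    }

    // Installs a recording hook, drops one value, and always resets in finally.
    static List<Object> captureDropped(Object value) {
        List<Object> seen = new ArrayList<>();
        onNextDropped(seen::add);
        try {
            drop(value);
        } finally {
            resetOnNextDropped(); // runs even if drop(...) throws
        }
        return seen;
    }
}
```

Because the reset sits in finally, a value dropped after captureDropped returns goes to the default strategy again instead of the recording hook. Examples further down that call the reset as the last statement of a test, outside any finally block, would skip it if an earlier assertion failed.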

Code examples

Code example source: origin: reactor/reactor-core

public void unplugHooks() {
  Hooks.resetOnNextDropped();
  Hooks.resetOnErrorDropped();
  Hooks.resetOnOperatorError();
}

Code example source: origin: reactor/reactor-core

@SuppressWarnings("unchecked")
void assertInnerSubscriber(FluxZip.ZipSingleCoordinator c) {
  FluxZip.ZipSingleSubscriber s = (FluxZip.ZipSingleSubscriber) c.inners()
                                  .findFirst()
                                  .get();
  assertThat(s.scan(Scannable.Attr.TERMINATED)).isTrue();
  assertThat(s.scan(Scannable.Attr.BUFFERED)).isEqualTo(1);
  assertThat(s.scan(Scannable.Attr.CANCELLED)).isTrue();
  Hooks.onNextDropped(v -> {
  });
  s.onNext(0);
  Hooks.resetOnNextDropped();
}

Code example source: origin: reactor/reactor-core

@After
public void tearDown() {
  scheduler.dispose();
  Hooks.resetOnNextDropped();
  Hooks.resetOnErrorDropped();
  Hooks.resetOnOperatorError();
  Schedulers.resetOnHandleError();
  onNexts.clear();
  onErrors.clear();
  onNextDropped.clear();
  onErrorDropped.clear();
  onOperatorError.clear();
  onOperatorErrorData.clear();
  onSchedulerHandleError.clear();
}

Code example source: origin: reactor/reactor-core

@Test
public void resumeDropIfMatch() {
  AtomicReference<Throwable> error = new AtomicReference<>();
  AtomicReference<Object> value = new AtomicReference<>();
  Hooks.onErrorDropped(error::set);
  Hooks.onNextDropped(value::set);
  try {
    OnNextFailureStrategy strategy = OnNextFailureStrategy.resumeDropIf(
        e -> e instanceof NullPointerException);
    String data = "foo";
    Throwable exception = new NullPointerException("foo");
    assertThat(strategy.test(exception, data)).isTrue();
    Throwable t = strategy.process(exception, data, Context.empty());
    assertThat(t).isNull();
    assertThat(error.get()).isInstanceOf(NullPointerException.class).hasMessage("foo");
    assertThat(value.get()).isEqualTo("foo");
  }
  finally {
    Hooks.resetOnErrorDropped();
    Hooks.resetOnNextDropped();
  }
}

Code example source: origin: reactor/reactor-core

@Test
public void resumeDrop() {
  AtomicReference<Throwable> error = new AtomicReference<>();
  AtomicReference<Object> value = new AtomicReference<>();
  Hooks.onErrorDropped(error::set);
  Hooks.onNextDropped(value::set);
  try {
    OnNextFailureStrategy strategy = OnNextFailureStrategy.resumeDrop();
    String data = "foo";
    Throwable exception = new NullPointerException("foo");
    assertThat(strategy.test(exception, data)).isTrue();
    Throwable t = strategy.process(exception, data, Context.empty());
    assertThat(t).isNull();
    assertThat(error.get()).isInstanceOf(NullPointerException.class).hasMessage("foo");
    assertThat(value.get()).isEqualTo("foo");
  }
  finally {
    Hooks.resetOnErrorDropped();
    Hooks.resetOnNextDropped();
  }
}

Code example source: origin: reactor/reactor-core

@Test
public void resumeDropErrorHookFails() {
  AtomicReference<Object> value = new AtomicReference<>();
  UnsupportedOperationException failure = new UnsupportedOperationException("error hook");
  Hooks.onNextDropped(value::set);
  Hooks.onErrorDropped(v -> { throw failure; });
  try {
    OnNextFailureStrategy strategy = OnNextFailureStrategy.resumeDrop();
    String data = "foo";
    Throwable exception = new NullPointerException("foo");
    Throwable t = strategy.process(exception, data, Context.empty());
    assertThat(t)
        .hasMessage("error hook")
        .hasSuppressedException(exception);
    assertThat(value.get()).isEqualTo("foo");
  }
  finally {
    Hooks.resetOnErrorDropped();
    Hooks.resetOnNextDropped();
  }
}

Code example source: origin: reactor/reactor-core

@Test
public void pollErrorModeLocalStrategy() {
  List<Object> nextDropped = new ArrayList<>();
  List<Object> errorDropped = new ArrayList<>();
  Hooks.onNextDropped(nextDropped::add);
  Hooks.onErrorDropped(errorDropped::add);
  Context c = Context.of(OnNextFailureStrategy.KEY_ON_NEXT_ERROR_STRATEGY, OnNextFailureStrategy.RESUME_DROP);
  Exception error = new IllegalStateException("boom");
  try {
    assertThat(Hooks.onNextErrorHook).as("no global hook").isNull();
    RuntimeException e = Operators.onNextPollError("foo", error, c);
    assertThat(e).isNull();
    assertThat(nextDropped).containsExactly("foo");
    assertThat(errorDropped).containsExactly(error);
  }
  finally {
    Hooks.resetOnNextDropped();
    Hooks.resetOnErrorDropped();
  }
}

Code example source: origin: reactor/reactor-core

final void resetHooks() {
  Hooks.resetOnErrorDropped();
  Hooks.resetOnNextDropped();
  Hooks.resetOnEachOperator();
  Hooks.resetOnOperatorError();
  Hooks.resetOnLastOperator();
}

Code example source: origin: reactor/reactor-core

@Test
public void resumeDropValueHookFails() {
  AtomicReference<Throwable> error = new AtomicReference<>();
  UnsupportedOperationException failure = new UnsupportedOperationException("value hook");
  Hooks.onErrorDropped(error::set);
  Hooks.onNextDropped(v -> { throw failure; });
  try {
    OnNextFailureStrategy strategy = OnNextFailureStrategy.resumeDrop();
    String data = "foo";
    Throwable exception = new NullPointerException("foo");
    Throwable t = strategy.process(exception, data, Context.empty());
    assertThat(t)
        .hasMessage("value hook")
        .hasSuppressedException(exception);
    assertThat(error.get()).isNull();
  }
  finally {
    Hooks.resetOnErrorDropped();
    Hooks.resetOnNextDropped();
  }
}

Code example source: origin: reactor/reactor-core

@Test
public void failNextIfTerminatedTake() {
  Hooks.onNextDropped(t -> assertThat(t).isEqualTo(1));
  StepVerifier.create(Flux.from(s -> {
    s.onSubscribe(Operators.emptySubscription());
    s.onComplete();
    s.onNext(1);
  })
              .take(2))
        .verifyComplete();
  Hooks.resetOnNextDropped();
}

Code example source: origin: reactor/reactor-core

@Test
public void resumeDropWithFatal() {
  AtomicReference<Throwable> error = new AtomicReference<>();
  AtomicReference<Object> value = new AtomicReference<>();
  Hooks.onErrorDropped(error::set);
  Hooks.onNextDropped(value::set);
  try {
    OnNextFailureStrategy strategy = OnNextFailureStrategy.resumeDrop();
    String data = "foo";
    Throwable exception = new NoSuchMethodError("foo");
    assertThat(strategy.test(exception, data)).isTrue();
    assertThatExceptionOfType(NoSuchMethodError.class)
        .isThrownBy(() -> strategy.process(exception, data, Context.empty()));
    assertThat(error.get()).isNull();
    assertThat(value.get()).isNull();
  }
  finally {
    Hooks.resetOnErrorDropped();
    Hooks.resetOnNextDropped();
  }
}

Code example source: origin: reactor/reactor-core

@Test
public void resumeDropIfNoMatch() {
  AtomicReference<Throwable> error = new AtomicReference<>();
  AtomicReference<Object> value = new AtomicReference<>();
  Hooks.onErrorDropped(error::set);
  Hooks.onNextDropped(value::set);
  try {
    OnNextFailureStrategy strategy = OnNextFailureStrategy.resumeDropIf(
        e -> e instanceof IllegalArgumentException);
    String data = "foo";
    Throwable exception = new NullPointerException("foo");
    assertThat(strategy.test(exception, data)).isFalse();
    Throwable t = strategy.process(exception, data, Context.empty());
    assertThat(t)
        .isSameAs(exception)
        .hasNoSuppressedExceptions();
    assertThat(error.get()).isNull();
    assertThat(value.get()).isNull();
  }
  finally {
    Hooks.resetOnErrorDropped();
    Hooks.resetOnNextDropped();
  }
}

Code example source: origin: reactor/reactor-core

@Test
public void resumeDropIfWithFatalNoMatch() {
  AtomicReference<Throwable> error = new AtomicReference<>();
  AtomicReference<Object> value = new AtomicReference<>();
  Hooks.onErrorDropped(error::set);
  Hooks.onNextDropped(value::set);
  try {
    OnNextFailureStrategy strategy = OnNextFailureStrategy.resumeDropIf(
        e -> e instanceof NullPointerException);
    String data = "foo";
    Throwable exception = new NoSuchMethodError("foo");
    assertThat(strategy.test(exception, data)).isFalse();
    assertThatExceptionOfType(NoSuchMethodError.class)
        .isThrownBy(() -> strategy.process(exception, data, Context.empty()));
    assertThat(error.get()).isNull();
    assertThat(value.get()).isNull();
  }
  finally {
    Hooks.resetOnErrorDropped();
    Hooks.resetOnNextDropped();
  }
}

Code example source: origin: reactor/reactor-core

@Test // fixme when we have a fuseable testPublisher or an improved hide operator
@SuppressWarnings("unchecked")
public void failNextIfTerminatedTakeSourceConditional() {
  Hooks.onNextDropped(t -> assertThat(t).isEqualTo(1));
  StepVerifier.create(Flux.from(s -> {
    s.onSubscribe(Operators.emptySubscription());
    s.onComplete();
    ((Fuseable.ConditionalSubscriber)s).tryOnNext(1);
  })
              .take(2)
              .filter("test2"::equals))
        .verifyComplete();
  Hooks.resetOnNextDropped();
}

Code example source: origin: reactor/reactor-core

@Test
public void failNextIfTerminatedTakeConditional() {
  Hooks.onNextDropped(t -> assertThat(t).isEqualTo(1));
  StepVerifier.create(Flux.from(s -> {
    s.onSubscribe(Operators.emptySubscription());
    s.onComplete();
    s.onNext(1);
  })
              .take(2)
              .filter("test2"::equals))
        .verifyComplete();
  Hooks.resetOnNextDropped();
}

Code example source: origin: reactor/reactor-core

@Test
public void failNextIfTerminatedTakeFused() {
  UnicastProcessor<Integer> up = UnicastProcessor.create();
  Hooks.onNextDropped(t -> assertThat(t).isEqualTo(1));
  StepVerifier.create(up.take(2))
        .then(() -> up.actual.onComplete())
        .then(() -> up.actual.onNext(1))
        .verifyComplete();
  Hooks.resetOnNextDropped();
}

Code example source: origin: reactor/reactor-core

@After
public void resetAllHooks() {
  Hooks.resetOnOperatorError();
  Hooks.resetOnNextDropped();
  Hooks.resetOnErrorDropped();
  Hooks.resetOnOperatorDebug();
  Hooks.resetOnEachOperator();
  Hooks.resetOnLastOperator();
}

Code example source: origin: reactor/reactor-core

@Test //FIXME use Violation.NO_CLEANUP_ON_TERMINATE
public void failNextOnTerminated() {
  UnicastProcessor<Integer> up = UnicastProcessor.create();
  Hooks.onNextDropped(c -> {
    assertThat(c).isEqualTo(2);
  });
  StepVerifier.create(up.flatMap(Flux::just))
        .then(() -> {
          up.onNext(1);
          CoreSubscriber<? super Integer> a = up.actual;
          up.onComplete();
          a.onNext(2);
        })
        .expectNext(1)
        .verifyComplete();
  Hooks.resetOnNextDropped();
}

Code example source: origin: reactor/reactor-core

@Test //FIXME use Violation.NO_CLEANUP_ON_TERMINATE
public void failDoubleNext() {
  Hooks.onNextDropped(c -> {
  });
  StepVerifier.create(Flux.zip(obj -> 0, Flux.just(1), Flux.just(2), s -> {
    s.onSubscribe(Operators.emptySubscription());
    s.onNext(2);
    s.onNext(3);
  }))
        .thenCancel()
        .verify();
  Hooks.resetOnNextDropped();
}
