reactor.core.publisher.Hooks.resetOnNextDropped()方法的使用及代码示例

x33g5p2x  于2022-01-20 转载在 其他  
字(10.3k)|赞(0)|评价(0)|浏览(173)

本文整理了Java中reactor.core.publisher.Hooks.resetOnNextDropped()方法的一些代码示例,展示了Hooks.resetOnNextDropped()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Hooks.resetOnNextDropped()方法的具体详情如下:
包路径:reactor.core.publisher.Hooks
类名称:Hooks
方法名:resetOnNextDropped

Hooks.resetOnNextDropped介绍

[英]Reset global data dropped strategy to throwing via reactor.core.Exceptions#failWithCancel()
[中]将全局数据丢弃策略重置为通过反应堆投掷。果心例外情况#failWithCancel()

代码示例

代码示例来源:origin: reactor/reactor-core

  1. public void unplugHooks() {
  2. Hooks.resetOnNextDropped();
  3. Hooks.resetOnErrorDropped();
  4. Hooks.resetOnOperatorError();
  5. }

代码示例来源:origin: reactor/reactor-core

  1. @SuppressWarnings("unchecked")
  2. void assertInnerSubscriber(FluxZip.ZipSingleCoordinator c) {
  3. FluxZip.ZipSingleSubscriber s = (FluxZip.ZipSingleSubscriber) c.inners()
  4. .findFirst()
  5. .get();
  6. assertThat(s.scan(Scannable.Attr.TERMINATED)).isTrue();
  7. assertThat(s.scan(Scannable.Attr.BUFFERED)).isEqualTo(1);
  8. assertThat(s.scan(Scannable.Attr.CANCELLED)).isTrue();
  9. Hooks.onNextDropped(v -> {
  10. });
  11. s.onNext(0);
  12. Hooks.resetOnNextDropped();
  13. }

代码示例来源:origin: reactor/reactor-core

  1. @After
  2. public void tearDown() {
  3. scheduler.dispose();
  4. Hooks.resetOnNextDropped();
  5. Hooks.resetOnErrorDropped();
  6. Hooks.resetOnOperatorError();
  7. Schedulers.resetOnHandleError();
  8. onNexts.clear();
  9. onErrors.clear();
  10. onNextDropped.clear();
  11. onErrorDropped.clear();
  12. onOperatorError.clear();
  13. onOperatorErrorData.clear();
  14. onSchedulerHandleError.clear();
  15. }

代码示例来源:origin: reactor/reactor-core

  1. @Test
  2. public void resumeDropIfMatch() {
  3. AtomicReference<Throwable> error = new AtomicReference<>();
  4. AtomicReference<Object> value = new AtomicReference<>();
  5. Hooks.onErrorDropped(error::set);
  6. Hooks.onNextDropped(value::set);
  7. try {
  8. OnNextFailureStrategy strategy = OnNextFailureStrategy.resumeDropIf(
  9. e -> e instanceof NullPointerException);
  10. String data = "foo";
  11. Throwable exception = new NullPointerException("foo");
  12. assertThat(strategy.test(exception, data)).isTrue();
  13. Throwable t = strategy.process(exception, data, Context.empty());
  14. assertThat(t).isNull();
  15. assertThat(error.get()).isInstanceOf(NullPointerException.class).hasMessage("foo");
  16. assertThat(value.get()).isEqualTo("foo");
  17. }
  18. finally {
  19. Hooks.resetOnErrorDropped();
  20. Hooks.resetOnNextDropped();
  21. }
  22. }

代码示例来源:origin: reactor/reactor-core

  1. @Test
  2. public void resumeDrop() {
  3. AtomicReference<Throwable> error = new AtomicReference<>();
  4. AtomicReference<Object> value = new AtomicReference<>();
  5. Hooks.onErrorDropped(error::set);
  6. Hooks.onNextDropped(value::set);
  7. try {
  8. OnNextFailureStrategy strategy = OnNextFailureStrategy.resumeDrop();
  9. String data = "foo";
  10. Throwable exception = new NullPointerException("foo");
  11. assertThat(strategy.test(exception, data)).isTrue();
  12. Throwable t = strategy.process(exception, data, Context.empty());
  13. assertThat(t).isNull();
  14. assertThat(error.get()).isInstanceOf(NullPointerException.class).hasMessage("foo");
  15. assertThat(value.get()).isEqualTo("foo");
  16. }
  17. finally {
  18. Hooks.resetOnErrorDropped();
  19. Hooks.resetOnNextDropped();
  20. }
  21. }

代码示例来源:origin: reactor/reactor-core

  1. @Test
  2. public void resumeDropErrorHookFails() {
  3. AtomicReference<Object> value = new AtomicReference<>();
  4. UnsupportedOperationException failure = new UnsupportedOperationException("error hook");
  5. Hooks.onNextDropped(value::set);
  6. Hooks.onErrorDropped(v -> { throw failure; });
  7. try {
  8. OnNextFailureStrategy strategy = OnNextFailureStrategy.resumeDrop();
  9. String data = "foo";
  10. Throwable exception = new NullPointerException("foo");
  11. Throwable t = strategy.process(exception, data, Context.empty());
  12. assertThat(t)
  13. .hasMessage("error hook")
  14. .hasSuppressedException(exception);
  15. assertThat(value.get()).isEqualTo("foo");
  16. }
  17. finally {
  18. Hooks.resetOnErrorDropped();
  19. Hooks.resetOnNextDropped();
  20. }
  21. }

代码示例来源:origin: reactor/reactor-core

  1. @Test
  2. public void pollErrorModeLocalStrategy() {
  3. List<Object> nextDropped = new ArrayList<>();
  4. List<Object> errorDropped = new ArrayList<>();
  5. Hooks.onNextDropped(nextDropped::add);
  6. Hooks.onErrorDropped(errorDropped::add);
  7. Context c = Context.of(OnNextFailureStrategy.KEY_ON_NEXT_ERROR_STRATEGY, OnNextFailureStrategy.RESUME_DROP);
  8. Exception error = new IllegalStateException("boom");
  9. try {
  10. assertThat(Hooks.onNextErrorHook).as("no global hook").isNull();
  11. RuntimeException e = Operators.onNextPollError("foo", error, c);
  12. assertThat(e).isNull();
  13. assertThat(nextDropped).containsExactly("foo");
  14. assertThat(errorDropped).containsExactly(error);
  15. }
  16. finally {
  17. Hooks.resetOnNextDropped();
  18. Hooks.resetOnErrorDropped();
  19. }
  20. }

代码示例来源:origin: reactor/reactor-core

  1. final void resetHooks() {
  2. Hooks.resetOnErrorDropped();
  3. Hooks.resetOnNextDropped();
  4. Hooks.resetOnEachOperator();
  5. Hooks.resetOnOperatorError();
  6. Hooks.resetOnLastOperator();
  7. }

代码示例来源:origin: reactor/reactor-core

  1. @Test
  2. public void resumeDropValueHookFails() {
  3. AtomicReference<Throwable> error = new AtomicReference<>();
  4. UnsupportedOperationException failure = new UnsupportedOperationException("value hook");
  5. Hooks.onErrorDropped(error::set);
  6. Hooks.onNextDropped(v -> { throw failure; });
  7. try {
  8. OnNextFailureStrategy strategy = OnNextFailureStrategy.resumeDrop();
  9. String data = "foo";
  10. Throwable exception = new NullPointerException("foo");
  11. Throwable t = strategy.process(exception, data, Context.empty());
  12. assertThat(t)
  13. .hasMessage("value hook")
  14. .hasSuppressedException(exception);
  15. assertThat(error.get()).isNull();
  16. }
  17. finally {
  18. Hooks.resetOnErrorDropped();
  19. Hooks.resetOnNextDropped();
  20. }
  21. }

代码示例来源:origin: reactor/reactor-core

  1. @Test
  2. public void failNextIfTerminatedTake() {
  3. Hooks.onNextDropped(t -> assertThat(t).isEqualTo(1));
  4. StepVerifier.create(Flux.from(s -> {
  5. s.onSubscribe(Operators.emptySubscription());
  6. s.onComplete();
  7. s.onNext(1);
  8. })
  9. .take(2))
  10. .verifyComplete();
  11. Hooks.resetOnNextDropped();
  12. }

代码示例来源:origin: reactor/reactor-core

  1. @Test
  2. public void resumeDropWithFatal() {
  3. AtomicReference<Throwable> error = new AtomicReference<>();
  4. AtomicReference<Object> value = new AtomicReference<>();
  5. Hooks.onErrorDropped(error::set);
  6. Hooks.onNextDropped(value::set);
  7. try {
  8. OnNextFailureStrategy strategy = OnNextFailureStrategy.resumeDrop();
  9. String data = "foo";
  10. Throwable exception = new NoSuchMethodError("foo");
  11. assertThat(strategy.test(exception, data)).isTrue();
  12. assertThatExceptionOfType(NoSuchMethodError.class)
  13. .isThrownBy(() -> strategy.process(exception, data, Context.empty()));
  14. assertThat(error.get()).isNull();
  15. assertThat(value.get()).isNull();
  16. }
  17. finally {
  18. Hooks.resetOnErrorDropped();
  19. Hooks.resetOnNextDropped();
  20. }
  21. }

代码示例来源:origin: reactor/reactor-core

  1. @Test
  2. public void resumeDropIfNoMatch() {
  3. AtomicReference<Throwable> error = new AtomicReference<>();
  4. AtomicReference<Object> value = new AtomicReference<>();
  5. Hooks.onErrorDropped(error::set);
  6. Hooks.onNextDropped(value::set);
  7. try {
  8. OnNextFailureStrategy strategy = OnNextFailureStrategy.resumeDropIf(
  9. e -> e instanceof IllegalArgumentException);
  10. String data = "foo";
  11. Throwable exception = new NullPointerException("foo");
  12. assertThat(strategy.test(exception, data)).isFalse();
  13. Throwable t = strategy.process(exception, data, Context.empty());
  14. assertThat(t)
  15. .isSameAs(exception)
  16. .hasNoSuppressedExceptions();
  17. assertThat(error.get()).isNull();
  18. assertThat(value.get()).isNull();
  19. }
  20. finally {
  21. Hooks.resetOnErrorDropped();
  22. Hooks.resetOnNextDropped();
  23. }
  24. }

代码示例来源:origin: reactor/reactor-core

  1. @Test
  2. public void resumeDropIfWithFatalNoMatch() {
  3. AtomicReference<Throwable> error = new AtomicReference<>();
  4. AtomicReference<Object> value = new AtomicReference<>();
  5. Hooks.onErrorDropped(error::set);
  6. Hooks.onNextDropped(value::set);
  7. try {
  8. OnNextFailureStrategy strategy = OnNextFailureStrategy.resumeDropIf(
  9. e -> e instanceof NullPointerException);
  10. String data = "foo";
  11. Throwable exception = new NoSuchMethodError("foo");
  12. assertThat(strategy.test(exception, data)).isFalse();
  13. assertThatExceptionOfType(NoSuchMethodError.class)
  14. .isThrownBy(() -> strategy.process(exception, data, Context.empty()));
  15. assertThat(error.get()).isNull();
  16. assertThat(value.get()).isNull();
  17. }
  18. finally {
  19. Hooks.resetOnErrorDropped();
  20. Hooks.resetOnNextDropped();
  21. }
  22. }

代码示例来源:origin: reactor/reactor-core

  1. @Test // fixme when we have a fuseable testPublisher or an improved hide operator
  2. @SuppressWarnings("unchecked")
  3. public void failNextIfTerminatedTakeSourceConditional() {
  4. Hooks.onNextDropped(t -> assertThat(t).isEqualTo(1));
  5. StepVerifier.create(Flux.from(s -> {
  6. s.onSubscribe(Operators.emptySubscription());
  7. s.onComplete();
  8. ((Fuseable.ConditionalSubscriber)s).tryOnNext(1);
  9. })
  10. .take(2)
  11. .filter("test2"::equals))
  12. .verifyComplete();
  13. Hooks.resetOnNextDropped();
  14. }

代码示例来源:origin: reactor/reactor-core

  1. @Test
  2. public void failNextIfTerminatedTakeConditional() {
  3. Hooks.onNextDropped(t -> assertThat(t).isEqualTo(1));
  4. StepVerifier.create(Flux.from(s -> {
  5. s.onSubscribe(Operators.emptySubscription());
  6. s.onComplete();
  7. s.onNext(1);
  8. })
  9. .take(2)
  10. .filter("test2"::equals))
  11. .verifyComplete();
  12. Hooks.resetOnNextDropped();
  13. }

代码示例来源:origin: reactor/reactor-core

  1. @Test
  2. public void failNextIfTerminatedTakeFused() {
  3. UnicastProcessor<Integer> up = UnicastProcessor.create();
  4. Hooks.onNextDropped(t -> assertThat(t).isEqualTo(1));
  5. StepVerifier.create(up.take(2))
  6. .then(() -> up.actual.onComplete())
  7. .then(() -> up.actual.onNext(1))
  8. .verifyComplete();
  9. Hooks.resetOnNextDropped();
  10. }

代码示例来源:origin: reactor/reactor-core

  1. @After
  2. public void resetAllHooks() {
  3. Hooks.resetOnOperatorError();
  4. Hooks.resetOnNextDropped();
  5. Hooks.resetOnErrorDropped();
  6. Hooks.resetOnOperatorDebug();
  7. Hooks.resetOnEachOperator();
  8. Hooks.resetOnLastOperator();
  9. }

代码示例来源:origin: reactor/reactor-core

  1. @After
  2. public void resetAllHooks() {
  3. Hooks.resetOnOperatorError();
  4. Hooks.resetOnNextDropped();
  5. Hooks.resetOnErrorDropped();
  6. Hooks.resetOnOperatorDebug();
  7. Hooks.resetOnEachOperator();
  8. Hooks.resetOnLastOperator();
  9. }

代码示例来源:origin: reactor/reactor-core

  1. @Test //FIXME use Violation.NO_CLEANUP_ON_TERMINATE
  2. public void failNextOnTerminated() {
  3. UnicastProcessor<Integer> up = UnicastProcessor.create();
  4. Hooks.onNextDropped(c -> {
  5. assertThat(c).isEqualTo(2);
  6. });
  7. StepVerifier.create(up.flatMap(Flux::just))
  8. .then(() -> {
  9. up.onNext(1);
  10. CoreSubscriber<? super Integer> a = up.actual;
  11. up.onComplete();
  12. a.onNext(2);
  13. })
  14. .expectNext(1)
  15. .verifyComplete();
  16. Hooks.resetOnNextDropped();
  17. }

代码示例来源:origin: reactor/reactor-core

  1. @Test //FIXME use Violation.NO_CLEANUP_ON_TERMINATE
  2. public void failDoubleNext() {
  3. Hooks.onNextDropped(c -> {
  4. });
  5. StepVerifier.create(Flux.zip(obj -> 0, Flux.just(1), Flux.just(2), s -> {
  6. s.onSubscribe(Operators.emptySubscription());
  7. s.onNext(2);
  8. s.onNext(3);
  9. }))
  10. .thenCancel()
  11. .verify();
  12. Hooks.resetOnNextDropped();
  13. }

相关文章