
x33g5p2x  于2022-01-20 转载在 其他  



[英]Override the global data-dropped strategy, which by default logs at DEBUG level.

The hook is cumulative, so calling this method several times will set up the hook for as many consumer invocations (even if called with the same consumer instance).


代码示例来源:origin: reactor/reactor-core

  1. private void plugHooks() {
  2. Hooks.onErrorDropped(droppedErrors::offer);
  3. Hooks.onNextDropped(droppedElements::offer);
  4. Hooks.onOperatorError((t, d) -> {
  5. operatorErrors.offer(Tuples.of(Optional.ofNullable(t), Optional.ofNullable(d)));
  6. return t;
  7. });
  8. }

代码示例来源:origin: reactor/reactor-core

  1. @Test
  2. public void onNextDroppedFailReplaces() {
  3. AtomicReference<Object> dropHook = new AtomicReference<>();
  4. Publisher<Integer> p = s -> {
  5. s.onSubscribe(Operators.emptySubscription());
  6. s.onNext(1);
  7. s.onNext(2);
  8. s.onNext(3);
  9. };
  10. List<Integer> seen = new ArrayList<>();
  11. try {
  12. Hooks.onNextDropped(dropHook::set);
  13. Hooks.onNextDroppedFail();
  14. assertThatExceptionOfType(RuntimeException.class)
  15. .isThrownBy(() -> Flux.from(p).take(2).subscribe(seen::add))
  16. .isInstanceOf(RuntimeException.class)
  17. .matches(Exceptions::isCancel);
  18. assertThat(seen).containsExactly(1, 2);
  19. assertThat(dropHook.get()).isNull();
  20. }
  21. finally {
  22. Hooks.resetOnNextDropped();
  23. }
  24. }
  25. }

代码示例来源:origin: reactor/reactor-core

  1. @SuppressWarnings("unchecked")
  2. void assertInnerSubscriber(FluxZip.ZipSingleCoordinator c) {
  3. FluxZip.ZipSingleSubscriber s = (FluxZip.ZipSingleSubscriber) c.inners()
  4. .findFirst()
  5. .get();
  6. assertThat(s.scan(Scannable.Attr.TERMINATED)).isTrue();
  7. assertThat(s.scan(Scannable.Attr.BUFFERED)).isEqualTo(1);
  8. assertThat(s.scan(Scannable.Attr.CANCELLED)).isTrue();
  9. Hooks.onNextDropped(v -> {
  10. });
  11. s.onNext(0);
  12. Hooks.resetOnNextDropped();
  13. }

代码示例来源:origin: reactor/reactor-core

  1. @Override
  2. protected List<Scenario<String, Signal<String>>> scenarios_errorFromUpstreamFailure() {
  3. return Arrays.asList(
  4. scenario(Flux::materialize)
  5. .verifier(step -> {
  6. Hooks.onErrorDropped(c -> assertThat(c).hasMessage("dropped"));
  7. Hooks.onNextDropped(c -> assertThat(c).isEqualTo("dropped"));
  8. step.assertNext(s -> assertThat(s
  9. .getThrowable()).hasMessage("test"))
  10. .verifyComplete();
  11. })
  12. );
  13. }

代码示例来源:origin: reactor/reactor-core

  1. @Before
  2. public void setUp() {
  3. scheduler = new BoundedScheduler(Schedulers.newSingle("bounded-single"));
  4. Hooks.onNextDropped(o -> onNextDropped.add(o));
  5. Hooks.onErrorDropped(e -> onErrorDropped.add(e));
  6. Hooks.onOperatorError((e, o) -> {
  7. onOperatorError.add(e);
  8. if (o instanceof Long)
  9. onOperatorErrorData.add((Long) o);
  10. else if (o != null) {
  11. System.out.println(o);
  12. }
  13. return e;
  14. });
  15. Schedulers.onHandleError((thread, t) -> onSchedulerHandleError.add(t));
  16. }

代码示例来源:origin: reactor/reactor-core

  1. @Test
  2. public void resumeDropIfMatch() {
  3. AtomicReference<Throwable> error = new AtomicReference<>();
  4. AtomicReference<Object> value = new AtomicReference<>();
  5. Hooks.onErrorDropped(error::set);
  6. Hooks.onNextDropped(value::set);
  7. try {
  8. OnNextFailureStrategy strategy = OnNextFailureStrategy.resumeDropIf(
  9. e -> e instanceof NullPointerException);
  10. String data = "foo";
  11. Throwable exception = new NullPointerException("foo");
  12. assertThat(strategy.test(exception, data)).isTrue();
  13. Throwable t = strategy.process(exception, data, Context.empty());
  14. assertThat(t).isNull();
  15. assertThat(error.get()).isInstanceOf(NullPointerException.class).hasMessage("foo");
  16. assertThat(value.get()).isEqualTo("foo");
  17. }
  18. finally {
  19. Hooks.resetOnErrorDropped();
  20. Hooks.resetOnNextDropped();
  21. }
  22. }

代码示例来源:origin: reactor/reactor-core

  1. @Test
  2. public void resumeDrop() {
  3. AtomicReference<Throwable> error = new AtomicReference<>();
  4. AtomicReference<Object> value = new AtomicReference<>();
  5. Hooks.onErrorDropped(error::set);
  6. Hooks.onNextDropped(value::set);
  7. try {
  8. OnNextFailureStrategy strategy = OnNextFailureStrategy.resumeDrop();
  9. String data = "foo";
  10. Throwable exception = new NullPointerException("foo");
  11. assertThat(strategy.test(exception, data)).isTrue();
  12. Throwable t = strategy.process(exception, data, Context.empty());
  13. assertThat(t).isNull();
  14. assertThat(error.get()).isInstanceOf(NullPointerException.class).hasMessage("foo");
  15. assertThat(value.get()).isEqualTo("foo");
  16. }
  17. finally {
  18. Hooks.resetOnErrorDropped();
  19. Hooks.resetOnNextDropped();
  20. }
  21. }

代码示例来源:origin: reactor/reactor-core

  1. @Test
  2. public void resumeDropErrorHookFails() {
  3. AtomicReference<Object> value = new AtomicReference<>();
  4. UnsupportedOperationException failure = new UnsupportedOperationException("error hook");
  5. Hooks.onNextDropped(value::set);
  6. Hooks.onErrorDropped(v -> { throw failure; });
  7. try {
  8. OnNextFailureStrategy strategy = OnNextFailureStrategy.resumeDrop();
  9. String data = "foo";
  10. Throwable exception = new NullPointerException("foo");
  11. Throwable t = strategy.process(exception, data, Context.empty());
  12. assertThat(t)
  13. .hasMessage("error hook")
  14. .hasSuppressedException(exception);
  15. assertThat(value.get()).isEqualTo("foo");
  16. }
  17. finally {
  18. Hooks.resetOnErrorDropped();
  19. Hooks.resetOnNextDropped();
  20. }
  21. }

代码示例来源:origin: reactor/reactor-core

  1. @Test
  2. public void resumeDropIfWithFatalMatch() {
  3. AtomicReference<Throwable> error = new AtomicReference<>();
  4. AtomicReference<Object> value = new AtomicReference<>();
  5. Hooks.onErrorDropped(error::set);
  6. Hooks.onNextDropped(value::set);
  7. try {
  8. OnNextFailureStrategy strategy = OnNextFailureStrategy.resumeDropIf(
  9. e -> e instanceof NoSuchMethodError);
  10. String data = "foo";
  11. Throwable exception = new NoSuchMethodError("foo");
  12. assertThat(strategy.test(exception, data)).isTrue();
  13. Throwable t = strategy.process(exception, data, Context.empty());
  14. assertThat(t).isNull();
  15. assertThat(error.get())
  16. .isInstanceOf(NoSuchMethodError.class)
  17. .hasMessage("foo");
  18. assertThat(value.get()).isEqualTo("foo");
  19. }
  20. finally {
  21. Hooks.resetOnErrorDropped();
  22. Hooks.resetOnNextDropped();
  23. }
  24. }

代码示例来源:origin: reactor/reactor-core

  1. @Test
  2. public void pollErrorModeLocalStrategy() {
  3. List<Object> nextDropped = new ArrayList<>();
  4. List<Object> errorDropped = new ArrayList<>();
  5. Hooks.onNextDropped(nextDropped::add);
  6. Hooks.onErrorDropped(errorDropped::add);
  7. Context c = Context.of(OnNextFailureStrategy.KEY_ON_NEXT_ERROR_STRATEGY, OnNextFailureStrategy.RESUME_DROP);
  8. Exception error = new IllegalStateException("boom");
  9. try {
  10. assertThat(Hooks.onNextErrorHook).as("no global hook").isNull();
  11. RuntimeException e = Operators.onNextPollError("foo", error, c);
  12. assertThat(e).isNull();
  13. assertThat(nextDropped).containsExactly("foo");
  14. assertThat(errorDropped).containsExactly(error);
  15. }
  16. finally {
  17. Hooks.resetOnNextDropped();
  18. Hooks.resetOnErrorDropped();
  19. }
  20. }

代码示例来源:origin: reactor/reactor-core

  1. @Test
  2. public void resumeDropValueHookFails() {
  3. AtomicReference<Throwable> error = new AtomicReference<>();
  4. UnsupportedOperationException failure = new UnsupportedOperationException("value hook");
  5. Hooks.onErrorDropped(error::set);
  6. Hooks.onNextDropped(v -> { throw failure; });
  7. try {
  8. OnNextFailureStrategy strategy = OnNextFailureStrategy.resumeDrop();
  9. String data = "foo";
  10. Throwable exception = new NullPointerException("foo");
  11. Throwable t = strategy.process(exception, data, Context.empty());
  12. assertThat(t)
  13. .hasMessage("value hook")
  14. .hasSuppressedException(exception);
  15. assertThat(error.get()).isNull();
  16. }
  17. finally {
  18. Hooks.resetOnErrorDropped();
  19. Hooks.resetOnNextDropped();
  20. }
  21. }

代码示例来源:origin: reactor/reactor-core

  1. @Test
  2. public void resumeDropWithFatal() {
  3. AtomicReference<Throwable> error = new AtomicReference<>();
  4. AtomicReference<Object> value = new AtomicReference<>();
  5. Hooks.onErrorDropped(error::set);
  6. Hooks.onNextDropped(value::set);
  7. try {
  8. OnNextFailureStrategy strategy = OnNextFailureStrategy.resumeDrop();
  9. String data = "foo";
  10. Throwable exception = new NoSuchMethodError("foo");
  11. assertThat(strategy.test(exception, data)).isTrue();
  12. assertThatExceptionOfType(NoSuchMethodError.class)
  13. .isThrownBy(() -> strategy.process(exception, data, Context.empty()));
  14. assertThat(error.get()).isNull();
  15. assertThat(value.get()).isNull();
  16. }
  17. finally {
  18. Hooks.resetOnErrorDropped();
  19. Hooks.resetOnNextDropped();
  20. }
  21. }

代码示例来源:origin: reactor/reactor-core

  1. @Test
  2. public void failNextIfTerminatedTake() {
  3. Hooks.onNextDropped(t -> assertThat(t).isEqualTo(1));
  4. StepVerifier.create(Flux.from(s -> {
  5. s.onSubscribe(Operators.emptySubscription());
  6. s.onComplete();
  7. s.onNext(1);
  8. })
  9. .take(2))
  10. .verifyComplete();
  11. Hooks.resetOnNextDropped();
  12. }

代码示例来源:origin: reactor/reactor-core

  1. @Test
  2. public void resumeDropIfWithFatalNoMatch() {
  3. AtomicReference<Throwable> error = new AtomicReference<>();
  4. AtomicReference<Object> value = new AtomicReference<>();
  5. Hooks.onErrorDropped(error::set);
  6. Hooks.onNextDropped(value::set);
  7. try {
  8. OnNextFailureStrategy strategy = OnNextFailureStrategy.resumeDropIf(
  9. e -> e instanceof NullPointerException);
  10. String data = "foo";
  11. Throwable exception = new NoSuchMethodError("foo");
  12. assertThat(strategy.test(exception, data)).isFalse();
  13. assertThatExceptionOfType(NoSuchMethodError.class)
  14. .isThrownBy(() -> strategy.process(exception, data, Context.empty()));
  15. assertThat(error.get()).isNull();
  16. assertThat(value.get()).isNull();
  17. }
  18. finally {
  19. Hooks.resetOnErrorDropped();
  20. Hooks.resetOnNextDropped();
  21. }
  22. }

代码示例来源:origin: reactor/reactor-core

  1. @Test
  2. public void resumeDropIfNoMatch() {
  3. AtomicReference<Throwable> error = new AtomicReference<>();
  4. AtomicReference<Object> value = new AtomicReference<>();
  5. Hooks.onErrorDropped(error::set);
  6. Hooks.onNextDropped(value::set);
  7. try {
  8. OnNextFailureStrategy strategy = OnNextFailureStrategy.resumeDropIf(
  9. e -> e instanceof IllegalArgumentException);
  10. String data = "foo";
  11. Throwable exception = new NullPointerException("foo");
  12. assertThat(strategy.test(exception, data)).isFalse();
  13. Throwable t = strategy.process(exception, data, Context.empty());
  14. assertThat(t)
  15. .isSameAs(exception)
  16. .hasNoSuppressedExceptions();
  17. assertThat(error.get()).isNull();
  18. assertThat(value.get()).isNull();
  19. }
  20. finally {
  21. Hooks.resetOnErrorDropped();
  22. Hooks.resetOnNextDropped();
  23. }
  24. }

代码示例来源:origin: reactor/reactor-core

  1. @Test
  2. public void failNextIfTerminatedTakeConditional() {
  3. Hooks.onNextDropped(t -> assertThat(t).isEqualTo(1));
  4. StepVerifier.create(Flux.from(s -> {
  5. s.onSubscribe(Operators.emptySubscription());
  6. s.onComplete();
  7. s.onNext(1);
  8. })
  9. .take(2)
  10. .filter("test2"::equals))
  11. .verifyComplete();
  12. Hooks.resetOnNextDropped();
  13. }

代码示例来源:origin: reactor/reactor-core

  1. @Test // fixme when we have a fuseable testPublisher or an improved hide operator
  2. @SuppressWarnings("unchecked")
  3. public void failNextIfTerminatedTakeSourceConditional() {
  4. Hooks.onNextDropped(t -> assertThat(t).isEqualTo(1));
  5. StepVerifier.create(Flux.from(s -> {
  6. s.onSubscribe(Operators.emptySubscription());
  7. s.onComplete();
  8. ((Fuseable.ConditionalSubscriber)s).tryOnNext(1);
  9. })
  10. .take(2)
  11. .filter("test2"::equals))
  12. .verifyComplete();
  13. Hooks.resetOnNextDropped();
  14. }

代码示例来源:origin: reactor/reactor-core

  1. @Test
  2. public void failNextIfTerminatedTakeFused() {
  3. UnicastProcessor<Integer> up = UnicastProcessor.create();
  4. Hooks.onNextDropped(t -> assertThat(t).isEqualTo(1));
  5. StepVerifier.create(up.take(2))
  6. .then(() -> up.actual.onComplete())
  7. .then(() -> up.actual.onNext(1))
  8. .verifyComplete();
  9. Hooks.resetOnNextDropped();
  10. }

代码示例来源:origin: reactor/reactor-core

  1. @Test //FIXME use Violation.NO_CLEANUP_ON_TERMINATE
  2. public void failNextOnTerminated() {
  3. UnicastProcessor<Integer> up = UnicastProcessor.create();
  4. Hooks.onNextDropped(c -> {
  5. assertThat(c).isEqualTo(2);
  6. });
  7. StepVerifier.create(up.flatMap(Flux::just))
  8. .then(() -> {
  9. up.onNext(1);
  10. CoreSubscriber<? super Integer> a = up.actual;
  11. up.onComplete();
  12. a.onNext(2);
  13. })
  14. .expectNext(1)
  15. .verifyComplete();
  16. Hooks.resetOnNextDropped();
  17. }

代码示例来源:origin: reactor/reactor-core

  1. @Test //FIXME use Violation.NO_CLEANUP_ON_TERMINATE
  2. public void failDoubleNext() {
  3. Hooks.onNextDropped(c -> {
  4. });
  5. StepVerifier.create( -> 0, Flux.just(1), Flux.just(2), s -> {
  6. s.onSubscribe(Operators.emptySubscription());
  7. s.onNext(2);
  8. s.onNext(3);
  9. }))
  10. .thenCancel()
  11. .verify();
  12. Hooks.resetOnNextDropped();
  13. }
