reactor.core.publisher.Hooks.onErrorDropped()方法的使用及代码示例

x33g5p2x  于2022-01-20 转载在 其他  
字(12.2k)|赞(0)|评价(0)|浏览(285)

本文整理了Java中reactor.core.publisher.Hooks.onErrorDropped()方法的一些代码示例,展示了Hooks.onErrorDropped()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Hooks.onErrorDropped()方法的具体详情如下:
包路径:reactor.core.publisher.Hooks
类名称:Hooks
方法名:onErrorDropped

Hooks.onErrorDropped介绍

[英]Override global error dropped strategy which by default bubble back the error.

The hook is cumulative, so calling this method several times will set up the hook for as many consumer invocations (even if called with the same consumer instance).
[中]覆盖全局错误删除策略,默认情况下,该策略会回显错误。
钩子是累积的,因此多次调用此方法将为尽可能多的使用者调用设置钩子(即使使用相同的使用者实例调用)。

代码示例

代码示例来源:origin: reactor/reactor-core

private void plugHooks() {
  Hooks.onErrorDropped(droppedErrors::offer);
  Hooks.onNextDropped(droppedElements::offer);
  Hooks.onOperatorError((t, d) -> {
    operatorErrors.offer(Tuples.of(Optional.ofNullable(t), Optional.ofNullable(d)));
    return t;
  });
}

代码示例来源:origin: reactor/reactor-core

@Test
public void completeHookErrorDropped() {
  Hooks.onErrorDropped(e -> assertTrue(e.getMessage().equals("complete")));
  try {
    Mono.just("foo")
      .subscribe(v -> {},
          e -> {},
          () -> { throw new IllegalStateException("complete");});
  }
  finally {
    Hooks.resetOnErrorDropped();
  }
}

代码示例来源:origin: reactor/reactor-core

@Override
protected List<Scenario<String, Signal<String>>> scenarios_errorFromUpstreamFailure() {
  return Arrays.asList(
      scenario(Flux::materialize)
          .verifier(step -> {
            Hooks.onErrorDropped(c -> assertThat(c).hasMessage("dropped"));
            Hooks.onNextDropped(c -> assertThat(c).isEqualTo("dropped"));
            step.assertNext(s -> assertThat(s
                .getThrowable()).hasMessage("test"))
              .verifyComplete();
          })
  );
}

代码示例来源:origin: reactor/reactor-core

@Test
public void validationNull() {
  Hooks.onErrorDropped(e -> assertThat(e).isInstanceOf(NullPointerException.class)
                      .hasMessage("next is null"));
  try {
    assertThat(OperatorDisposables.validate(null, null,
        e -> Operators.onErrorDropped(e, Context.empty()))).isFalse();
  } finally {
    Hooks.resetOnErrorDropped();
  }
}

代码示例来源:origin: reactor/reactor-core

@Before
public void setUp() {
  scheduler = new BoundedScheduler(Schedulers.newSingle("bounded-single"));
  Hooks.onNextDropped(o -> onNextDropped.add(o));
  Hooks.onErrorDropped(e -> onErrorDropped.add(e));
  Hooks.onOperatorError((e, o) -> {
    onOperatorError.add(e);
    if (o instanceof Long)
      onOperatorErrorData.add((Long) o);
    else if (o != null) {
      System.out.println(o);
    }
    return e;
  });
  Schedulers.onHandleError((thread, t) -> onSchedulerHandleError.add(t));
}

代码示例来源:origin: reactor/reactor-core

@Test
public void resumeDrop() {
  AtomicReference<Throwable> error = new AtomicReference<>();
  AtomicReference<Object> value = new AtomicReference<>();
  Hooks.onErrorDropped(error::set);
  Hooks.onNextDropped(value::set);
  try {
    OnNextFailureStrategy strategy = OnNextFailureStrategy.resumeDrop();
    String data = "foo";
    Throwable exception = new NullPointerException("foo");
    assertThat(strategy.test(exception, data)).isTrue();
    Throwable t = strategy.process(exception, data, Context.empty());
    assertThat(t).isNull();
    assertThat(error.get()).isInstanceOf(NullPointerException.class).hasMessage("foo");
    assertThat(value.get()).isEqualTo("foo");
  }
  finally {
    Hooks.resetOnErrorDropped();
    Hooks.resetOnNextDropped();
  }
}

代码示例来源:origin: reactor/reactor-core

@Test
public void resumeDropIfMatch() {
  AtomicReference<Throwable> error = new AtomicReference<>();
  AtomicReference<Object> value = new AtomicReference<>();
  Hooks.onErrorDropped(error::set);
  Hooks.onNextDropped(value::set);
  try {
    OnNextFailureStrategy strategy = OnNextFailureStrategy.resumeDropIf(
        e -> e instanceof NullPointerException);
    String data = "foo";
    Throwable exception = new NullPointerException("foo");
    assertThat(strategy.test(exception, data)).isTrue();
    Throwable t = strategy.process(exception, data, Context.empty());
    assertThat(t).isNull();
    assertThat(error.get()).isInstanceOf(NullPointerException.class).hasMessage("foo");
    assertThat(value.get()).isEqualTo("foo");
  }
  finally {
    Hooks.resetOnErrorDropped();
    Hooks.resetOnNextDropped();
  }
}

代码示例来源:origin: reactor/reactor-core

@Test
public void resumeDropErrorHookFails() {
  AtomicReference<Object> value = new AtomicReference<>();
  UnsupportedOperationException failure = new UnsupportedOperationException("error hook");
  Hooks.onNextDropped(value::set);
  Hooks.onErrorDropped(v -> { throw failure; });
  try {
    OnNextFailureStrategy strategy = OnNextFailureStrategy.resumeDrop();
    String data = "foo";
    Throwable exception = new NullPointerException("foo");
    Throwable t = strategy.process(exception, data, Context.empty());
    assertThat(t)
        .hasMessage("error hook")
        .hasSuppressedException(exception);
    assertThat(value.get()).isEqualTo("foo");
  }
  finally {
    Hooks.resetOnErrorDropped();
    Hooks.resetOnNextDropped();
  }
}

代码示例来源:origin: reactor/reactor-core

@Test //FIXME use Violation.NO_CLEANUP_ON_TERMINATE
public void failDoubleErrorSilent() {
  Hooks.onErrorDropped(e -> {
    assertThat(e).hasMessage("test2");
  });
  StepVerifier.create(Flux.from(s -> {
    s.onSubscribe(Operators.emptySubscription());
    s.onError(new Exception("test"));
    s.onError(new Exception("test2"));
  }).flatMap(Flux::just))
        .verifyErrorMessage("test");
  Hooks.resetOnErrorDropped();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void pollErrorModeLocalStrategy() {
  List<Object> nextDropped = new ArrayList<>();
  List<Object> errorDropped = new ArrayList<>();
  Hooks.onNextDropped(nextDropped::add);
  Hooks.onErrorDropped(errorDropped::add);
  Context c = Context.of(OnNextFailureStrategy.KEY_ON_NEXT_ERROR_STRATEGY, OnNextFailureStrategy.RESUME_DROP);
  Exception error = new IllegalStateException("boom");
  try {
    assertThat(Hooks.onNextErrorHook).as("no global hook").isNull();
    RuntimeException e = Operators.onNextPollError("foo", error, c);
    assertThat(e).isNull();
    assertThat(nextDropped).containsExactly("foo");
    assertThat(errorDropped).containsExactly(error);
  }
  finally {
    Hooks.resetOnNextDropped();
    Hooks.resetOnErrorDropped();
  }
}

代码示例来源:origin: reactor/reactor-core

@Test
public void failOverflowScalarThenError() {
  AtomicBoolean set = new AtomicBoolean();
  Hooks.onErrorDropped(e -> {
    assertThat(Exceptions.isOverflow(e)).isTrue();
    set.set(true);
  });
  StepVerifier.create(Flux.from(s -> {
    s.onSubscribe(Operators.emptySubscription());
    s.onNext(1);
    Exceptions.terminate(FluxFlatMap.FlatMapMain.ERROR, (FluxFlatMap.FlatMapMain) s);
    s.onNext(2);
    ((FluxFlatMap.FlatMapMain)s).error = null;
    s.onError(new Exception("test"));
  })
             .flatMap(Flux::just, 1), 0)
        .verifyErrorMessage("test");
  Hooks.resetOnErrorDropped();
  assertThat(set.get()).isTrue();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void resumeDropWithFatal() {
  AtomicReference<Throwable> error = new AtomicReference<>();
  AtomicReference<Object> value = new AtomicReference<>();
  Hooks.onErrorDropped(error::set);
  Hooks.onNextDropped(value::set);
  try {
    OnNextFailureStrategy strategy = OnNextFailureStrategy.resumeDrop();
    String data = "foo";
    Throwable exception = new NoSuchMethodError("foo");
    assertThat(strategy.test(exception, data)).isTrue();
    assertThatExceptionOfType(NoSuchMethodError.class)
        .isThrownBy(() -> strategy.process(exception, data, Context.empty()));
    assertThat(error.get()).isNull();
    assertThat(value.get()).isNull();
  }
  finally {
    Hooks.resetOnErrorDropped();
    Hooks.resetOnNextDropped();
  }
}

代码示例来源:origin: reactor/reactor-core

@Test
public void resumeDropIfWithFatalNoMatch() {
  AtomicReference<Throwable> error = new AtomicReference<>();
  AtomicReference<Object> value = new AtomicReference<>();
  Hooks.onErrorDropped(error::set);
  Hooks.onNextDropped(value::set);
  try {
    OnNextFailureStrategy strategy = OnNextFailureStrategy.resumeDropIf(
        e -> e instanceof NullPointerException);
    String data = "foo";
    Throwable exception = new NoSuchMethodError("foo");
    assertThat(strategy.test(exception, data)).isFalse();
    assertThatExceptionOfType(NoSuchMethodError.class)
        .isThrownBy(() -> strategy.process(exception, data, Context.empty()));
    assertThat(error.get()).isNull();
    assertThat(value.get()).isNull();
  }
  finally {
    Hooks.resetOnErrorDropped();
    Hooks.resetOnNextDropped();
  }
}

代码示例来源:origin: reactor/reactor-core

@Test
@SuppressWarnings("unchecked")
public void failDoubleError() {
  Hooks.onErrorDropped(e -> assertThat(e).hasMessage("test2"));
  StepVerifier.create(Flux.from(s -> {
    assertTrackableBeforeOnSubscribe((InnerOperator)s);
    s.onSubscribe(Operators.emptySubscription());
    assertTrackableAfterOnSubscribe((InnerOperator)s);
    s.onError(new Exception("test"));
    assertTrackableAfterOnComplete((InnerOperator)s);
    s.onError(new Exception("test2"));
  })
              .take(2))
        .verifyErrorMessage("test");
  Hooks.resetOnErrorDropped();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void failOverflowWhileActiveScalarThenError() {
  AtomicBoolean set = new AtomicBoolean();
  Hooks.onErrorDropped(e -> {
    assertThat(Exceptions.isOverflow(e)).isTrue();
    set.set(true);
  });
  StepVerifier.create(Flux.from(s -> {
    s.onSubscribe(Operators.emptySubscription());
    s.onNext(1);
    Exceptions.terminate(FluxFlatMap.FlatMapMain.ERROR, (FluxFlatMap.FlatMapMain) s);
    ((FluxFlatMap.FlatMapMain)s).wip = 1; //simulate concurrent active
    s.onNext(2);
    s.onNext(3);
    ((FluxFlatMap.FlatMapMain)s).error = null;
    ((FluxFlatMap.FlatMapMain)s).drainLoop();
    s.onError(new Exception("test"));
  })
              .flatMap(Flux::just, 1), 1)
        .expectNext(1)
        .verifyErrorMessage("test");
  Hooks.resetOnErrorDropped();
  assertThat(set.get()).isTrue();
}

代码示例来源:origin: reactor/reactor-core

@Test //FIXME use Violation.NO_CLEANUP_ON_TERMINATE
public void failDoubleErrorSilent() {
  Hooks.onErrorDropped(e -> {
  });
  StepVerifier.create(Flux.zip(obj -> 0, Flux.just(1), Flux.never(), s -> {
    s.onSubscribe(Operators.emptySubscription());
    s.onError(new Exception("test"));
    s.onError(new Exception("test2"));
  }))
        .verifyErrorMessage("test");
  Hooks.resetOnErrorDropped();
}

代码示例来源:origin: reactor/reactor-core

@Test
@SuppressWarnings("unchecked")
public void failConditionalDoubleError() {
  Hooks.onErrorDropped(e -> assertThat(e).hasMessage("test2"));
  StepVerifier.create(Flux.from(s -> {
    assertTrackableBeforeOnSubscribe((InnerOperator)s);
    s.onSubscribe(Operators.emptySubscription());
    assertTrackableAfterOnSubscribe((InnerOperator)s);
    s.onError(new Exception("test"));
    assertTrackableAfterOnComplete((InnerOperator)s);
    s.onError(new Exception("test2"));
  })
              .take(2).filter(d -> true))
        .verifyErrorMessage("test");
  Hooks.resetOnErrorDropped();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void failDoubleTerminalPublisher() {
  DirectProcessor<Integer> d1 = DirectProcessor.create();
  Hooks.onErrorDropped(e -> {
  });
  try {
    StepVerifier.create(Flux.zip(obj -> 0, Flux.just(1), d1, s -> {
      CoreSubscriber<?> a =
          ((DirectProcessor.DirectInner) d1.inners().findFirst().get())
              .actual;
      s.onSubscribe(Operators.emptySubscription());
      s.onComplete();
      a.onError(new Exception("test"));
    }))
          .verifyComplete();
  }
  finally {
    Hooks.resetOnErrorDropped();
  }
}

代码示例来源:origin: reactor/reactor-core

@Test
public void errorAfterNextIsNeverTriggered() {
  TestPublisher<String> source = TestPublisher.create();
  AtomicReference<Throwable> errorDropped = new AtomicReference<>();
  Hooks.onErrorDropped(errorDropped::set);
  try {
    StepVerifier.withVirtualTime(() ->
        new MonoDelayElement<>(source.mono(), 2, TimeUnit.SECONDS, defaultSchedulerForDelay()))
          .expectSubscription()
          .then(() -> source.next("foo").error(new IllegalStateException("boom")))
          .expectNoEvent(Duration.ofSeconds(2))
          .expectNext("foo")
          .verifyComplete();
  } finally {
    Hooks.resetOnErrorDropped();
  }
  assertThat(errorDropped.get()).isNull();
}

代码示例来源:origin: reactor/reactor-core

@Test
@SuppressWarnings("unchecked")
public void failFusedDoubleError() {
  UnicastProcessor<Integer> up = UnicastProcessor.create();
  Hooks.onErrorDropped(e -> assertThat(e).hasMessage("test2"));
  StepVerifier.create(up
              .take(2))
        .consumeSubscriptionWith(s -> {
          assertTrackableBeforeOnSubscribe((InnerOperator)s);
        })
        .then(() -> {
          assertTrackableAfterOnSubscribe((InnerOperator)up.actual);
          up.actual.onError(new Exception("test"));
          assertTrackableAfterOnComplete((InnerOperator)up.actual);
          up.actual.onError(new Exception("test2"));
        })
        .verifyErrorMessage("test");
  Hooks.resetOnErrorDropped();
}

相关文章