reactor.core.publisher.Hooks类的使用及代码示例

x33g5p2x  于2022-01-20 转载在 其他  
字(10.9k)|赞(0)|评价(0)|浏览(270)

本文整理了Java中reactor.core.publisher.Hooks类的一些代码示例,展示了Hooks类的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Hooks类的具体详情如下:
包路径:reactor.core.publisher.Hooks
类名称:Hooks

Hooks介绍

[英]A set of overridable lifecycle hooks that can be used for cross-cutting added behavior on Flux/ Mono operators.
[中]一组可重写的生命周期挂钩,可用于在Flux/Mono操作符上添加横切行为。

代码示例

代码示例来源:origin: reactor/reactor-core

  1. @Test
  2. public void onLastOperatorReset() {
  3. Hooks.onLastOperator("some", p -> p);
  4. assertThat(Hooks.onLastOperatorHook).isNotNull();
  5. assertThat(Hooks.getOnLastOperatorHooks()).hasSize(1);
  6. Hooks.resetOnLastOperator();
  7. assertThat(Hooks.onLastOperatorHook).isNull();
  8. assertThat(Hooks.getOnLastOperatorHooks()).isEmpty();
  9. }

代码示例来源:origin: reactor/reactor-core

  1. private void plugHooks() {
  2. Hooks.onErrorDropped(droppedErrors::offer);
  3. Hooks.onNextDropped(droppedElements::offer);
  4. Hooks.onOperatorError((t, d) -> {
  5. operatorErrors.offer(Tuples.of(Optional.ofNullable(t), Optional.ofNullable(d)));
  6. return t;
  7. });
  8. }

代码示例来源:origin: reactor/reactor-core

  1. @Test
  2. public void resumeDropErrorHookFails() {
  3. AtomicReference<Object> value = new AtomicReference<>();
  4. UnsupportedOperationException failure = new UnsupportedOperationException("error hook");
  5. Hooks.onNextDropped(value::set);
  6. Hooks.onErrorDropped(v -> { throw failure; });
  7. try {
  8. OnNextFailureStrategy strategy = OnNextFailureStrategy.resumeDrop();
  9. String data = "foo";
  10. Throwable exception = new NullPointerException("foo");
  11. Throwable t = strategy.process(exception, data, Context.empty());
  12. assertThat(t)
  13. .hasMessage("error hook")
  14. .hasSuppressedException(exception);
  15. assertThat(value.get()).isEqualTo("foo");
  16. }
  17. finally {
  18. Hooks.resetOnErrorDropped();
  19. Hooks.resetOnNextDropped();
  20. }
  21. }

代码示例来源:origin: reactor/reactor-core

  1. public void unplugHooks() {
  2. Hooks.resetOnNextDropped();
  3. Hooks.resetOnErrorDropped();
  4. Hooks.resetOnOperatorError();
  5. }

代码示例来源:origin: reactor/reactor-core

  1. final void resetHooks() {
  2. Hooks.resetOnErrorDropped();
  3. Hooks.resetOnNextDropped();
  4. Hooks.resetOnEachOperator();
  5. Hooks.resetOnOperatorError();
  6. Hooks.resetOnLastOperator();
  7. }

代码示例来源:origin: reactor/reactor-core

  1. @Test
  2. public void accumulatingHooks() throws Exception {
  3. AtomicReference<String> ref = new AtomicReference<>();
  4. Hooks.onNextDropped(d -> {
  5. ref.set(d.toString());
  6. });
  7. Hooks.onNextDropped(d -> {
  8. ref.set(ref.get()+"bar");
  9. });
  10. Hooks.onErrorDropped(d -> {
  11. ref.set(d.getMessage());
  12. });
  13. Hooks.onErrorDropped(d -> {
  14. ref.set(ref.get()+"bar");
  15. });
  16. Hooks.resetOnErrorDropped();
  17. Hooks.onOperatorError((error, d) -> {
  18. ref.set(d.toString());
  19. return new Exception("bar");
  20. });
  21. Hooks.onOperatorError((error, d) -> {
  22. ref.set(ref.get()+error.getMessage());
  23. return error;
  24. Hooks.resetOnOperatorError();
  25. Hooks.onEachOperator(h -> {

代码示例来源:origin: reactor/reactor-core

  1. @Test
  2. public void onOperatorError() {
  3. AtomicReference<Object> errorValue = new AtomicReference<Object>();
  4. Hooks.onOperatorError((error, d) -> {
  5. errorValue.set(d);
  6. return error;
  7. });
  8. Flux<Integer> f1 = Mono.just(1).flatMapMany(i -> Flux.error(new Exception("test")));
  9. StepVerifier.create(f1).verifyErrorMessage("test");
  10. assertThat(errorValue.get()).isEqualTo(1);
  11. Flux<Integer> f2 = Mono.just(2).flatMapMany(i -> {
  12. throw new RuntimeException("test");
  13. });
  14. StepVerifier.create(f2).verifyErrorMessage("test");
  15. assertThat(errorValue.get()).isEqualTo(2);
  16. Flux<Integer> f3 = Flux.just(3, 6, 9).flatMap(i -> Flux.error(new Exception("test")));
  17. StepVerifier.create(f3).verifyErrorMessage("test");
  18. assertThat(errorValue.get()).isEqualTo(3);
  19. Flux<Integer> f4 = Flux.just(4, 8, 12).flatMap(i -> {
  20. throw new RuntimeException("test");
  21. });
  22. StepVerifier.create(f4).verifyErrorMessage("test");
  23. assertThat(errorValue.get()).isEqualTo(4);
  24. Hooks.resetOnOperatorError();
  25. }

代码示例来源:origin: reactor/reactor-core

  1. @Test
  2. public void completeHookErrorDropped() {
  3. Hooks.onErrorDropped(e -> assertTrue(e.getMessage().equals("complete")));
  4. try {
  5. Mono.just("foo")
  6. .subscribe(v -> {},
  7. e -> {},
  8. () -> { throw new IllegalStateException("complete");});
  9. }
  10. finally {
  11. Hooks.resetOnErrorDropped();
  12. }
  13. }

代码示例来源:origin: reactor/reactor-core

  1. @Test
  2. public void testOnLastPublisher() throws Exception {
  3. List<Publisher> l = new ArrayList<>();
  4. Hooks.onLastOperator(p -> {
  5. System.out.println(Scannable.from(p).parents().count());
  6. System.out.println(Scannable.from(p).stepName());
  7. l.add(p);
  8. return p;
  9. });
  10. StepVerifier.create(Flux.just(1, 2, 3)
  11. .map(m -> m)
  12. .takeUntilOther(Mono.never())
  13. .flatMap(d -> Mono.just(d).hide()))
  14. .expectNext(1, 2, 3)
  15. .verifyComplete();
  16. Hooks.resetOnLastOperator();
  17. assertThat(l).hasSize(5);
  18. }

代码示例来源:origin: reactor/reactor-core

  1. @Test
  2. public void testTrace() throws Exception {
  3. Hooks.onOperatorDebug();
  4. try {
  5. Mono.fromCallable(() -> {
  6. throw new RuntimeException();
  7. })
  8. .map(d -> d)
  9. .block();
  10. }
  11. catch(Exception e){
  12. e.printStackTrace();
  13. Assert.assertTrue(e.getSuppressed()[0].getMessage().contains("MonoCallable"));
  14. return;
  15. }
  16. finally {
  17. Hooks.resetOnOperatorDebug();
  18. }
  19. throw new IllegalStateException();
  20. }

代码示例来源:origin: reactor/reactor-core

  1. @Test
  2. public void onOperatorErrorReset() {
  3. Hooks.onOperatorError("some", (t, v) -> t);
  4. assertThat(Hooks.onOperatorErrorHook).isNotNull();
  5. assertThat(Hooks.getOnOperatorErrorHooks()).hasSize(1);
  6. Hooks.resetOnOperatorError();
  7. assertThat(Hooks.onOperatorErrorHook).isNull();
  8. assertThat(Hooks.getOnOperatorErrorHooks()).isEmpty();
  9. }

代码示例来源:origin: reactor/reactor-core

  1. /**
  2. * Add a {@link Publisher} operator interceptor for the last operator created
  3. * in every flow ({@link Flux} or {@link Mono}). The passed function is applied
  4. * to the original operator {@link Publisher} and can return a different {@link Publisher},
  5. * on the condition that it generically maintains the same data type as the original.
  6. * <p>
  7. * Note that sub-hooks are cumulative, but invoking this method twice with the same
  8. * instance (or any instance that has the same `toString`) will result in only a single
  9. * instance being applied. See {@link #onLastOperator(String, Function)} for a variant
  10. * that allows you to name the sub-hooks (and thus replace them or remove them individually
  11. * later on). Can be fully reset via {@link #resetOnLastOperator()}.
  12. * <p>
  13. * This pointcut function cannot make use of {@link Flux}, {@link Mono} or
  14. * {@link ParallelFlux} APIs as it would lead to a recursive call to the hook: the
  15. * operator calls would effectively invoke onEachOperator from onEachOperator.
  16. *
  17. * @param onLastOperator the sub-hook: a function to intercept last operation call
  18. * (e.g. {@code map(fn2)} in {@code flux.map(fn).map(fn2).subscribe()})
  19. *
  20. * @see #onLastOperator(String, Function)
  21. * @see #resetOnLastOperator(String)
  22. * @see #resetOnLastOperator()
  23. * @see #onEachOperator(Function)
  24. */
  25. public static void onLastOperator(Function<? super Publisher<Object>, ? extends Publisher<Object>> onLastOperator) {
  26. onLastOperator(onLastOperator.toString(), onLastOperator);
  27. }

代码示例来源:origin: reactor/reactor-core

  1. @SuppressWarnings("unchecked")
  2. void assertInnerSubscriber(FluxZip.ZipSingleCoordinator c) {
  3. FluxZip.ZipSingleSubscriber s = (FluxZip.ZipSingleSubscriber) c.inners()
  4. .findFirst()
  5. .get();
  6. assertThat(s.scan(Scannable.Attr.TERMINATED)).isTrue();
  7. assertThat(s.scan(Scannable.Attr.BUFFERED)).isEqualTo(1);
  8. assertThat(s.scan(Scannable.Attr.CANCELLED)).isTrue();
  9. Hooks.onNextDropped(v -> {
  10. });
  11. s.onNext(0);
  12. Hooks.resetOnNextDropped();
  13. }

代码示例来源:origin: reactor/reactor-core

  1. /**
  2. * Add a custom error mapping, overriding the default one. Custom mapping can be an
  3. * accumulation of several sub-hooks each subsequently added via this method.
  4. * <p>
  5. * Note that sub-hooks are cumulative, but invoking this method twice with the same
  6. * instance (or any instance that has the same `toString`) will result in only a single
  7. * instance being applied. See {@link #onOperatorError(String, BiFunction)} for a variant
  8. * that allows you to name the sub-hooks (and thus replace them or remove them individually
  9. * later on). Can be fully reset via {@link #resetOnOperatorError()}.
  10. * <p>
  11. * For reference, the default mapping is to unwrap the exception and, if the second
  12. * parameter is another exception, to add it to the first as suppressed.
  13. *
  14. * @param onOperatorError an operator error {@link BiFunction} mapper, returning an arbitrary exception
  15. * given the failure and optionally some original context (data or error).
  16. *
  17. * @see #onOperatorError(String, BiFunction)
  18. * @see #resetOnOperatorError(String)
  19. * @see #resetOnOperatorError()
  20. */
  21. public static void onOperatorError(BiFunction<? super Throwable, Object, ? extends Throwable> onOperatorError) {
  22. onOperatorError(onOperatorError.toString(), onOperatorError);
  23. }

代码示例来源:origin: reactor/reactor-core

  1. @Test
  2. public void verboseExtension() {
  3. Queue<String> q = new LinkedTransferQueue<>();
  4. Hooks.onEachOperator(p -> {
  5. q.offer(p.toString());
  6. return p;
  7. });
  8. Hooks.onOperatorDebug();
  9. Hooks.resetOnEachOperator();
  10. Hooks.onEachOperator(p -> {
  11. q.offer(p.toString());
  12. return p;
  13. Hooks.resetOnEachOperator();

代码示例来源:origin: reactor/reactor-core

  1. @Before
  2. public void populateDebug() {
  3. if (testName.getMethodName().equals("debuggingCommonStacktrace")) {
  4. toDebug = scatterAndGather(urls());
  5. }
  6. else if (testName.getMethodName().startsWith("debuggingActivated")) {
  7. Hooks.onOperatorDebug();
  8. toDebug = scatterAndGather(urls());
  9. }
  10. }

代码示例来源:origin: reactor/reactor-core

  1. };
  2. Hooks.onOperatorError("1", hook1);
  3. Hooks.onOperatorError("2", hook2);
  4. Hooks.onOperatorErrorHook.apply(new IllegalStateException("boom"), "foo");
  5. assertThat(Hooks.getOnOperatorErrorHooks())
  6. .containsOnlyKeys("1", "2");
  7. assertThat(Hooks.getOnOperatorErrorHooks().values())
  8. .containsExactly(hook1, hook2);
  9. assertThat(applied).containsExactly("h1", "h2");
  10. Hooks.onOperatorError("1", hook3);
  11. Hooks.onOperatorErrorHook.apply(new IllegalStateException("boom2"), "bar");
  12. assertThat(Hooks.getOnOperatorErrorHooks())
  13. .containsOnlyKeys("1", "2");
  14. assertThat(Hooks.getOnOperatorErrorHooks().values())
  15. .containsExactly(hook3, hook2);
  16. assertThat(applied).containsExactly("h3", "h2");

代码示例来源:origin: reactor/reactor-core

  1. };
  2. Hooks.onLastOperator("1", hook1);
  3. Hooks.onLastOperator("2", hook2);
  4. Hooks.onLastOperatorHook.apply(s -> {});
  5. assertThat(Hooks.getOnLastOperatorHooks())
  6. .containsOnlyKeys("1", "2");
  7. assertThat(Hooks.getOnLastOperatorHooks().values())
  8. .containsExactly(hook1, hook2);
  9. assertThat(applied).containsExactly("h1", "h2");
  10. Hooks.onLastOperator("1", hook3);
  11. Hooks.onLastOperatorHook.apply(s -> {});
  12. assertThat(Hooks.getOnLastOperatorHooks())
  13. .containsOnlyKeys("1", "2");
  14. assertThat(Hooks.getOnLastOperatorHooks().values())
  15. .containsExactly(hook3, hook2);
  16. assertThat(applied).containsExactly("h3", "h2");

代码示例来源:origin: reactor/reactor-core

  1. @Test
  2. public void parallelModeFused() {
  3. Hooks.onOperatorDebug();
  4. Hooks.onEachOperator(p -> {
  5. System.out.println(Scannable.from(p).stepName());
  6. return p;

代码示例来源:origin: reactor/reactor-core

  1. @Test
  2. public void onEachOperatorReset() {
  3. Hooks.onEachOperator("some", p -> p);
  4. assertThat(Hooks.onEachOperatorHook).isNotNull();
  5. assertThat(Hooks.getOnEachOperatorHooks()).hasSize(1);
  6. Hooks.resetOnEachOperator();
  7. assertThat(Hooks.onEachOperatorHook).isNull();
  8. assertThat(Hooks.getOnEachOperatorHooks()).isEmpty();
  9. }

相关文章