reactor.core.publisher.Hooks.onEachOperator()方法的使用及代码示例

x33g5p2x  于2022-01-20 转载在 其他  
字(8.5k)|赞(0)|评价(0)|浏览(440)

本文整理了Java中reactor.core.publisher.Hooks.onEachOperator()方法的一些代码示例,展示了Hooks.onEachOperator()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Hooks.onEachOperator()方法的具体详情如下:
包路径:reactor.core.publisher.Hooks
类名称:Hooks
方法名:onEachOperator

Hooks.onEachOperator介绍

[英]Add or replace a named Publisher operator interceptor for each operator created ( Flux or Mono). The passed function is applied to the original operator Publisher and can return a different Publisher, on the condition that it generically maintains the same data type as the original. Use of the Flux/ Mono APIs is discouraged as it will recursively call this hook, leading to StackOverflowError.

Note that sub-hooks are cumulative. Invoking this method twice with the same key will replace the old sub-hook with that name, but keep the execution order (eg. A-h1, B-h2, A-h3 will keep A-B execution order, leading to hooks h3 then h2 being executed). Removing a particular key using #resetOnEachOperator(String) then adding it back will result in the execution order changing (the later sub-hook being executed last). Can be fully reset via #resetOnEachOperator().

This pointcut function cannot make use of Flux, Mono or ParallelFlux APIs as it would lead to a recursive call to the hook: the operator calls would effectively invoke onEachOperator from onEachOperator.
[中]为创建的每个操作符(Flux或Mono)添加或替换命名的发布者操作符拦截器。传递的函数应用于原始运算符发布服务器,并且可以返回不同的发布服务器,条件是它通常保持与原始服务器相同的数据类型。不鼓励使用Flux/MonoAPI,因为它会递归调用此钩子,从而导致StackOverflowerError。
请注意,子挂钩是累积的。使用同一个键调用此方法两次将用该名称替换旧的子钩子,但保留执行顺序(例如,A-h1、B-h2、A-h3将保留A-B执行顺序,导致钩子h3然后执行h2)。使用#resetOnEachOperator(String)删除一个特定的键,然后将其添加回去,将导致执行顺序发生变化(最后执行的是后面的子钩子)。可通过#resetOnEachOperator()完全重置。
这个切入点函数不能使用Flux、Mono或ParallelFlux API,因为它会导致对钩子的递归调用:操作符调用将有效地从onEachOperator调用onEachOperator。

代码示例

代码示例来源:origin: reactor/reactor-core

  1. /**
  2. * Add a {@link Publisher} operator interceptor for each operator created
  3. * ({@link Flux} or {@link Mono}). The passed function is applied to the original
  4. * operator {@link Publisher} and can return a different {@link Publisher},
  5. * on the condition that it generically maintains the same data type as the original.
  6. * Use of the {@link Flux}/{@link Mono} APIs is discouraged as it will recursively
  7. * call this hook, leading to {@link StackOverflowError}.
  8. * <p>
  9. * Note that sub-hooks are cumulative, but invoking this method twice with the same instance
  10. * (or any instance that has the same `toString`) will result in only a single instance
  11. * being applied. See {@link #onEachOperator(String, Function)} for a variant that
  12. * allows you to name the sub-hooks (and thus replace them or remove them individually
  13. * later on). Can be fully reset via {@link #resetOnEachOperator()}.
  14. * <p>
  15. * This pointcut function cannot make use of {@link Flux}, {@link Mono} or
  16. * {@link ParallelFlux} APIs as it would lead to a recursive call to the hook: the
  17. * operator calls would effectively invoke onEachOperator from onEachOperator.
  18. *
  19. * @param onEachOperator the sub-hook: a function to intercept each operation call
  20. * (e.g. {@code map (fn)} and {@code map(fn2)} in {@code flux.map(fn).map(fn2).subscribe()})
  21. *
  22. * @see #onEachOperator(String, Function)
  23. * @see #resetOnEachOperator(String)
  24. * @see #resetOnEachOperator()
  25. * @see #onLastOperator(Function)
  26. */
  27. public static void onEachOperator(Function<? super Publisher<Object>, ? extends Publisher<Object>> onEachOperator) {
  28. onEachOperator(onEachOperator.toString(), onEachOperator);
  29. }

代码示例来源:origin: reactor/reactor-core

  1. @Test
  2. public void onEachOperatorSameLambdaDifferentNamesAppliedTwice() {
  3. AtomicInteger applied = new AtomicInteger();
  4. Function<? super Publisher<Object>, ? extends Publisher<Object>> hook = p -> {
  5. applied.incrementAndGet();
  6. return p;
  7. };
  8. Hooks.onEachOperator(hook);
  9. Hooks.onEachOperator("other", hook);
  10. Hooks.onEachOperatorHook.apply(s -> {});
  11. assertThat(applied.get()).isEqualTo(2);
  12. }

代码示例来源:origin: reactor/reactor-core

  1. @Test
  2. public void onEachOperatorSameLambdaSameNameAppliedOnce() {
  3. AtomicInteger applied = new AtomicInteger();
  4. Function<? super Publisher<Object>, ? extends Publisher<Object>> hook = p -> {
  5. applied.incrementAndGet();
  6. return p;
  7. };
  8. Hooks.onEachOperator(hook);
  9. Hooks.onEachOperator(hook);
  10. Hooks.onEachOperatorHook.apply(s -> {});
  11. assertThat(applied.get()).isEqualTo(1);
  12. }

代码示例来源:origin: reactor/reactor-core

  1. @Test
  2. public void onEachOperatorOneHookNoComposite() {
  3. Function<? super Publisher<Object>, ? extends Publisher<Object>> hook = p -> p;
  4. Hooks.onEachOperator(hook);
  5. assertThat(Hooks.onEachOperatorHook).isSameAs(hook);
  6. }

代码示例来源:origin: reactor/reactor-core

  1. @Test
  2. public void onEachOperatorResetSpecific() {
  3. List<String> applied = new ArrayList<>(3);
  4. Function<? super Publisher<Object>, ? extends Publisher<Object>> hook1 = p -> {
  5. applied.add("h1");
  6. return p;
  7. };
  8. Function<? super Publisher<Object>, ? extends Publisher<Object>> hook2 = p -> {
  9. applied.add("h2");
  10. return p;
  11. };
  12. Hooks.onEachOperator("1", hook1);
  13. Hooks.onEachOperator(hook2);
  14. Hooks.onEachOperatorHook.apply(s -> {});
  15. assertThat(Hooks.getOnEachOperatorHooks()).hasSize(2);
  16. assertThat(applied).containsExactly("h1", "h2");
  17. applied.clear();
  18. Hooks.resetOnEachOperator("1");
  19. Hooks.onEachOperatorHook.apply(s -> {});
  20. assertThat(Hooks.getOnEachOperatorHooks()).hasSize(1);
  21. assertThat(applied).containsExactly("h2");
  22. }

代码示例来源:origin: reactor/reactor-core

  1. };
  2. Hooks.onEachOperator("1", hook1);
  3. Hooks.onEachOperator("2", hook2);
  4. Hooks.onEachOperatorHook.apply(s -> {});
  5. Hooks.onEachOperator("1", hook3);
  6. Hooks.onEachOperatorHook.apply(s -> {});

代码示例来源:origin: reactor/reactor-core

  1. @Test
  2. public void onEachOperatorReset() {
  3. Hooks.onEachOperator("some", p -> p);
  4. assertThat(Hooks.onEachOperatorHook).isNotNull();
  5. assertThat(Hooks.getOnEachOperatorHooks()).hasSize(1);
  6. Hooks.resetOnEachOperator();
  7. assertThat(Hooks.onEachOperatorHook).isNull();
  8. assertThat(Hooks.getOnEachOperatorHooks()).isEmpty();
  9. }

代码示例来源:origin: reactor/reactor-core

  1. @Test
  2. public void onEachOperatorClearByName() {
  3. Hooks.onEachOperator("some", p -> p);
  4. Hooks.onEachOperator("other", p -> p);
  5. assertThat(Hooks.onEachOperatorHook).isNotNull();
  6. assertThat(Hooks.getOnEachOperatorHooks()).hasSize(2);
  7. Hooks.resetOnEachOperator("some");
  8. assertThat(Hooks.onEachOperatorHook).isNotNull();
  9. assertThat(Hooks.getOnEachOperatorHooks())
  10. .hasSize(1)
  11. .containsOnlyKeys("other");
  12. Hooks.resetOnEachOperator("other");
  13. assertThat(Hooks.onEachOperatorHook).isNull();
  14. assertThat(Hooks.getOnEachOperatorHooks()).isEmpty();
  15. }

代码示例来源:origin: reactor/reactor-core

  1. Hooks.onEachOperator(h -> {
  2. Flux<Object> publisher = TestPublisher.create()
  3. .flux();
  4. return publisher;
  5. });
  6. Hooks.onEachOperator(h -> {
  7. hook2.set(h);
  8. return h;

代码示例来源:origin: reactor/reactor-core

  1. @Test
  2. public void verboseExtension() {
  3. Queue<String> q = new LinkedTransferQueue<>();
  4. Hooks.onEachOperator(p -> {
  5. q.offer(p.toString());
  6. return p;
  7. Hooks.resetOnEachOperator();
  8. Hooks.onEachOperator(p -> {
  9. q.offer(p.toString());
  10. return p;

代码示例来源:origin: reactor/reactor-core

  1. Hooks.onOperatorDebug();
  2. Hooks.onEachOperator(p -> {
  3. System.out.println(Scannable.from(p).stepName());
  4. return p;

代码示例来源:origin: reactor/reactor-core

  1. @Test
  2. public void eachOperatorTest() {
  3. Hooks.onEachOperator(Operators.lift((sc, sub) ->
  4. new CoreSubscriber<Object>(){
  5. @Override

代码示例来源:origin: reactor/reactor-core

  1. @Test
  2. public void eachOperatorTest() {
  3. Hooks.onEachOperator(Operators.lift((sc, sub) ->
  4. new CoreSubscriber<Object>(){
  5. @Override

代码示例来源:origin: io.projectreactor/reactor-core

  1. /**
  2. * Add a {@link Publisher} operator interceptor for each operator created
  3. * ({@link Flux} or {@link Mono}). The passed function is applied to the original
  4. * operator {@link Publisher} and can return a different {@link Publisher},
  5. * on the condition that it generically maintains the same data type as the original.
  6. * Use of the {@link Flux}/{@link Mono} APIs is discouraged as it will recursively
  7. * call this hook, leading to {@link StackOverflowError}.
  8. * <p>
  9. * Note that sub-hooks are cumulative, but invoking this method twice with the same instance
  10. * (or any instance that has the same `toString`) will result in only a single instance
  11. * being applied. See {@link #onEachOperator(String, Function)} for a variant that
  12. * allows you to name the sub-hooks (and thus replace them or remove them individually
  13. * later on). Can be fully reset via {@link #resetOnEachOperator()}.
  14. * <p>
  15. * This pointcut function cannot make use of {@link Flux}, {@link Mono} or
  16. * {@link ParallelFlux} APIs as it would lead to a recursive call to the hook: the
  17. * operator calls would effectively invoke onEachOperator from onEachOperator.
  18. *
  19. * @param onEachOperator the sub-hook: a function to intercept each operation call
  20. * (e.g. {@code map (fn)} and {@code map(fn2)} in {@code flux.map(fn).map(fn2).subscribe()})
  21. *
  22. * @see #onEachOperator(String, Function)
  23. * @see #resetOnEachOperator(String)
  24. * @see #resetOnEachOperator()
  25. * @see #onLastOperator(Function)
  26. */
  27. public static void onEachOperator(Function<? super Publisher<Object>, ? extends Publisher<Object>> onEachOperator) {
  28. onEachOperator(onEachOperator.toString(), onEachOperator);
  29. }

相关文章