Usage and code examples of the reactor.core.publisher.Hooks.onNextDropped() method

x33g5p2x, reposted on 2022-01-20 under "Other"

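This article collects code examples for the Java method reactor.core.publisher.Hooks.onNextDropped() and shows how Hooks.onNextDropped() is used in practice. The examples were extracted from selected projects on platforms such as GitHub, Stack Overflow, and Maven, and are intended as a practical reference. Details of the Hooks.onNextDropped() method:
Fully qualified class name: reactor.core.publisher.Hooks
Class name: Hooks
Method name: onNextDropped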

About Hooks.onNextDropped

Overrides the global data-dropped strategy, which by default logs the dropped value at DEBUG level.

The hook is cumulative: each call to this method adds one more consumer invocation per dropped value, even when the same consumer instance is passed again.
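The cumulative behaviour described above can be modelled without Reactor on the classpath. The following is a minimal sketch in plain Java, not Reactor's actual implementation: the class `CumulativeHookSketch`, its static `hook` field, and the `drop` and `demo` methods are hypothetical names introduced for illustration only. It assumes that a cumulative hook behaves like consumers chained with `Consumer.andThen`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Simplified stand-in for a cumulative global hook; NOT Reactor's actual code.
final class CumulativeHookSketch {

    // Hypothetical global hook slot; null means no custom hook is registered.
    private static Consumer<Object> hook;

    // Each call chains the new consumer after the ones already registered.
    static synchronized void onNextDropped(Consumer<Object> consumer) {
        hook = (hook == null) ? consumer : hook.andThen(consumer);
    }

    // Clears every registered consumer at once.
    static synchronized void resetOnNextDropped() {
        hook = null;
    }

    // Stands in for the place where an operator would discard a value.
    static synchronized void drop(Object value) {
        if (hook != null) {
            hook.accept(value);
        }
    }

    // Registers the same consumer instance twice, then drops a single value.
    static List<Object> demo() {
        List<Object> seen = new ArrayList<>();
        Consumer<Object> recorder = seen::add;
        try {
            onNextDropped(recorder);
            onNextDropped(recorder);
            drop("foo");
        } finally {
            // Reset in finally so a failure cannot leak the global hook.
            resetOnNextDropped();
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [foo, foo]
    }
}
```

In this model the single dropped value is recorded twice because the same consumer was registered twice. This is why the Reactor tests below pair every Hooks.onNextDropped call with Hooks.resetOnNextDropped.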

Code examples

Code example source: reactor/reactor-core

private void plugHooks() {
  Hooks.onErrorDropped(droppedErrors::offer);
  Hooks.onNextDropped(droppedElements::offer);
  Hooks.onOperatorError((t, d) -> {
    operatorErrors.offer(Tuples.of(Optional.ofNullable(t), Optional.ofNullable(d)));
    return t;
  });
}

Code example source: reactor/reactor-core

@Test
public void onNextDroppedFailReplaces() {
  AtomicReference<Object> dropHook = new AtomicReference<>();
  // Misbehaving publisher: it ignores demand and cancellation, so 3 arrives after take(2) is done.
  Publisher<Integer> p = s -> {
    s.onSubscribe(Operators.emptySubscription());
    s.onNext(1);
    s.onNext(2);
    s.onNext(3);
  };
  List<Integer> seen = new ArrayList<>();

  try {
    Hooks.onNextDropped(dropHook::set);
    // The failing strategy replaces the consumer registered above (dropHook stays null below).
    Hooks.onNextDroppedFail();

    assertThatExceptionOfType(RuntimeException.class)
        .isThrownBy(() -> Flux.from(p).take(2).subscribe(seen::add))
        .isInstanceOf(RuntimeException.class)
        .matches(Exceptions::isCancel);

    assertThat(seen).containsExactly(1, 2);
    assertThat(dropHook.get()).isNull();
  }
  finally {
    Hooks.resetOnNextDropped();
  }
}

Code example source: reactor/reactor-core

@SuppressWarnings("unchecked")
void assertInnerSubscriber(FluxZip.ZipSingleCoordinator c) {
  FluxZip.ZipSingleSubscriber s = (FluxZip.ZipSingleSubscriber) c.inners()
      .findFirst()
      .get();
  assertThat(s.scan(Scannable.Attr.TERMINATED)).isTrue();
  assertThat(s.scan(Scannable.Attr.BUFFERED)).isEqualTo(1);
  assertThat(s.scan(Scannable.Attr.CANCELLED)).isTrue();
  // No-op hook: the onNext(0) sent to the already-terminated subscriber is expected to be dropped.
  Hooks.onNextDropped(v -> {
  });
  s.onNext(0);
  Hooks.resetOnNextDropped();
}

Code example source: reactor/reactor-core

@Override
protected List<Scenario<String, Signal<String>>> scenarios_errorFromUpstreamFailure() {
  return Arrays.asList(
      scenario(Flux::materialize)
          .verifier(step -> {
            Hooks.onErrorDropped(c -> assertThat(c).hasMessage("dropped"));
            Hooks.onNextDropped(c -> assertThat(c).isEqualTo("dropped"));
            step.assertNext(s -> assertThat(s.getThrowable()).hasMessage("test"))
                .verifyComplete();
          })
  );
}

Code example source: reactor/reactor-core

@Before
public void setUp() {
  scheduler = new BoundedScheduler(Schedulers.newSingle("bounded-single"));
  Hooks.onNextDropped(o -> onNextDropped.add(o));
  Hooks.onErrorDropped(e -> onErrorDropped.add(e));
  Hooks.onOperatorError((e, o) -> {
    onOperatorError.add(e);
    if (o instanceof Long) {
      onOperatorErrorData.add((Long) o);
    }
    else if (o != null) {
      System.out.println(o);
    }
    return e;
  });
  Schedulers.onHandleError((thread, t) -> onSchedulerHandleError.add(t));
}

Code example source: reactor/reactor-core

@Test
public void resumeDropIfMatch() {
  AtomicReference<Throwable> error = new AtomicReference<>();
  AtomicReference<Object> value = new AtomicReference<>();
  Hooks.onErrorDropped(error::set);
  Hooks.onNextDropped(value::set);
  try {
    OnNextFailureStrategy strategy = OnNextFailureStrategy.resumeDropIf(
        e -> e instanceof NullPointerException);
    String data = "foo";
    Throwable exception = new NullPointerException("foo");
    assertThat(strategy.test(exception, data)).isTrue();
    Throwable t = strategy.process(exception, data, Context.empty());
    assertThat(t).isNull();
    assertThat(error.get()).isInstanceOf(NullPointerException.class).hasMessage("foo");
    assertThat(value.get()).isEqualTo("foo");
  }
  finally {
    Hooks.resetOnErrorDropped();
    Hooks.resetOnNextDropped();
  }
}

Code example source: reactor/reactor-core

@Test
public void resumeDrop() {
  AtomicReference<Throwable> error = new AtomicReference<>();
  AtomicReference<Object> value = new AtomicReference<>();
  Hooks.onErrorDropped(error::set);
  Hooks.onNextDropped(value::set);
  try {
    OnNextFailureStrategy strategy = OnNextFailureStrategy.resumeDrop();
    String data = "foo";
    Throwable exception = new NullPointerException("foo");
    assertThat(strategy.test(exception, data)).isTrue();
    Throwable t = strategy.process(exception, data, Context.empty());
    assertThat(t).isNull();
    assertThat(error.get()).isInstanceOf(NullPointerException.class).hasMessage("foo");
    assertThat(value.get()).isEqualTo("foo");
  }
  finally {
    Hooks.resetOnErrorDropped();
    Hooks.resetOnNextDropped();
  }
}

Code example source: reactor/reactor-core

@Test
public void resumeDropErrorHookFails() {
  AtomicReference<Object> value = new AtomicReference<>();
  UnsupportedOperationException failure = new UnsupportedOperationException("error hook");
  Hooks.onNextDropped(value::set);
  Hooks.onErrorDropped(v -> { throw failure; });
  try {
    OnNextFailureStrategy strategy = OnNextFailureStrategy.resumeDrop();
    String data = "foo";
    Throwable exception = new NullPointerException("foo");
    Throwable t = strategy.process(exception, data, Context.empty());
    assertThat(t)
        .hasMessage("error hook")
        .hasSuppressedException(exception);
    assertThat(value.get()).isEqualTo("foo");
  }
  finally {
    Hooks.resetOnErrorDropped();
    Hooks.resetOnNextDropped();
  }
}

Code example source: reactor/reactor-core

@Test
public void resumeDropIfWithFatalMatch() {
  AtomicReference<Throwable> error = new AtomicReference<>();
  AtomicReference<Object> value = new AtomicReference<>();
  Hooks.onErrorDropped(error::set);
  Hooks.onNextDropped(value::set);
  try {
    OnNextFailureStrategy strategy = OnNextFailureStrategy.resumeDropIf(
        e -> e instanceof NoSuchMethodError);
    String data = "foo";
    Throwable exception = new NoSuchMethodError("foo");
    assertThat(strategy.test(exception, data)).isTrue();
    Throwable t = strategy.process(exception, data, Context.empty());
    assertThat(t).isNull();
    assertThat(error.get())
        .isInstanceOf(NoSuchMethodError.class)
        .hasMessage("foo");
    assertThat(value.get()).isEqualTo("foo");
  }
  finally {
    Hooks.resetOnErrorDropped();
    Hooks.resetOnNextDropped();
  }
}

Code example source: reactor/reactor-core

@Test
public void pollErrorModeLocalStrategy() {
  List<Object> nextDropped = new ArrayList<>();
  List<Object> errorDropped = new ArrayList<>();
  Hooks.onNextDropped(nextDropped::add);
  Hooks.onErrorDropped(errorDropped::add);
  Context c = Context.of(OnNextFailureStrategy.KEY_ON_NEXT_ERROR_STRATEGY, OnNextFailureStrategy.RESUME_DROP);
  Exception error = new IllegalStateException("boom");
  try {
    assertThat(Hooks.onNextErrorHook).as("no global hook").isNull();
    RuntimeException e = Operators.onNextPollError("foo", error, c);
    assertThat(e).isNull();
    assertThat(nextDropped).containsExactly("foo");
    assertThat(errorDropped).containsExactly(error);
  }
  finally {
    Hooks.resetOnNextDropped();
    Hooks.resetOnErrorDropped();
  }
}

Code example source: reactor/reactor-core

@Test
public void resumeDropValueHookFails() {
  AtomicReference<Throwable> error = new AtomicReference<>();
  UnsupportedOperationException failure = new UnsupportedOperationException("value hook");
  Hooks.onErrorDropped(error::set);
  Hooks.onNextDropped(v -> { throw failure; });
  try {
    OnNextFailureStrategy strategy = OnNextFailureStrategy.resumeDrop();
    String data = "foo";
    Throwable exception = new NullPointerException("foo");
    Throwable t = strategy.process(exception, data, Context.empty());
    assertThat(t)
        .hasMessage("value hook")
        .hasSuppressedException(exception);
    assertThat(error.get()).isNull();
  }
  finally {
    Hooks.resetOnErrorDropped();
    Hooks.resetOnNextDropped();
  }
}

Code example source: reactor/reactor-core

@Test
public void resumeDropWithFatal() {
  AtomicReference<Throwable> error = new AtomicReference<>();
  AtomicReference<Object> value = new AtomicReference<>();
  Hooks.onErrorDropped(error::set);
  Hooks.onNextDropped(value::set);
  try {
    OnNextFailureStrategy strategy = OnNextFailureStrategy.resumeDrop();
    String data = "foo";
    Throwable exception = new NoSuchMethodError("foo");
    assertThat(strategy.test(exception, data)).isTrue();
    assertThatExceptionOfType(NoSuchMethodError.class)
        .isThrownBy(() -> strategy.process(exception, data, Context.empty()));
    assertThat(error.get()).isNull();
    assertThat(value.get()).isNull();
  }
  finally {
    Hooks.resetOnErrorDropped();
    Hooks.resetOnNextDropped();
  }
}

Code example source: reactor/reactor-core

@Test
public void failNextIfTerminatedTake() {
  Hooks.onNextDropped(t -> assertThat(t).isEqualTo(1));
  StepVerifier.create(
          Flux.from(s -> {
            s.onSubscribe(Operators.emptySubscription());
            s.onComplete();
            s.onNext(1);
          })
          .take(2))
      .verifyComplete();
  Hooks.resetOnNextDropped();
}

Code example source: reactor/reactor-core

@Test
public void resumeDropIfWithFatalNoMatch() {
  AtomicReference<Throwable> error = new AtomicReference<>();
  AtomicReference<Object> value = new AtomicReference<>();
  Hooks.onErrorDropped(error::set);
  Hooks.onNextDropped(value::set);
  try {
    OnNextFailureStrategy strategy = OnNextFailureStrategy.resumeDropIf(
        e -> e instanceof NullPointerException);
    String data = "foo";
    Throwable exception = new NoSuchMethodError("foo");
    assertThat(strategy.test(exception, data)).isFalse();
    assertThatExceptionOfType(NoSuchMethodError.class)
        .isThrownBy(() -> strategy.process(exception, data, Context.empty()));
    assertThat(error.get()).isNull();
    assertThat(value.get()).isNull();
  }
  finally {
    Hooks.resetOnErrorDropped();
    Hooks.resetOnNextDropped();
  }
}

Code example source: reactor/reactor-core

@Test
public void resumeDropIfNoMatch() {
  AtomicReference<Throwable> error = new AtomicReference<>();
  AtomicReference<Object> value = new AtomicReference<>();
  Hooks.onErrorDropped(error::set);
  Hooks.onNextDropped(value::set);
  try {
    OnNextFailureStrategy strategy = OnNextFailureStrategy.resumeDropIf(
        e -> e instanceof IllegalArgumentException);
    String data = "foo";
    Throwable exception = new NullPointerException("foo");
    assertThat(strategy.test(exception, data)).isFalse();
    Throwable t = strategy.process(exception, data, Context.empty());
    assertThat(t)
        .isSameAs(exception)
        .hasNoSuppressedExceptions();
    assertThat(error.get()).isNull();
    assertThat(value.get()).isNull();
  }
  finally {
    Hooks.resetOnErrorDropped();
    Hooks.resetOnNextDropped();
  }
}

Code example source: reactor/reactor-core

@Test
public void failNextIfTerminatedTakeConditional() {
  Hooks.onNextDropped(t -> assertThat(t).isEqualTo(1));
  StepVerifier.create(
          Flux.from(s -> {
            s.onSubscribe(Operators.emptySubscription());
            s.onComplete();
            s.onNext(1);
          })
          .take(2)
          .filter("test2"::equals))
      .verifyComplete();
  Hooks.resetOnNextDropped();
}

Code example source: reactor/reactor-core

@Test // fixme when we have a fuseable testPublisher or an improved hide operator
@SuppressWarnings("unchecked")
public void failNextIfTerminatedTakeSourceConditional() {
  Hooks.onNextDropped(t -> assertThat(t).isEqualTo(1));
  StepVerifier.create(
          Flux.from(s -> {
            s.onSubscribe(Operators.emptySubscription());
            s.onComplete();
            ((Fuseable.ConditionalSubscriber) s).tryOnNext(1);
          })
          .take(2)
          .filter("test2"::equals))
      .verifyComplete();
  Hooks.resetOnNextDropped();
}

Code example source: reactor/reactor-core

@Test
public void failNextIfTerminatedTakeFused() {
  UnicastProcessor<Integer> up = UnicastProcessor.create();
  Hooks.onNextDropped(t -> assertThat(t).isEqualTo(1));
  StepVerifier.create(up.take(2))
      .then(() -> up.actual.onComplete())
      .then(() -> up.actual.onNext(1))
      .verifyComplete();
  Hooks.resetOnNextDropped();
}

Code example source: reactor/reactor-core

@Test //FIXME use Violation.NO_CLEANUP_ON_TERMINATE
public void failNextOnTerminated() {
  UnicastProcessor<Integer> up = UnicastProcessor.create();
  Hooks.onNextDropped(c -> {
    assertThat(c).isEqualTo(2);
  });
  StepVerifier.create(up.flatMap(Flux::just))
      .then(() -> {
        up.onNext(1);
        CoreSubscriber<? super Integer> a = up.actual;
        up.onComplete();
        a.onNext(2);
      })
      .expectNext(1)
      .verifyComplete();
  Hooks.resetOnNextDropped();
}

Code example source: reactor/reactor-core

@Test //FIXME use Violation.NO_CLEANUP_ON_TERMINATE
public void failDoubleNext() {
  Hooks.onNextDropped(c -> {
  });
  StepVerifier.create(
          Flux.zip(obj -> 0, Flux.just(1), Flux.just(2), s -> {
            s.onSubscribe(Operators.emptySubscription());
            s.onNext(2);
            s.onNext(3);
          }))
      .thenCancel()
      .verify();
  Hooks.resetOnNextDropped();
}
