本文整理了 Java 中 reactor.core.publisher.Hooks.resetOnErrorDropped() 方法的一些代码示例,展示了 Hooks.resetOnErrorDropped() 的具体用法。这些代码示例主要来源于 GitHub、Stack Overflow、Maven 等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。Hooks.resetOnErrorDropped() 方法的具体详情如下:
包路径:reactor.core.publisher.Hooks
类名称:Hooks
方法名:resetOnErrorDropped
[英]Reset global error dropped strategy to bubbling back the error.
[中]将全局的错误丢弃(error dropped)策略重置为默认行为,即把错误向上冒泡抛出。
代码示例来源:origin: reactor/reactor-core
/**
 * Calls the three {@code Hooks} reset methods for {@code onNextDropped},
 * {@code onErrorDropped} and {@code onOperatorError}, in that order.
 */
public void unplugHooks() {
Hooks.resetOnNextDropped();
// Resets the global error-dropped strategy to bubbling the error back.
Hooks.resetOnErrorDropped();
Hooks.resetOnOperatorError();
}
代码示例来源:origin: reactor/reactor-core
/**
 * Installs an {@code onErrorDropped} hook asserting that the dropped error's
 * message is {@code "complete"}, then subscribes to {@code Mono.just("foo")}
 * with a completion callback that throws an {@link IllegalStateException}
 * carrying that message.
 */
@Test
public void completeHookErrorDropped() {
    // Completion callback that always fails; passed as the third subscribe argument.
    Runnable failingOnComplete = () -> {
        throw new IllegalStateException("complete");
    };
    Hooks.onErrorDropped(dropped -> assertTrue(dropped.getMessage().equals("complete")));
    try {
        Mono.just("foo").subscribe(value -> {}, error -> {}, failingOnComplete);
    }
    finally {
        // Reset the global error-dropped hook even if the subscription throws.
        Hooks.resetOnErrorDropped();
    }
}
代码示例来源:origin: reactor/reactor-core
/**
 * Calls {@code OperatorDisposables.validate} with {@code null} for both
 * disposables and expects {@code false}. The error callback forwards to
 * {@code Operators.onErrorDropped}, and the installed hook asserts that the
 * dropped error is a {@link NullPointerException} with message
 * {@code "next is null"}.
 */
@Test
public void validationNull() {
    Hooks.onErrorDropped(dropped -> assertThat(dropped)
            .isInstanceOf(NullPointerException.class)
            .hasMessage("next is null"));
    try {
        boolean accepted = OperatorDisposables.validate(
                null,
                null,
                err -> Operators.onErrorDropped(err, Context.empty()));
        assertThat(accepted).isFalse();
    }
    finally {
        // Reset the global error-dropped hook regardless of the outcome.
        Hooks.resetOnErrorDropped();
    }
}
代码示例来源:origin: reactor/reactor-core
/**
 * Per-test cleanup: disposes {@code scheduler}, resets the {@code Hooks} and
 * {@code Schedulers} handlers listed below, and clears the recording fields.
 * NOTE(review): the fields are declared outside this snippet; presumably they
 * are collections filled by hooks installed elsewhere in the class - confirm.
 */
@After
public void tearDown() {
scheduler.dispose();
// Reset the dropped-signal and operator-error hooks.
Hooks.resetOnNextDropped();
Hooks.resetOnErrorDropped();
Hooks.resetOnOperatorError();
Schedulers.resetOnHandleError();
// Empty every recorder so nothing carries over into the next test.
onNexts.clear();
onErrors.clear();
onNextDropped.clear();
onErrorDropped.clear();
onOperatorError.clear();
onOperatorErrorData.clear();
onSchedulerHandleError.clear();
}
代码示例来源:origin: reactor/reactor-core
/**
 * Exercises {@code OnNextFailureStrategy.resumeDropIf} with a predicate that
 * matches {@link NullPointerException}: the strategy must accept the error,
 * {@code process} must return {@code null}, and both the error and the value
 * must have been handed to the dropped hooks.
 */
@Test
public void resumeDropIfMatch() {
    AtomicReference<Throwable> droppedError = new AtomicReference<>();
    AtomicReference<Object> droppedValue = new AtomicReference<>();
    Hooks.onErrorDropped(droppedError::set);
    Hooks.onNextDropped(droppedValue::set);
    try {
        OnNextFailureStrategy dropOnNpe =
                OnNextFailureStrategy.resumeDropIf(NullPointerException.class::isInstance);
        String payload = "foo";
        Throwable npe = new NullPointerException("foo");

        assertThat(dropOnNpe.test(npe, payload)).isTrue();

        Throwable outcome = dropOnNpe.process(npe, payload, Context.empty());
        assertThat(outcome).isNull();
        assertThat(droppedError.get())
                .isInstanceOf(NullPointerException.class)
                .hasMessage("foo");
        assertThat(droppedValue.get()).isEqualTo("foo");
    }
    finally {
        // Reset both global hooks installed above.
        Hooks.resetOnErrorDropped();
        Hooks.resetOnNextDropped();
    }
}
代码示例来源:origin: reactor/reactor-core
/**
 * Exercises {@code OnNextFailureStrategy.resumeDrop()}: the strategy must
 * accept the error, {@code process} must return {@code null}, and both the
 * error and the value must have been handed to the dropped hooks.
 */
@Test
public void resumeDrop() {
    AtomicReference<Throwable> droppedError = new AtomicReference<>();
    AtomicReference<Object> droppedValue = new AtomicReference<>();
    Hooks.onErrorDropped(droppedError::set);
    Hooks.onNextDropped(droppedValue::set);
    try {
        OnNextFailureStrategy dropAll = OnNextFailureStrategy.resumeDrop();
        String payload = "foo";
        Throwable npe = new NullPointerException("foo");

        assertThat(dropAll.test(npe, payload)).isTrue();

        Throwable outcome = dropAll.process(npe, payload, Context.empty());
        assertThat(outcome).isNull();
        assertThat(droppedError.get())
                .isInstanceOf(NullPointerException.class)
                .hasMessage("foo");
        assertThat(droppedValue.get()).isEqualTo("foo");
    }
    finally {
        // Reset both global hooks installed above.
        Hooks.resetOnErrorDropped();
        Hooks.resetOnNextDropped();
    }
}
代码示例来源:origin: reactor/reactor-core
/**
 * Exercises {@code OnNextFailureStrategy.resumeDrop()} when the
 * {@code onErrorDropped} hook itself throws: {@code process} is expected to
 * return a throwable with message {@code "error hook"} that has the original
 * exception as a suppressed exception, while the value still reaches the
 * {@code onNextDropped} hook.
 */
@Test
public void resumeDropErrorHookFails() {
    UnsupportedOperationException hookFailure = new UnsupportedOperationException("error hook");
    AtomicReference<Object> droppedValue = new AtomicReference<>();
    Hooks.onNextDropped(droppedValue::set);
    Hooks.onErrorDropped(ignored -> {
        throw hookFailure;
    });
    try {
        OnNextFailureStrategy dropAll = OnNextFailureStrategy.resumeDrop();
        String payload = "foo";
        Throwable npe = new NullPointerException("foo");

        Throwable outcome = dropAll.process(npe, payload, Context.empty());

        assertThat(outcome)
                .hasMessage("error hook")
                .hasSuppressedException(npe);
        assertThat(droppedValue.get()).isEqualTo("foo");
    }
    finally {
        // Reset both global hooks installed above.
        Hooks.resetOnErrorDropped();
        Hooks.resetOnNextDropped();
    }
}
代码示例来源:origin: reactor/reactor-core
/**
 * Drives a {@code flatMap} with a hand-written publisher that signals
 * {@code onError} twice. The verifier expects the first error ("test"); the
 * installed {@code onErrorDropped} hook asserts that the error it receives
 * has message "test2".
 */
@Test //FIXME use Violation.NO_CLEANUP_ON_TERMINATE
public void failDoubleErrorSilent() {
    Hooks.onErrorDropped(e -> {
        assertThat(e).hasMessage("test2");
    });
    try {
        StepVerifier.create(Flux.from(s -> {
            s.onSubscribe(Operators.emptySubscription());
            s.onError(new Exception("test"));
            s.onError(new Exception("test2"));
        }).flatMap(Flux::just))
                    .verifyErrorMessage("test");
    }
    finally {
        // Reset in finally so a failed verification cannot leak the global
        // hook into tests that run afterwards.
        Hooks.resetOnErrorDropped();
    }
}
代码示例来源:origin: reactor/reactor-core
/**
 * Calls {@code Operators.onNextPollError} with a {@code Context} that carries
 * {@code OnNextFailureStrategy.RESUME_DROP} under
 * {@code KEY_ON_NEXT_ERROR_STRATEGY}, after asserting that no global
 * {@code onNextErrorHook} is set. Expects a {@code null} result and exactly
 * one value ("foo") and one error recorded by the dropped hooks.
 */
@Test
public void pollErrorModeLocalStrategy() {
    List<Object> droppedValues = new ArrayList<>();
    List<Object> droppedErrors = new ArrayList<>();
    Hooks.onNextDropped(droppedValues::add);
    Hooks.onErrorDropped(droppedErrors::add);

    Exception boom = new IllegalStateException("boom");
    Context localStrategy = Context.of(
            OnNextFailureStrategy.KEY_ON_NEXT_ERROR_STRATEGY,
            OnNextFailureStrategy.RESUME_DROP);
    try {
        assertThat(Hooks.onNextErrorHook).as("no global hook").isNull();

        RuntimeException result = Operators.onNextPollError("foo", boom, localStrategy);

        assertThat(result).isNull();
        assertThat(droppedValues).containsExactly("foo");
        assertThat(droppedErrors).containsExactly(boom);
    }
    finally {
        // Reset both global hooks installed above.
        Hooks.resetOnNextDropped();
        Hooks.resetOnErrorDropped();
    }
}
代码示例来源:origin: reactor/reactor-core
/**
 * Calls the {@code Hooks} reset methods for {@code onErrorDropped},
 * {@code onNextDropped}, {@code onEachOperator}, {@code onOperatorError} and
 * {@code onLastOperator}, in that order.
 */
final void resetHooks() {
// Resets the global error-dropped strategy to bubbling the error back.
Hooks.resetOnErrorDropped();
Hooks.resetOnNextDropped();
Hooks.resetOnEachOperator();
Hooks.resetOnOperatorError();
Hooks.resetOnLastOperator();
}
代码示例来源:origin: reactor/reactor-core
/**
 * Manually drives a {@code FlatMapMain} subscriber (concurrency 1, verifier
 * initial request 0): emits 1, terminates its {@code ERROR} field, emits 2,
 * clears {@code error}, then signals {@code onError("test")}. The verifier
 * expects the "test" error, and the {@code onErrorDropped} hook must have
 * been invoked with an overflow exception.
 */
@Test
public void failOverflowScalarThenError() {
    AtomicBoolean set = new AtomicBoolean();
    Hooks.onErrorDropped(e -> {
        assertThat(Exceptions.isOverflow(e)).isTrue();
        set.set(true);
    });
    try {
        StepVerifier.create(Flux.from(s -> {
            s.onSubscribe(Operators.emptySubscription());
            s.onNext(1);
            Exceptions.terminate(FluxFlatMap.FlatMapMain.ERROR, (FluxFlatMap.FlatMapMain) s);
            s.onNext(2);
            ((FluxFlatMap.FlatMapMain)s).error = null;
            s.onError(new Exception("test"));
        })
                                .flatMap(Flux::just, 1), 0)
                    .verifyErrorMessage("test");
    }
    finally {
        // Reset in finally so a failed verification cannot leak the global
        // hook into tests that run afterwards.
        Hooks.resetOnErrorDropped();
    }
    // Checked after the reset, as before: the hook must have fired.
    assertThat(set.get()).isTrue();
}
代码示例来源:origin: reactor/reactor-core
/**
 * Manually drives a {@code FlatMapMain} subscriber (concurrency 1, verifier
 * initial request 1) while forcing {@code wip = 1} to simulate a concurrent
 * active drain: emits 1, terminates its {@code ERROR} field, emits 2 and 3,
 * clears {@code error}, runs {@code drainLoop()} and finally signals
 * {@code onError("test")}. The verifier expects value 1 then the "test"
 * error, and the {@code onErrorDropped} hook must have been invoked with an
 * overflow exception.
 */
@Test
public void failOverflowWhileActiveScalarThenError() {
    AtomicBoolean set = new AtomicBoolean();
    Hooks.onErrorDropped(e -> {
        assertThat(Exceptions.isOverflow(e)).isTrue();
        set.set(true);
    });
    try {
        StepVerifier.create(Flux.from(s -> {
            s.onSubscribe(Operators.emptySubscription());
            s.onNext(1);
            Exceptions.terminate(FluxFlatMap.FlatMapMain.ERROR, (FluxFlatMap.FlatMapMain) s);
            ((FluxFlatMap.FlatMapMain)s).wip = 1; //simulate concurrent active
            s.onNext(2);
            s.onNext(3);
            ((FluxFlatMap.FlatMapMain)s).error = null;
            ((FluxFlatMap.FlatMapMain)s).drainLoop();
            s.onError(new Exception("test"));
        })
                                .flatMap(Flux::just, 1), 1)
                    .expectNext(1)
                    .verifyErrorMessage("test");
    }
    finally {
        // Reset in finally so a failed verification cannot leak the global
        // hook into tests that run afterwards.
        Hooks.resetOnErrorDropped();
    }
    // Checked after the reset, as before: the hook must have fired.
    assertThat(set.get()).isTrue();
}
代码示例来源:origin: reactor/reactor-core
/**
 * Zips {@code Flux.just(1)}, {@code Flux.never()} and a hand-written
 * publisher that signals {@code onError} twice. The verifier expects the
 * first error ("test"); a no-op {@code onErrorDropped} hook is installed for
 * the duration of the test.
 */
@Test //FIXME use Violation.NO_CLEANUP_ON_TERMINATE
public void failDoubleErrorSilent() {
    Hooks.onErrorDropped(e -> {
    });
    try {
        StepVerifier.create(Flux.zip(obj -> 0, Flux.just(1), Flux.never(), s -> {
            s.onSubscribe(Operators.emptySubscription());
            s.onError(new Exception("test"));
            s.onError(new Exception("test2"));
        }))
                    .verifyErrorMessage("test");
    }
    finally {
        // Reset in finally so a failed verification cannot leave the no-op
        // hook installed for tests that run afterwards.
        Hooks.resetOnErrorDropped();
    }
}
代码示例来源:origin: reactor/reactor-core
/**
 * Drives {@code take(2)} with a hand-written publisher that signals
 * {@code onError} twice, running the trackable-state assertions before and
 * after {@code onSubscribe} and after the first error. The verifier expects
 * the first error ("test"); the {@code onErrorDropped} hook asserts that the
 * error it receives has message "test2".
 */
@Test
@SuppressWarnings("unchecked")
public void failDoubleError() {
    Hooks.onErrorDropped(e -> assertThat(e).hasMessage("test2"));
    try {
        StepVerifier.create(Flux.from(s -> {
            assertTrackableBeforeOnSubscribe((InnerOperator)s);
            s.onSubscribe(Operators.emptySubscription());
            assertTrackableAfterOnSubscribe((InnerOperator)s);
            s.onError(new Exception("test"));
            assertTrackableAfterOnComplete((InnerOperator)s);
            s.onError(new Exception("test2"));
        })
                                .take(2))
                    .verifyErrorMessage("test");
    }
    finally {
        // Reset in finally so a failed verification cannot leak the global
        // hook into tests that run afterwards.
        Hooks.resetOnErrorDropped();
    }
}
代码示例来源:origin: reactor/reactor-core
/**
 * Same scenario as a double {@code onError} into {@code take(2)}, but with a
 * trailing {@code filter(d -> true)} on the chain. The verifier expects the
 * first error ("test"); the {@code onErrorDropped} hook asserts that the
 * error it receives has message "test2".
 */
@Test
@SuppressWarnings("unchecked")
public void failConditionalDoubleError() {
    Hooks.onErrorDropped(e -> assertThat(e).hasMessage("test2"));
    try {
        StepVerifier.create(Flux.from(s -> {
            assertTrackableBeforeOnSubscribe((InnerOperator)s);
            s.onSubscribe(Operators.emptySubscription());
            assertTrackableAfterOnSubscribe((InnerOperator)s);
            s.onError(new Exception("test"));
            assertTrackableAfterOnComplete((InnerOperator)s);
            s.onError(new Exception("test2"));
        })
                                .take(2).filter(d -> true))
                    .verifyErrorMessage("test");
    }
    finally {
        // Reset in finally so a failed verification cannot leak the global
        // hook into tests that run afterwards.
        Hooks.resetOnErrorDropped();
    }
}
代码示例来源:origin: reactor/reactor-core
/**
 * Per-test cleanup: calls the {@code Hooks} reset methods for
 * {@code onOperatorError}, {@code onNextDropped}, {@code onErrorDropped},
 * {@code onOperatorDebug}, {@code onEachOperator} and {@code onLastOperator}.
 */
@After
public void resetAllHooks() {
Hooks.resetOnOperatorError();
Hooks.resetOnNextDropped();
// Resets the global error-dropped strategy to bubbling the error back.
Hooks.resetOnErrorDropped();
Hooks.resetOnOperatorDebug();
Hooks.resetOnEachOperator();
Hooks.resetOnLastOperator();
}
代码示例来源:origin: reactor/reactor-core
/**
 * Zips {@code Flux.just(1)}, a {@code DirectProcessor} and a hand-written
 * publisher. The hand-written publisher grabs the subscriber attached to the
 * processor's first inner, completes its own subscriber, and then pushes an
 * error into the grabbed subscriber. The verifier expects plain completion;
 * a no-op {@code onErrorDropped} hook is installed for the duration.
 */
@Test
public void failDoubleTerminalPublisher() {
    DirectProcessor<Integer> processor = DirectProcessor.create();
    Hooks.onErrorDropped(ignored -> {
    });
    try {
        StepVerifier.create(Flux.zip(obj -> 0, Flux.just(1), processor, s -> {
            // Look up the processor's first inner before signalling anything.
            DirectProcessor.DirectInner firstInner =
                    (DirectProcessor.DirectInner) processor.inners().findFirst().get();
            CoreSubscriber<?> processorDownstream = firstInner.actual;

            s.onSubscribe(Operators.emptySubscription());
            s.onComplete();
            processorDownstream.onError(new Exception("test"));
        }))
                    .verifyComplete();
    }
    finally {
        // Reset the global error-dropped hook regardless of the outcome.
        Hooks.resetOnErrorDropped();
    }
}
代码示例来源:origin: reactor/reactor-core
/**
 * Per-test cleanup: calls the {@code Hooks} reset methods for
 * {@code onOperatorError}, {@code onNextDropped}, {@code onErrorDropped},
 * {@code onOperatorDebug}, {@code onEachOperator} and {@code onLastOperator}.
 */
@After
public void resetAllHooks() {
Hooks.resetOnOperatorError();
Hooks.resetOnNextDropped();
// Resets the global error-dropped strategy to bubbling the error back.
Hooks.resetOnErrorDropped();
Hooks.resetOnOperatorDebug();
Hooks.resetOnEachOperator();
Hooks.resetOnLastOperator();
}
代码示例来源:origin: reactor/reactor-core
/**
 * Subscribes to {@code UnicastProcessor.take(2)} and, through the processor's
 * {@code actual} subscriber, signals {@code onError} twice while running the
 * trackable-state assertions around each step. The verifier expects the first
 * error ("test"); the {@code onErrorDropped} hook asserts that the error it
 * receives has message "test2".
 */
@Test
@SuppressWarnings("unchecked")
public void failFusedDoubleError() {
    UnicastProcessor<Integer> up = UnicastProcessor.create();
    Hooks.onErrorDropped(e -> assertThat(e).hasMessage("test2"));
    try {
        StepVerifier.create(up
                .take(2))
                    .consumeSubscriptionWith(s -> {
                        assertTrackableBeforeOnSubscribe((InnerOperator)s);
                    })
                    .then(() -> {
                        assertTrackableAfterOnSubscribe((InnerOperator)up.actual);
                        up.actual.onError(new Exception("test"));
                        assertTrackableAfterOnComplete((InnerOperator)up.actual);
                        up.actual.onError(new Exception("test2"));
                    })
                    .verifyErrorMessage("test");
    }
    finally {
        // Reset in finally so a failed verification cannot leak the global
        // hook into tests that run afterwards.
        Hooks.resetOnErrorDropped();
    }
}
代码示例来源:origin: reactor/reactor-core
/**
 * Under virtual time, feeds a {@code MonoDelayElement} (2 second delay) from
 * a {@code TestPublisher} that emits "foo" immediately followed by an error.
 * The verifier expects no event for 2 seconds, then "foo" and completion;
 * afterwards the {@code onErrorDropped} hook must not have recorded anything.
 */
@Test
public void errorAfterNextIsNeverTriggered() {
    AtomicReference<Throwable> dropped = new AtomicReference<>();
    TestPublisher<String> upstream = TestPublisher.create();
    Hooks.onErrorDropped(dropped::set);
    try {
        StepVerifier
                .withVirtualTime(() -> new MonoDelayElement<>(
                        upstream.mono(), 2, TimeUnit.SECONDS, defaultSchedulerForDelay()))
                .expectSubscription()
                .then(() -> upstream.next("foo").error(new IllegalStateException("boom")))
                .expectNoEvent(Duration.ofSeconds(2))
                .expectNext("foo")
                .verifyComplete();
    }
    finally {
        // Reset the global error-dropped hook regardless of the outcome.
        Hooks.resetOnErrorDropped();
    }

    assertThat(dropped.get()).isNull();
}
内容来源于网络,如有侵权,请联系作者删除!