本文整理了Java中reactor.core.publisher.Mono.create()
方法的一些代码示例,展示了Mono.create()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Mono.create()
方法的具体详情如下:
包路径:reactor.core.publisher.Mono
类名称:Mono
方法名:create
[英]Creates a deferred emitter that can be used with callback-based APIs to signal at most one value, a complete or an error signal.
Bridging legacy API involves mostly boilerplate code due to the lack of standard types and methods. There are two kinds of API surfaces: 1) addListener/removeListener and 2) callback-handler.
Mono.<String>create(sink -> {
HttpListener listener = event -> {
if (event.getResponseCode() >= 400) {
sink.error(new RuntimeException("Failed"));
} else {
String body = event.getBody();
if (body.isEmpty()) {
sink.success();
} else {
sink.success(body.toLowerCase());
}
}
};
client.addListener(listener);
sink.onDispose(() -> client.removeListener(listener));
});
Note that this works only with single-value emitting listeners. Otherwise, all subsequent signals are dropped. You may have to add client.removeListener(this);to the listener's body.
Mono.<String>create(sink -> {
Callback<String> callback = new Callback<String>() {
@Override
public void onResult(String data) {
sink.success(data.toLowerCase());
}
@Override
public void onError(Exception e) {
sink.error(e);
}
}
// without cancellation support:
client.call("query", callback);
// with cancellation support:
AutoCloseable cancel = client.call("query", callback);
sink.onDispose(() -> {
try {
cancel.close();
} catch (Exception ex) {
Exceptions.onErrorDropped(ex);
}
});
});
[中]创建一个延迟发射器,该发射器可与基于回调的API一起使用,以发出最多一个值、完整信号或错误信号。
由于缺乏标准类型和方法,桥接遗留API主要涉及样板代码。有两种API表面:1)addListener/RemovelListener和2)回调处理程序。
1) addListener/RemovelListener对
要使用此类API,必须实例化侦听器,从侦听器调用接收器,然后将其注册到源:
Mono.<String>create(sink -> {
HttpListener listener = event -> {
if (event.getResponseCode() >= 400) {
sink.error(new RuntimeException("Failed"));
} else {
String body = event.getBody();
if (body.isEmpty()) {
sink.success();
} else {
sink.success(body.toLowerCase());
}
}
};
client.addListener(listener);
sink.onDispose(() -> client.removeListener(listener));
});
请注意,这仅适用于发出单值的侦听器。否则,将丢弃所有后续信号。您可能需要添加客户端。removeListener(本);到听者的身体。
2) 回调处理程序
这需要类似于上述的实例化模式,但通常成功的完成和错误被分为不同的方法。此外,遗留API可能支持也可能不支持某些取消机制。
Mono.<String>create(sink -> {
Callback<String> callback = new Callback<String>() {
@Override
public void onResult(String data) {
sink.success(data.toLowerCase());
}
@Override
public void onError(Exception e) {
sink.error(e);
}
}
// without cancellation support:
client.call("query", callback);
// with cancellation support:
AutoCloseable cancel = client.call("query", callback);
sink.onDispose(() -> {
try {
cancel.close();
} catch (Exception ex) {
Exceptions.onErrorDropped(ex);
}
});
});
代码示例来源:origin: codecentric/spring-boot-admin
private Mono<Void> writeAndFlush(Flux<DataBuffer> body, OutputStream responseBody) {
return DataBufferUtils.write(body, responseBody).map(DataBufferUtils::release).then(Mono.create(sink -> {
try {
responseBody.flush();
sink.success();
} catch (IOException ex) {
sink.error(ex);
}
}));
}
}
代码示例来源:origin: AxonFramework/AxonFramework
/**
* Creates the wrapper by creating the delegate mono.
*
* @param callback Gets the wrapper around {@link reactor.core.publisher.MonoSink} which can be used to complete the
* mono
* @param <T> The type of data sent via this mono
* @return instance of created wrapper
*/
public static <T> MonoWrapper<T> create(Consumer<MonoSinkWrapper<T>> callback) {
return new MonoWrapper<>(Mono.create(monoSink -> callback.accept(new MonoSinkWrapper<>(monoSink))));
}
}
代码示例来源:origin: reactor/reactor-core
@Test
public void monoCreateOnCancel() {
AtomicBoolean cancelled = new AtomicBoolean();
Mono.create(s -> s.onCancel(() -> cancelled.set(true)).success("test")).block();
assertThat(cancelled.get()).isFalse();
Mono.create(s -> s.onCancel(() -> cancelled.set(true)).success()).block();
assertThat(cancelled.get()).isFalse();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void createStreamFromMonoCreateError() {
AtomicInteger onDispose = new AtomicInteger();
AtomicInteger onCancel = new AtomicInteger();
StepVerifier.create(Mono.create(s -> {
s.onDispose(onDispose::getAndIncrement)
.onCancel(onCancel::getAndIncrement)
.error(new Exception("test"));
}))
.verifyErrorMessage("test");
assertThat(onDispose.get()).isEqualTo(1);
assertThat(onCancel.get()).isEqualTo(0);
}
代码示例来源:origin: reactor/reactor-core
Mono<String> exponentialRetryScenario() {
AtomicInteger i = new AtomicInteger();
return Mono.<String>create(s -> {
if (i.incrementAndGet() == 4) {
s.success("hey");
}
else {
s.error(new RuntimeException("test " + i));
}
}).retryWhen(repeat -> repeat.zipWith(Flux.range(1, 3), (t1, t2) -> t2)
.flatMap(time -> Mono.delay(Duration.ofSeconds(time))));
}
代码示例来源:origin: reactor/reactor-core
Flux<String> exponentialRepeatScenario2() {
AtomicInteger i = new AtomicInteger();
return Mono.<String>create(s -> {
if (i.incrementAndGet() == 4) {
s.success("hey");
}
else {
s.success();
}
}).repeatWhen(repeat -> repeat.zipWith(Flux.range(1, 3), (t1, t2) -> t2)
.flatMap(time -> Mono.delay(Duration.ofSeconds(time))));
}
代码示例来源:origin: reactor/reactor-core
@Test
public void sinkApiSuccessAfterErrorIsIgnored() {
Mono<String> secondIsValuedSuccess = Mono.create(sink -> {
sink.error(new IllegalArgumentException("boom"));
sink.success("bar");
});
StepVerifier.create(secondIsValuedSuccess)
.verifyErrorMessage("boom");
}
代码示例来源:origin: reactor/reactor-core
@Test
public void sinkApiEmptySuccessAfterErrorIsIgnored() {
Mono<String> secondIsEmptySuccess = Mono.create(sink -> {
sink.error(new IllegalArgumentException("boom"));
sink.success();
});
StepVerifier.create(secondIsEmptySuccess)
.verifyErrorMessage("boom");
}
代码示例来源:origin: reactor/reactor-core
@Test
public void sinkToString() {
StepVerifier.create(Mono.create(sink -> sink.success(sink.toString())))
.expectNext("MonoSink")
.verifyComplete();
}
}
代码示例来源:origin: reactor/reactor-core
@Test
public void sinkApiErrorAfterErrorBubblesAndDrops() {
Mono<String> secondIsError = Mono.create(sink -> {
sink.error(new IllegalArgumentException("boom1"));
sink.error(new IllegalArgumentException("boom2"));
});
StepVerifier.create(secondIsError)
.expectErrorMessage("boom1")
.verifyThenAssertThat()
.hasOperatorErrorWithMessage("boom2");
}
代码示例来源:origin: reactor/reactor-core
@Test
public void contextTest() {
StepVerifier.create(Mono.create(s -> s.success(s.currentContext()
.get(AtomicInteger.class)
.incrementAndGet()))
.subscriberContext(ctx -> ctx.put(AtomicInteger.class,
new AtomicInteger())))
.expectNext(1)
.verifyComplete();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void sinkApiEmptySuccessAfterEmptySuccessIsIgnored() {
Mono<String> secondIsEmptySuccess = Mono.create(sink -> {
sink.success();
sink.success();
});
StepVerifier.create(secondIsEmptySuccess)
.verifyComplete();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void sinkApiErrorAfterEmptySuccessBubblesAndDrops() {
Mono<String> secondIsError = Mono.create(sink -> {
sink.success();
sink.error(new IllegalArgumentException("boom"));
});
StepVerifier.create(secondIsError)
.expectComplete()
.verifyThenAssertThat()
.hasOperatorErrorWithMessage("boom");
}
代码示例来源:origin: reactor/reactor-core
@Test
public void createStreamFromMonoCreate2() {
StepVerifier.create(Mono.create(MonoSink::success)
.publishOn(Schedulers.parallel()))
.verifyComplete();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void sinkApiSuccessAfterEmptySuccessIsIgnored() {
Mono<String> secondIsValuedSuccess = Mono.create(sink -> {
sink.success();
sink.success("foo");
});
StepVerifier.create(secondIsValuedSuccess)
.verifyComplete();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void sinkApiErrorAfterSuccessBubblesAndDrops() {
Mono<String> secondIsError = Mono.create(sink -> {
sink.success("foo");
sink.error(new IllegalArgumentException("boom"));
});
StepVerifier.create(secondIsError)
.expectNext("foo")
.expectComplete()
.verifyThenAssertThat()
.hasOperatorErrorWithMessage("boom");
}
代码示例来源:origin: reactor/reactor-core
@Test
public void createStreamFromMonoCreateHide() {
StepVerifier.create(Mono.create(s -> s.success("test1")).hide())
.expectNext("test1")
.verifyComplete();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void sinkApiEmptySuccessAfterSuccessIsIgnored() {
Mono<String> secondIsEmptySuccess = Mono.create(sink -> {
sink.success("foo");
sink.success();
});
StepVerifier.create(secondIsEmptySuccess)
.expectNext("foo")
.verifyComplete();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void sinkApiSuccessAfterSuccessIsIgnored() {
Mono<String> secondIsValuedSuccess = Mono.create(sink -> {
sink.success("foo");
sink.success("bar");
});
StepVerifier.create(secondIsValuedSuccess)
.expectNext("foo")
.verifyComplete();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void predicateThrows() {
AssertSubscriber<Object> ts = AssertSubscriber.create(2);
Mono.create(s -> s.success(1))
.filter(v -> {
throw new RuntimeException("forced failure");
})
.subscribe(ts);
ts.assertNoValues()
.assertNotComplete()
.assertError(RuntimeException.class)
.assertErrorMessage("forced failure");
}
内容来源于网络,如有侵权,请联系作者删除!