java Spring Webclient Flux在保持flux的同时可以启动并忘记

hts6caw3  于 2023-04-28  发布在  Java
关注(0)|答案(1)|浏览(173)

我想要达到的是

  • 对于MyPojo Flux<MyPojo>的Flux中的每个元素,发送一个http请求,并将json payload发送给第三方web service,fireandforget风格,同时能够返回Flux进行后续处理。

新闻资讯

  • 第三方Web服务器,我无法控制,需要5秒的恒定时间来处理一个请求。
  • 第三方服务器不提供任何批量或列表API,它是一个接一个
  • 第三方服务器是众所周知的是非常可靠的,我被允许打击它尽可能努力
  • 我不太关心响应,特别是,等待5秒的响应是没有价值的,我什么也不做。

我所尝试的

  • 使用此代码:
private Flux<MyPojo> sendMyPojoFireAndForget(Flux<MyPojo> myPojoFlux) {
        return myPojoFlux.flatMap(oneMyPojo -> webClient.post().uri("http://example.com/path").bodyValue(oneMyPojo).exchangeToMono(clientResponse -> Mono.just(oneMyPojo)));
    }

但是,这不是火和忘记。在我这边有适当的日志,看到一些来自第三方的日志,似乎我在等待回应。
我也试过

webClient.post()
            .uri("http://example.com/path")
            .bodyValue(oneMyPojo)
            .retrieve()
            .bodyToMono(Void.class)
            .subscribe();

但这会让我“失去”MyPojo之流,最终得到一个虚空之流
问题:

  • 如何将Flux的每个元素发送到这个外部Web服务器,同时保留整个flux用于下游处理?
vulvrdjw

vulvrdjw1#

您可以将多个侦听器添加到一个观察对象(例如Mono或Flux):

@Test
void test() {
    Flux<MyPojo> myPojoFlux = Flux.just(
            new MyPojo(1),
            new MyPojo(2),
            new MyPojo(3),
            new MyPojo(4),
            new MyPojo(5)
    );

    myPojoFlux.subscribe(this::sendPostRequest);
    myPojoFlux.subscribe(this::doSomethingElse);

}

private void doSomethingElse(MyPojo myPojo) {
    System.out.println(myPojo);
}

private void sendPostRequest(MyPojo oneMyPojo) {
    webClient.get()
            .uri(...)
            .bodyValue(oneMyPojo)
            .retrieve()
            .bodyToMono(String.class)
            .subscribe(resp -> System.out.println(oneMyPojo + " " + resp));
}

由于如果不执行block,客户端将异步工作,因此无论sendPostRequest的响应如何,都会调用doSomethingElse方法

相关问题