如何度量webflux webclient方法的执行时间?

vd2z7a6w  于 2021-07-13  发布在  Java
关注(0)|答案(4)|浏览(510)

我准备了一堆请求,希望将这些请求与外部webservice并行发送。在这个流程中,我将继续直接处理响应(例如将某些内容插入数据库)。
问题:我想跟踪最大请求时间(对于一个请求!),不包括处理。但如前所述,这将只跟踪全局时间,包括任何子进程:

StopWatch watch = new StopWatch();
watch.start();

Flux.fromIterable(requests)
    .flatMap(req -> webClient.send(req, MyResponse.class)
            .doOnSuccess(rsp -> processResponse(rsp))) //assume some longer routine
    .collectList()
    .block();

watch.stop();
System.out.println(w.getTotalTimeMillis());

问题:如何衡量请求所用的最长时间(不包括 processResponse() 时间?

yqkkidmi

yqkkidmi1#

在mono上使用elapsed时,您将得到一个元组的mono,其中包含elapsed时间和原始对象。你得把它们拆开才能用。我在一个测试中编写了一个示例(代码有点简化),以查看它的工作情况:

@Test
public void elapsed() {

    Flux.fromIterable(List.of(1, 2, 3, 4, 5))
        .flatMap(req -> Mono.delay(Duration.ofMillis(100L * req))
                            .map(it -> "response_" + req)
                            .elapsed()
                            .doOnNext(it -> System.out.println("I took " + it.getT1() + " MS"))
                            .map(Tuple2::getT2)
                            .doOnSuccess(rsp -> processResponse(rsp)))
        .collectList()
        .block();

}

@SneakyThrows
public void processResponse(Object it) {
    System.out.println("This is the response: " + it);
    Thread.sleep(1000);
}

输出如下所示:

I took 112 MS
This is the response: response_1
I took 205 MS
This is the response: response_2
I took 305 MS
This is the response: response_3
I took 403 MS
This is the response: response_4
I took 504 MS
This is the response: response_5

这些数字表示延迟(在您的示例中是webclient.send())和React管道本身的一点开销。它是在订阅(特定请求的flatmap运行时发生)和下一个信号(在我的情况下是map的结果,在你的情况下是webclient请求的结果)之间计算的
您的代码如下所示:

Flux.fromIterable(requests)
        .flatMap(req -> webClient.send(req, MyResponse.class)
                                 .elapsed()
                                 .doOnNext(it -> System.out.println("I took " + it.getT1() + " MS"))
                                 .map(Tuple2::getT2)
                                 .doOnSuccess(rsp -> processResponse(rsp))) //assume some longer routine
        .collectList()
        .block();

注意:如果您想使用秒表代替,也可以通过如下操作:

Flux.fromIterable(List.of(1, 2, 3, 4, 5)).flatMap(req -> {
            StopWatch stopWatch = new StopWatch();
            return Mono.fromRunnable(stopWatch::start)
                       .then(Mono.delay(Duration.ofMillis(100L * req)).map(it -> "response_" + req).doOnNext(it -> {
                           stopWatch.stop();
                           System.out.println("I took " + stopWatch.getTime() + " MS");
                       }).doOnSuccess(this::processResponse));
        }).collectList().block();

但就我个人而言,我建议使用.elapsed()解决方案,因为它更干净一些。

new9mtju

new9mtju2#

我会用那种方法直接避开秒表。而是创建一个可以在其他地方使用的度量 Package 器。
你可以利用 .doOnSubscribe(), .doOnError(), .doOnSuccess() 但要回答你的问题,你可以有一个这样的计时器

public sendRequest(){
                Flux.fromIterable(requests)
                .flatMap(req -> webClient.send(req, MyResponse.class)
                        .transform(timerPublisher("time took for ", req.id)))
                .collectList()
                .block();
   }

//this can be made sophisticated by determining what kind of publisher it is
//mono or flux
    private Function<Mono<T>, Publisher<T>> timerPublisher(String metric) {

        StopWatchHelper stopWatch = new StopWatchHelper(metric);
        return s -> s.doOnSubscribe((s) -> stopWatch.start())
                .doOnSuccess(documentRequest -> stopWatch.record())
                .doOnError(stopWatch::record);
    }

    private class StopWatchHelper{
        private StopWatch stopWatch;
        private String metric;
        public StopWatchHelper(String metric){
            this.metric = metric;
            stopWatch = new StopWatch();
        }
        public Consumer<Subscription> start() {
            return (s) -> stopWatch.start();
        }

        public void record(){
            if(stopWatch.isStarted()){
                System.out.println(String.format("Metric %s took %s", metric, stopWatch.getTime()));
            }
        }

        public void record(Throwable t){
            if(stopWatch.isStarted()){
                System.out.println(String.format("Metric %s took %s, reported in error %s", metric, stopWatch.getTime(),throwable));
            }
        }
    }

PS: Avoid using .block() -> it beats the purpose :)
u5i3ibmn

u5i3ibmn3#

springboot提供了一个开箱即用的特性,它将为您的应用程序添加插装 WebClient .
您可以使用自动配置的 WebClient.Builder 创建您的 WebClient 示例ie。

@Bean
public WebClient myCustomWebClient(WebClient.Builder builder) {
    return builder
            // your custom web client config code
            .build();
}

此工具将为您的用户所做的每个api调用计时 WebClient 在你的电脑里注册 MeterRegistry 参考文件

ycl3bljg

ycl3bljg4#

一种选择是使用单元测试和 Mockito 模仿方法的行为 processResponse() . 然后你只测量其他任务的时间。假设在类中有以下方法:

public class AnotherService {
    public Object processResponse(Object response) {
        try {
            System.out.println("processResponse called");
            Thread.sleep(20000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return response;
    }
}

然后你用 when 方法并模拟返回。在这里你已经摆脱了时间 Thread.sleep(20000);processResponse .

Object sample = new Object(); // your return to sumulate
when(anotherService.processResponse(any())).thenReturn(sample);

要在单元测试中使用它,它将如下所示:

import static org.mockito.ArgumentMatchers.*;
import static org.mockito.Mockito.when;

@ExtendWith(MockitoExtension.class)
class GitHubJobsClientMockTest {

    @Mock
    private AnotherService anotherService;
    @InjectMocks
    private YourService yourService;

    void stackoverflowRequest() {

        Object sample = new Object();

        // HERE YOU CREATE THE MOCK OF YOUR METHOD FROM AnotherService
        when(anotherService.processResponse(any())).thenReturn(sample);

        List<Integer> pageNumbers = List.of(1, 2, 3);
        String description = "Java";

        List<Stream<Object>> result = yourService.stackoverflowRequest(pageNumbers, description);

        assertTrue(result.size() > 0);
    }
}

所以当你测试你的 YourService 这个 stopWatch.start(); 以及 stopWatch.stop(); 应该计算总时间-模拟方法的时间 anotherService.processResponse(response)) .

public class YourService {
    public List<Stream<Object>> stackoverflowRequest(List<Integer> requests, String description) {
        stopWatch.start();

        List<Stream<GitHubPosition>> result = Flux.fromIterable(requests)
                .map(pageNumber -> invokeGithubJobsApi(pageNumber, description))
                .map(gitHubPositionList ->
                    gitHubPositionList
                            .stream()
                            //               THIS METHOD YOU HAVE TO MOCK
                            .map(response -> anotherService.processResponse(response))
                )
                .collectList()
                .block();

        stopWatch.stop();
        log.info("time elapsed: " + stopWatch.getTime());
        return result;
    }

相关问题