如何度量webflux webclient方法的执行时间?

vd2z7a6w  于 2021-07-13  发布在  Java
关注(0)|答案(4)|浏览(509)

我准备了一堆请求,希望将这些请求与外部webservice并行发送。在这个流程中,我将继续直接处理响应(例如将某些内容插入数据库)。
问题:我想跟踪最大请求时间(对于一个请求!),不包括处理。但如前所述,这将只跟踪全局时间,包括任何子进程:

  1. StopWatch watch = new StopWatch();
  2. watch.start();
  3. Flux.fromIterable(requests)
  4. .flatMap(req -> webClient.send(req, MyResponse.class)
  5. .doOnSuccess(rsp -> processResponse(rsp))) //assume some longer routine
  6. .collectList()
  7. .block();
  8. watch.stop();
  9. System.out.println(w.getTotalTimeMillis());

问题:如何衡量请求所用的最长时间(不包括 processResponse() 时间?

yqkkidmi

yqkkidmi1#

在mono上使用elapsed时,您将得到一个元组的mono,其中包含elapsed时间和原始对象。你得把它们拆开才能用。我在一个测试中编写了一个示例(代码有点简化),以查看它的工作情况:

  1. @Test
  2. public void elapsed() {
  3. Flux.fromIterable(List.of(1, 2, 3, 4, 5))
  4. .flatMap(req -> Mono.delay(Duration.ofMillis(100L * req))
  5. .map(it -> "response_" + req)
  6. .elapsed()
  7. .doOnNext(it -> System.out.println("I took " + it.getT1() + " MS"))
  8. .map(Tuple2::getT2)
  9. .doOnSuccess(rsp -> processResponse(rsp)))
  10. .collectList()
  11. .block();
  12. }
  13. @SneakyThrows
  14. public void processResponse(Object it) {
  15. System.out.println("This is the response: " + it);
  16. Thread.sleep(1000);
  17. }

输出如下所示:

  1. I took 112 MS
  2. This is the response: response_1
  3. I took 205 MS
  4. This is the response: response_2
  5. I took 305 MS
  6. This is the response: response_3
  7. I took 403 MS
  8. This is the response: response_4
  9. I took 504 MS
  10. This is the response: response_5

这些数字表示延迟(在您的示例中是webclient.send())和React管道本身的一点开销。它是在订阅(特定请求的flatmap运行时发生)和下一个信号(在我的情况下是map的结果,在你的情况下是webclient请求的结果)之间计算的
您的代码如下所示:

  1. Flux.fromIterable(requests)
  2. .flatMap(req -> webClient.send(req, MyResponse.class)
  3. .elapsed()
  4. .doOnNext(it -> System.out.println("I took " + it.getT1() + " MS"))
  5. .map(Tuple2::getT2)
  6. .doOnSuccess(rsp -> processResponse(rsp))) //assume some longer routine
  7. .collectList()
  8. .block();

注意:如果您想使用秒表代替,也可以通过如下操作:

  1. Flux.fromIterable(List.of(1, 2, 3, 4, 5)).flatMap(req -> {
  2. StopWatch stopWatch = new StopWatch();
  3. return Mono.fromRunnable(stopWatch::start)
  4. .then(Mono.delay(Duration.ofMillis(100L * req)).map(it -> "response_" + req).doOnNext(it -> {
  5. stopWatch.stop();
  6. System.out.println("I took " + stopWatch.getTime() + " MS");
  7. }).doOnSuccess(this::processResponse));
  8. }).collectList().block();

但就我个人而言,我建议使用.elapsed()解决方案,因为它更干净一些。

展开查看全部
new9mtju

new9mtju2#

我会用那种方法直接避开秒表。而是创建一个可以在其他地方使用的度量 Package 器。
你可以利用 .doOnSubscribe(), .doOnError(), .doOnSuccess() 但要回答你的问题,你可以有一个这样的计时器

  1. public sendRequest(){
  2. Flux.fromIterable(requests)
  3. .flatMap(req -> webClient.send(req, MyResponse.class)
  4. .transform(timerPublisher("time took for ", req.id)))
  5. .collectList()
  6. .block();
  7. }
  8. //this can be made sophisticated by determining what kind of publisher it is
  9. //mono or flux
  10. private Function<Mono<T>, Publisher<T>> timerPublisher(String metric) {
  11. StopWatchHelper stopWatch = new StopWatchHelper(metric);
  12. return s -> s.doOnSubscribe((s) -> stopWatch.start())
  13. .doOnSuccess(documentRequest -> stopWatch.record())
  14. .doOnError(stopWatch::record);
  15. }
  16. private class StopWatchHelper{
  17. private StopWatch stopWatch;
  18. private String metric;
  19. public StopWatchHelper(String metric){
  20. this.metric = metric;
  21. stopWatch = new StopWatch();
  22. }
  23. public Consumer<Subscription> start() {
  24. return (s) -> stopWatch.start();
  25. }
  26. public void record(){
  27. if(stopWatch.isStarted()){
  28. System.out.println(String.format("Metric %s took %s", metric, stopWatch.getTime()));
  29. }
  30. }
  31. public void record(Throwable t){
  32. if(stopWatch.isStarted()){
  33. System.out.println(String.format("Metric %s took %s, reported in error %s", metric, stopWatch.getTime(),throwable));
  34. }
  35. }
  36. }
  37. PS: Avoid using .block() -> it beats the purpose :)
展开查看全部
u5i3ibmn

u5i3ibmn3#

springboot提供了一个开箱即用的特性,它将为您的应用程序添加插装 WebClient .
您可以使用自动配置的 WebClient.Builder 创建您的 WebClient 示例ie。

  1. @Bean
  2. public WebClient myCustomWebClient(WebClient.Builder builder) {
  3. return builder
  4. // your custom web client config code
  5. .build();
  6. }

此工具将为您的用户所做的每个api调用计时 WebClient 在你的电脑里注册 MeterRegistry 参考文件

ycl3bljg

ycl3bljg4#

一种选择是使用单元测试和 Mockito 模仿方法的行为 processResponse() . 然后你只测量其他任务的时间。假设在类中有以下方法:

  1. public class AnotherService {
  2. public Object processResponse(Object response) {
  3. try {
  4. System.out.println("processResponse called");
  5. Thread.sleep(20000);
  6. } catch (InterruptedException e) {
  7. e.printStackTrace();
  8. }
  9. return response;
  10. }
  11. }

然后你用 when 方法并模拟返回。在这里你已经摆脱了时间 Thread.sleep(20000);processResponse .

  1. Object sample = new Object(); // your return to sumulate
  2. when(anotherService.processResponse(any())).thenReturn(sample);

要在单元测试中使用它,它将如下所示:

  1. import static org.mockito.ArgumentMatchers.*;
  2. import static org.mockito.Mockito.when;
  3. @ExtendWith(MockitoExtension.class)
  4. class GitHubJobsClientMockTest {
  5. @Mock
  6. private AnotherService anotherService;
  7. @InjectMocks
  8. private YourService yourService;
  9. void stackoverflowRequest() {
  10. Object sample = new Object();
  11. // HERE YOU CREATE THE MOCK OF YOUR METHOD FROM AnotherService
  12. when(anotherService.processResponse(any())).thenReturn(sample);
  13. List<Integer> pageNumbers = List.of(1, 2, 3);
  14. String description = "Java";
  15. List<Stream<Object>> result = yourService.stackoverflowRequest(pageNumbers, description);
  16. assertTrue(result.size() > 0);
  17. }
  18. }

所以当你测试你的 YourService 这个 stopWatch.start(); 以及 stopWatch.stop(); 应该计算总时间-模拟方法的时间 anotherService.processResponse(response)) .

  1. public class YourService {
  2. public List<Stream<Object>> stackoverflowRequest(List<Integer> requests, String description) {
  3. stopWatch.start();
  4. List<Stream<GitHubPosition>> result = Flux.fromIterable(requests)
  5. .map(pageNumber -> invokeGithubJobsApi(pageNumber, description))
  6. .map(gitHubPositionList ->
  7. gitHubPositionList
  8. .stream()
  9. // THIS METHOD YOU HAVE TO MOCK
  10. .map(response -> anotherService.processResponse(response))
  11. )
  12. .collectList()
  13. .block();
  14. stopWatch.stop();
  15. log.info("time elapsed: " + stopWatch.getTime());
  16. return result;
  17. }
展开查看全部

相关问题