如何限制Spring webflux(netty webserver)中可以处理的请求数量

vc9ivgsu  于 2024-01-05  发布在  Spring
关注(0)|答案(1)|浏览(263)

我想要一些等价于在tomcat中设置此属性的东西。这将设置线程池中的线程数为100,因此可以并发处理的最大请求数为100个请求。

  1. server.tomcat.maxthread=100

字符串
但是在webflux(netty webserver)中,我找不到限制请求的配置属性。
这就是我迄今为止所尝试的
1 -为ReactiveWebServerFactory定义一个自定义bean。我创建了两个API端点来测试第一个端点是阻塞IO第二个不是,但它一次只能处理一个请求,另一个请求必须等到第一个完成即使它应该运行在不同的线程上。

  1. @Bean
  2. public ReactiveWebServerFactory reactiveWebServerFactory() {
  3. NettyReactiveWebServerFactory factory = new NettyReactiveWebServerFactory();
  4. factory.addServerCustomizers(builder -> builder.runOn(new NioEventLoopGroup(2)));
  5. return factory;
  6. }


2 -设置这个属性,我发现在文档中,但它仍然可以处理2个以上的不同线程的请求。(我测试了与上述相同的方式)

  1. server.netty.max-keep-alive-requests=2


3 -设置tomcat maxthread为2,这一个没有我预期的效果。

  1. server.tomcat.maxthread=2


所以是否可以限制webflux中的最大请求数。
我的API用于测试

  1. @RestController
  2. public class CheckController {
  3. @PostMapping("/test")
  4. public Mono<Long> something() throws InterruptedException {
  5. System.out.println("cores " + Runtime.getRuntime().availableProcessors());
  6. return Mono.just(2L)
  7. .flatMap(d -> simulateBlockingOperation())
  8. .log();
  9. }
  10. @PostMapping("/test2")
  11. public Mono<Long> something2() throws InterruptedException {
  12. return Mono.just(1L)
  13. .log();
  14. }
  15. private Mono<Long> simulateBlockingOperation() {
  16. System.out.println("Current thread " + Thread.currentThread().getName());
  17. int x = 0;
  18. while(x!=1) {}
  19. return Mono.just(2L);
  20. }
  21. }


最新消息:我临时解决了这个问题,创建另一个线程池,并使用.subscribeOn来切换线程组,并限制线程组上的线程数量。但我觉得这是一种非常肮脏的方式。

  1. @RestController
  2. public class CheckController {
  3. private final Scheduler customThreadPool;
  4. public CheckController(Scheduler customThreadPool) {
  5. this.customThreadPool = customThreadPool;
  6. }
  7. @Bean
  8. public Scheduler reactiveRequestThreadPool() {
  9. return Schedulers.newBoundedElastic(2, 2, "my-custom-thread");
  10. }
  11. @PostMapping("/test")
  12. public Mono<Long> something() throws InterruptedException {
  13. return Mono.just(2L)
  14. .doOnNext(d -> getCurrentThread())
  15. .flatMap(d -> simulateBlockingOperation())
  16. .subscribeOn(customThreadPool)
  17. .log();
  18. }
  19. @PostMapping("/test2")
  20. public Mono<Long> something2() throws InterruptedException {
  21. return Mono.just(1L)
  22. .doOnNext(d -> getCurrentThread())
  23. .subscribeOn(customThreadPool)
  24. .log();
  25. }
  26. private Mono<Long> simulateBlockingOperation() {
  27. int x = 0;
  28. while(x!=1) {}
  29. return Mono.just(2L);
  30. }
  31. private void getCurrentThread() {
  32. System.out.println("current thread " + Thread.currentThread().getName());
  33. }
  34. }

ngynwnxp

ngynwnxp1#

这里有一个想法给你。创建一个过滤器,将覆盖所有的API由您的服务器。在该过滤器有一个静态原子计数器的当前请求。每次请求来检查计数器,如果它不大于你的限制,允许请求和递增计数器。如果它大于你的限制拒绝请求。每次请求完成,响应回来通过你的过滤器-递减计数器。

相关问题