java 使用spring集成向RabbitMQ发送消息会在应用加载时停止处理项

3qpi33ja  于 2023-04-28  发布在  Java
关注(0)|答案(1)|浏览(151)

我正在使用带有backPressureBuffer支持的projectreactorSink来处理我的spring Boot 应用程序中传入的请求。在我的FooService.kt

private val requestProcessor = Sinks.many().multicast().onBackpressureBuffer<FooRequest>(100)

EmitFailureHandler,如下所示

val emitFailureHandler = EmitFailureHandler { _: SignalType?, emitResult: EmitResult ->
    (emitResult == EmitResult.FAIL_NON_SERIALIZED || emitResult == EmitResult.FAIL_OVERFLOW)
  }

我在类中定义了一个init方法,如下所示:

init {
    requestProcessor.asFlux()
      .flatMap { request ->
        try {
          logger.debug("handleByRabbitMq : {}", request)
          footGateway.handleByRabbitMq(request)
        } catch (e: Exception) {
          logger.error("unknown error when processing foo request", e)
          Mono.empty()
        }
      }
      .subscribe()
  }

  fun handleRequest(fooRequest: FooRequest): Mono<FooResponse> {
    val (_, id) = fooRequest
    val responseSink = Sinks.one<FooResponse>()
    responseListeners[id] = responseSink
    requestProcessor.emitNext(fooRequest, emitFailureHandler)
    return responseSink.asMono()
      .timeout(Duration.ofSeconds(10))
      .onErrorResume { Mono.just(FooResponse(REQUEST_SUBMITTED)) }
      .doOnTerminate { responseListeners.remove(id) }
  }

FooController。kt

@GetMapping("/foo/request")
  @Timed(value = "foo.request")
  fun fooRequest(fooRequest: FooRequest): Mono<String> {
    return performAction(fooRequest)
      .flatMap { fooResponse ->
        Mono.fromCallable {
          "FooResponse: $fooResponse"
        }
      }
      .doOnNext { logger.debug("response {}", it) }
  }

  fun performAction(fooRequest: FooRequest): Mono<FooResponse> {
    return Mono.just(fooRequest))
      .flatMap { request ->
        fooService.handleRequest(request)
      }
      .doOnNext { response ->
        //some operations here
        logger.debug(response)
      }.subscribeOn(Schedulers.boundedElastic())
  }

一切都很好。请求最初会正常发出和处理。

问题:当应用程序处于请求负载下时,或者当我使用k6等框架进行负载测试时,requestProcessor停止发出项目。换句话说,init块停止执行,尽管requestProcessor.emitNext(fooRequest, emitFailureHandler)被调用。

最初,应用程序抛出FAIL_NON_SERIALIZEDFAIL_OVERFLOW错误。为此,我添加了上面的emitFailureHandler。它帮助,但推迟了几个小时的问题。
1.尽管成功率为96%,但应用程序仍有响应。这还是好的。
1.第二次执行:成功率下降到88%。执行此操作后,应用程序抛出503 http reponse,不再接受请求。
3.logs.

新发现:RabbitMQ [ fooGateway.handleByRabbitMq(request) ]似乎存在一些问题。我一直在寻找它,但没有发现任何错误。通过删除init块中的rabbitMQ,应用程序变得有响应。但我需要知道原因。

5vf7fwbs

5vf7fwbs1#

我最初的假设是,问题肯定出在React堆项目上。但是,问题似乎出在spring integration.
我的申请流程如下:

FooController (1) ➝ FooService (2) ➝ Spring Integration (3) ➝ RabbitMQ (4)

正如问题中所解释的那样,这个流程有时工作正常。然而,在处理了几个请求后,我。例如,300个请求,应用挂起并且无法接受更多请求。e.返回503 http reponse
经过深入研究,似乎请求在步骤(3)停止,由于某些未知原因,该步骤将任务委托给spring integration。
解决方案:

FooController (1) ➝ FooService (2) ➝ RabbitMQ (3)

注意:将任务委托给Spring Integration是方便的,也是推荐的。因此,更理想的情况如下所示,如果使用spring integration (4)解决根本原因。

FooController (1) ➝ Spring Integration(2) ➝ FooService (3) ➝ Spring Integration(4) ➝ RabbitMQ (5)

相关问题