我正在使用带有backPressureBuffer支持的projectreactorSink来处理我的spring Boot 应用程序中传入的请求。在我的FooService.kt
private val requestProcessor = Sinks.many().multicast().onBackpressureBuffer<FooRequest>(100)
EmitFailureHandler,如下所示
val emitFailureHandler = EmitFailureHandler { _: SignalType?, emitResult: EmitResult ->
(emitResult == EmitResult.FAIL_NON_SERIALIZED || emitResult == EmitResult.FAIL_OVERFLOW)
}
我在类中定义了一个init方法,如下所示:
init {
requestProcessor.asFlux()
.flatMap { request ->
try {
logger.debug("handleByRabbitMq : {}", request)
footGateway.handleByRabbitMq(request)
} catch (e: Exception) {
logger.error("unknown error when processing foo request", e)
Mono.empty()
}
}
.subscribe()
}
fun handleRequest(fooRequest: FooRequest): Mono<FooResponse> {
val (_, id) = fooRequest
val responseSink = Sinks.one<FooResponse>()
responseListeners[id] = responseSink
requestProcessor.emitNext(fooRequest, emitFailureHandler)
return responseSink.asMono()
.timeout(Duration.ofSeconds(10))
.onErrorResume { Mono.just(FooResponse(REQUEST_SUBMITTED)) }
.doOnTerminate { responseListeners.remove(id) }
}
FooController。kt
@GetMapping("/foo/request")
@Timed(value = "foo.request")
fun fooRequest(fooRequest: FooRequest): Mono<String> {
return performAction(fooRequest)
.flatMap { fooResponse ->
Mono.fromCallable {
"FooResponse: $fooResponse"
}
}
.doOnNext { logger.debug("response {}", it) }
}
fun performAction(fooRequest: FooRequest): Mono<FooResponse> {
return Mono.just(fooRequest))
.flatMap { request ->
fooService.handleRequest(request)
}
.doOnNext { response ->
//some operations here
logger.debug(response)
}.subscribeOn(Schedulers.boundedElastic())
}
一切都很好。请求最初会正常发出和处理。
问题:当应用程序处于请求负载下时,或者当我使用k6
等框架进行负载测试时,requestProcessor
停止发出项目。换句话说,init
块停止执行,尽管requestProcessor.emitNext(fooRequest, emitFailureHandler)
被调用。
最初,应用程序抛出FAIL_NON_SERIALIZED
和FAIL_OVERFLOW
错误。为此,我添加了上面的emitFailureHandler
。它帮助,但推迟了几个小时的问题。
1.尽管成功率为96%,但应用程序仍有响应。这还是好的。
1.第二次执行:成功率下降到88%。执行此操作后,应用程序抛出503 http reponse
,不再接受请求。
3.logs.
新发现:RabbitMQ [ fooGateway.handleByRabbitMq(request)
]似乎存在一些问题。我一直在寻找它,但没有发现任何错误。通过删除init块中的rabbitMQ,应用程序变得有响应。但我需要知道原因。
1条答案
按热度按时间5vf7fwbs1#
我最初的假设是,问题肯定出在React堆项目上。但是,问题似乎出在
spring integration.
上我的申请流程如下:
正如问题中所解释的那样,这个流程有时工作正常。然而,在处理了几个请求后,我。例如,300个请求,应用挂起并且无法接受更多请求。e.返回
503 http reponse
。经过深入研究,似乎请求在步骤
(3)
停止,由于某些未知原因,该步骤将任务委托给spring integration。解决方案:
注意:将任务委托给Spring Integration是方便的,也是推荐的。因此,更理想的情况如下所示,如果使用
spring integration (4)
解决根本原因。