热释光;博士;
有没有一种方法可以根据下游的健康状况自动调整项目React堆中元件之间的延迟?
更多细节
我有一个应用程序,它从kafka主题读取记录,为每个记录发送一个http请求,并将结果写入另一个kafka主题。从/到kafka的读写既快速又简单,但是第三方http服务很容易被淹没,所以我使用 delayElements()
属性文件中的值,这意味着该值在应用程序运行时不会更改。下面是一个代码示例:
kafkaReceiver.receiveAutoAck()
.concatMap(identity())
.delayElements(ofMillis(delayElement))
.flatMap(message -> recordProcessingFunction.process(message.value()), messageRate)
.onErrorContinue(handleError())
.map(this::getSenderRecord)
.flatMap(kafkaSender::send)
但是,第三方服务可能会执行不同的加班,我想能够相应地调整这个延迟。比方说,如果我看到超过5%的请求在10秒内失败,我会增加延迟。如果它在10秒内低于5%,那么我会再次减少延迟。
在React堆中是否存在这样的机制?我可以从我的Angular 想到一些创造性的解决方案,但我想知道他们(或其他人)是否已经实现了这一点。
3条答案
按热度按时间sd2nnvve1#
我不认为有任何http客户端提供的反压力,包括netty。一种选择是切换到rsocket,但如果您正在呼叫第三方服务,我想这可能不是一种选择。您可以调整一个在一天中大部分时间都有效的速率,并使用doonerror或类似工具将出错的消息发送到另一个主题。另一个接收者可以以更高的延迟处理这些消息,如果消息再次出错,则以重试次数将消息放回同一主题,这样您就可以最终停止处理它们。
snvhrwxg2#
您可以添加带有指数退避的重试。像这样的东西:
这将重试失败的请求integet.max\u value次,每次重试之间的最短时间为30秒。随后的重试实际上被一个可配置的抖动因子(默认值=0.5)抵消,从而导致连续重试之间的持续时间增加。
关于
Retry.backoff
他说:给定最大重试次数和最小退避持续时间,为带有抖动的指数退避策略预先配置的retrybackoffspec。
另外,由于整个操作都Map到
flatMap
,可以更改默认值concurrency
以及prefetch
值,以说明在整个管道等待retrybackoffspec成功完成时,在任何给定时间可能失败的最大请求数。最坏的情况,你的
concurrency_val
失败并等待30秒以上重试的请求数。如果下游系统不能及时恢复,整个操作可能会停止(仍在等待下游的成功),这可能是不理想的。最好将退避限制从Integer.MAX_VALUE
它将只记录错误并继续处理下一个事件。sg24os4d3#
如果要查找延迟元素取决于元素的处理速度,则可以使用delayuntil。