java&React器中的自动速率调节

qyswt5oh  于 2021-06-04  发布在  Kafka
关注(0)|答案(3)|浏览(441)

热释光;博士;

有没有一种方法可以根据下游的健康状况自动调整项目React堆中元件之间的延迟?

更多细节

我有一个应用程序,它从kafka主题读取记录,为每个记录发送一个http请求,并将结果写入另一个kafka主题。从/到kafka的读写既快速又简单,但是第三方http服务很容易被淹没,所以我使用 delayElements() 属性文件中的值,这意味着该值在应用程序运行时不会更改。下面是一个代码示例:

kafkaReceiver.receiveAutoAck()
            .concatMap(identity())
            .delayElements(ofMillis(delayElement))
            .flatMap(message -> recordProcessingFunction.process(message.value()), messageRate)
            .onErrorContinue(handleError())
            .map(this::getSenderRecord)
            .flatMap(kafkaSender::send)

但是,第三方服务可能会执行不同的加班,我想能够相应地调整这个延迟。比方说,如果我看到超过5%的请求在10秒内失败,我会增加延迟。如果它在10秒内低于5%,那么我会再次减少延迟。
在React堆中是否存在这样的机制?我可以从我的Angular 想到一些创造性的解决方案,但我想知道他们(或其他人)是否已经实现了这一点。

sd2nnvve

sd2nnvve1#

我不认为有任何http客户端提供的反压力,包括netty。一种选择是切换到rsocket,但如果您正在呼叫第三方服务,我想这可能不是一种选择。您可以调整一个在一天中大部分时间都有效的速率,并使用doonerror或类似工具将出错的消息发送到另一个主题。另一个接收者可以以更高的延迟处理这些消息,如果消息再次出错,则以重试次数将消息放回同一主题,这样您就可以最终停止处理它们。

snvhrwxg

snvhrwxg2#

您可以添加带有指数退避的重试。像这样的东西:

influx()
.flatMap(x -> Mono.just(x)
    .map(data -> apiCall(data))
    .retryWhen(
            Retry.backoff(Integet.MAX_VALUE, Duration.ofSeconds(30))
                .filter(err -> err instanceof RuntimeException)
                .doBeforeRetry(
                    s -> log.warn("Retrying for err {}", s.failure().getMessage()))
                .onRetryExhaustedThrow((spec, sig) -> new RuntimeException("ex")))
                .onErrorResume(err -> Mono.empty()),
        concurrency_val,
        prefetch_val)

这将重试失败的请求integet.max\u value次,每次重试之间的最短时间为30秒。随后的重试实际上被一个可配置的抖动因子(默认值=0.5)抵消,从而导致连续重试之间的持续时间增加。
关于 Retry.backoff 他说:
给定最大重试次数和最小退避持续时间,为带有抖动的指数退避策略预先配置的retrybackoffspec。
另外,由于整个操作都Map到 flatMap ,可以更改默认值 concurrency 以及 prefetch 值,以说明在整个管道等待retrybackoffspec成功完成时,在任何给定时间可能失败的最大请求数。
最坏的情况,你的 concurrency_val 失败并等待30秒以上重试的请求数。如果下游系统不能及时恢复,整个操作可能会停止(仍在等待下游的成功),这可能是不理想的。最好将退避限制从 Integer.MAX_VALUE 它将只记录错误并继续处理下一个事件。

sg24os4d

sg24os4d3#

如果要查找延迟元素取决于元素的处理速度,则可以使用delayuntil。

Flux.range(1, 100)
      .doOnNext(i -> System.out.println("Kafka Receive :: " + i))
      .delayUntil(i -> Mono.fromSupplier(() -> i)
                            .map(k -> {
                                // msg processing
                                return k * 2;
                            })
                            .delayElement(Duration.ofSeconds(1)) // msg processing simulation
                            .doOnNext(k -> System.out.println("Kafka send :: " + k)))
      .subscribe();

相关问题