React器-处理错误情况下的延迟通量元件

sirbozc5  于 2021-06-29  发布在  Java
关注(0)|答案(1)|浏览(316)

我有一个类似的问题,这个问题,我没有看到一个公认的答案。我仔细研究了一下,没有得到满意的答案。
我有一个轮询量为“x”的被动kafka消费者(spring reactor),应用程序使用被动webclient将轮询的消息推送到被动端点。这里的问题是,外部服务可以执行不同的超时,我将不得不调整Kafka消费者轮询较少的消息时,断路器打开(或踢在背压),当我们看到很多失败。现在的React堆有没有办法自动
当断路器处于分闸状态时,应作出React,减少轮询量或降低消耗。
当电路关闭时,将轮询量增加到以前的状态(如果外部服务下降,它将按比例增加)。
我不想使用 delayElements 或者 delayUntil 因为它们本质上大多是静态的,并且希望应用程序在运行时做出React。如何配置这些端到端背压?我会提供消费者的价值观时,电路是关闭的,部分关闭和开放的应用程序配置。

rdlzhqv9

rdlzhqv91#

由于反压力是基于使用者的慢度,实现这一点的一种方法是将某些异常类型转换为延迟。你可以用 onErrorResume 为此,如下所示:

long start = System.currentTimeMillis();

Flux.range(1, 1000)
        .doOnNext(item -> System.out.println("Elpased " + (System.currentTimeMillis() - start) + " millis for item: " + item))
        .flatMap(item -> process(item).onErrorResume(this::slowDown), 5) // concurrency limit for demo
        .blockLast();

System.out.println("Flow took " + (System.currentTimeMillis() - start) + " milliseconds.");

private Mono<Integer> process(Integer item) {
    // simulate error for some items
    if (item >= 50 && item <= 100) {
        return Mono.error(new RuntimeException("Downstream failed."));
    }

    // normal processing
    return Mono.delay(Duration.ofMillis(10))
            .thenReturn(item);
}

private Mono<Integer> slowDown(Throwable e) {
    if (e instanceof RuntimeException) { // you could check for circuit breaker exception
        return Mono.delay(Duration.ofMillis(1000)).then(Mono.empty()); // delay to slow down
    }

    return Mono.empty(); // no delay for other errors
}

如果你检查这个代码的输出,你可以看到在第50项和第100项之间有一些减速,但是它在前后都以正常的速度工作。
请注意,我的示例没有使用Kafka。当你使用React堆Kafka图书馆荣誉背压,它应该工作的方式与这个虚拟的例子相同。
此外,由于流量可能同时处理项目,因此减速不是立即的,它将尝试在适当减速之前处理一些额外的项目。

相关问题