rxjava retrywhen(指数后退)不工作

sulc1iza  于 2021-07-12  发布在  Java
关注(0)|答案(1)|浏览(422)

所以我知道这个问题以前被问过很多次,但我试过很多次,似乎都没有效果。
让我们从以下博客/文章/代码开始:
https://blog.danlew.net/2016/01/25/rxjavas-repeatwhen-and-retrywhen-explained/
https://jimbaca.com/rxjava-retrywhen/
http://blog.inching.org/rxjava/2016-12-12-rx-java-error-handling.html
https://pamartinezandres.com/rxjava-2-exponential-backoff-retry-only-when-internet-is-available-5a46188ab175
https://gist.github.com/wotomas/35006d156a16345349a2e4c8e159e122
还有很多其他的。
简而言之,它们都描述了如何使用retrywhen实现指数退避。像这样:

source
.retryWhen(
  errors -> {
    return errors
    .zipWith(Observable.range(1, 3), (n, i) -> i)
    .flatMap(
       retryCount -> {
       System.out.println("retry count " + retryCount);
       return Observable.timer((long) Math.pow(1, retryCount), SECONDS);
     });
 })

甚至图书馆里的文件也同意这一点:https://github.com/reactivex/rxjava/blob/3.x/src/main/java/io/reactivex/rxjava3/core/observable.java#l11919.
然而,我尝试过这个和一些非常类似的变体,不值得在这里描述,似乎没有什么工作。有一种方法是,这些示例可以使用阻塞订户,但我希望避免阻塞线程。
因此,如果我们对前面的可观察对象应用如下的阻塞订户:

.blockingForEach(System.out::println);

它按预期工作。但这不是我的想法。如果我们尝试:

.subscribe(
x -> System.out.println("onNext: " + x),
Throwable::printStackTrace,
() -> System.out.println("onComplete"));

流程只运行一次,因此不是我想要实现的。
这是不是意味着它不能像我想的那样被使用?从文件上看,要达到我的要求似乎不是什么问题。
你知道我错过了什么吗?
蒂亚。
编辑:我有两种测试方法:
试验方法(使用testng):

Observable<Integer> source =
    Observable.just("test")
        .map(
            x -> {
              System.out.println("trying again");
              return Integer.parseInt(x);
            });
source
    .retryWhen(
        errors -> {
          return errors
              .zipWith(Observable.range(1, 3), (n, i) -> i)
              .flatMap(
                  retryCount -> {
                    return Observable.timer((long) Math.pow(1, retryCount), SECONDS);
                  });
        })
    .subscribe(...);

来自kafka消费者(使用spring boot):
这只是对observer的订阅,但是重试逻辑是我在本文前面描述的。

@KafkaListener(topics = "${kafka.config.topic}")
      public void receive(String payload) {
        log.info("received payload='{}'", payload);
        service
            .updateMessage(payload)
            .subscribe(...)
            .dispose();
      }
u4vypkhs

u4vypkhs1#

代码的主要问题是observable.timer在默认情况下在计算调度器上运行。当试图验证测试中的行为时,这会增加额外的工作量。
下面是一些单元测试代码,用于验证重试代码是否确实在重试。
它增加了一个计数器,这样我们就可以很容易地检查发生了多少电话。
它使用testscheduler而不是计算调度器,这样我们就可以假装在advancetimeby中移动。

TestScheduler testScheduler = new TestScheduler();
AtomicInteger counter = new AtomicInteger();

Observable<Integer> source =
    Observable.just("test")
        .map(
            x -> {
                System.out.println("trying again");
                counter.getAndIncrement();
                return Integer.parseInt(x);
            });
TestObserver<Integer> testObserver = source
    .retryWhen(
        errors -> {
            return errors
                .zipWith(Observable.range(1, 3), (n, i) -> i)
                .flatMap(
                    retryCount -> {
                        return Observable.timer((long) Math.pow(1, retryCount), SECONDS, testScheduler);
                    });
        })
    .test();

assertEquals(1, counter.get());

testScheduler.advanceTimeBy(1, SECONDS);
assertEquals(2, counter.get());

testScheduler.advanceTimeBy(1, SECONDS);
assertEquals(3, counter.get());

testScheduler.advanceTimeBy(1, SECONDS);
assertEquals(4, counter.get());

testObserver.assertComplete();

相关问题