React式编程——如何在java的异步方法中调用synchronus阻塞方法?

lo8azlld  于 2021-06-30  发布在  Java
关注(0)|答案(1)|浏览(395)

我在用React堆项目库。这是我的设想。
我想在我的非阻塞方法中调用阻塞服务。
我有三个不同的服务,我从springboot应用程序中调用了这三个服务。这是我的示例代码

public Mono<Example> getValuesFromDifferentServices() {

  Mono<Example1> mono1=service.getService1();
  Mono<Example2> mono2=service.getService2();

  mono1.zipwith(mono2)
     .map(value-> {
         // some logics then
         if(value.getT1().getStatus().equals(value.getT2().getStatus())) {
           Mono<Example3> mono3 = service.getService3(true);
           mono3.map(f-> {
              value.getT1().setSomething(f.getSomething);
              return f;
           }).subscribe();
         }
       return value.getT1();
     })
}

注:以上示例不是实际逻辑。但其实施与此类似
我也试过了 subscribe() 因此,我一直无法获得第三个服务值(不确定值)。我不能 block() 第三次服务,因为它是不允许的。如何做到这一点?
更新:第三个服务输入将在条件是否为真后决定 Mono<Example3> mono3 = service.getService3(true); 如果只有条件匹配,我们应该调用第三个服务,否则调用第三个服务是不必要的,这是不可取的。如果条件不匹配,我们不应该调用第三个服务。

5f0d552i

5f0d552i1#

这个例子有点奇怪,但据我所知,您希望调用前两个服务,每个服务返回一个值。之后,如果需要,您希望调用第三个字段,并将该字段中的值设置为第一个字段中的一个字段。
无论如何,有一个简单的解决方案,但是有了更多的信息,也许我们可以创建更好的流。此流采用flatmap的adventages,后者急切地订阅内部发布服务器。
[这个例子是用kotlin编写的,非常像java。唯一让人困惑的是 it 变量,等于: map(it -> it.sg ) ]

data class Example(
    val name: String,
    val status: String,
    var value: String? = null  
)

class ReactorTest {

    @Test
    fun test() {
        val first = Mono.just(Example("first", "suspended"))
        val second = Mono.just(Example("second", "suspended"))
        val third = Mono.just(Example("third", "suspended", "thirdValue"))

        val stream = first.zipWith(second)
            .flatMap { tuple ->
                Mono.just(tuple.t1)
                    .filter { it.status == tuple.t2.status }
                    .zipWith(third)
                    .doOnNext {
                        it.t1.value = it.t2.value
                    }
                    .map { it.t1 }
                    .switchIfEmpty(Mono.just(tuple.t1))
            }

            StepVerifier.create(stream)
                .expectNext(Example("first", "suspended", "thirdValue"))
                .verifyComplete()
    }

    @Test
    fun test2() {
        val first = Mono.just(Example("first", "suspended"))
        val second = Mono.just(Example("second", "active"))
        val third = Mono.just(Example("third", "suspended", "thirdValue"))

        val stream = first.zipWith(second)
            .flatMap { tuple ->
                Mono.just(tuple.t1)
                    .filter { it.status == tuple.t2.status }
                    .zipWith(third)
                    .doOnNext {
                        it.t1.value = it.t2.value
                    }
                    .map { it.t1 }
                    .switchIfEmpty(Mono.just(tuple.t1))
            }

        StepVerifier.create(stream)
            .expectNext(Example("first", "suspended"))
            .verifyComplete()
    }
}

旁注:如果您在React流中使用阻塞服务,那么应该将这些服务分离到专用的线程池中。比如:

fun blockingService(): Mono<String> {
    //real service use fromCallable
    return Mono.just("fromCallableOnServiceCall")
        //for real service it may use a dedicated pool
        .subscribeOn(Schedulers.boundedElastic())

}

相关问题