java Quarkus Mutiny Uni/Multi等待请求响应完成

brjng4g3  于 2023-06-28  发布在  Java
关注(0)|答案(3)|浏览(238)

我找不到解决这个问题的正确方法,我被卡住了。假设我有这个方法

@GET
    @Path("/testingAsync")
    public Uni<List<String>> testingMutiny() {
        List<String> completeList = new ArrayList<>();
        completeList.add("hello");
        completeList.add("RestEasy");

        List<String> finalList = new ArrayList<>();

        completeList.forEach(e -> Uni.createFrom().item(e)
                .onItem().delayIt().by(Duration.ofMillis(10000))
                .map(value -> finalList.add(value.toUpperCase()))
                .subscribe().asCompletionStage());

        return Uni.createFrom().item(finalList);
    }

正如你所看到的,这个方法很简单,它只是从第一个列表中获取值,并将它们添加到第二个列表中,但问题是什么呢?当你添加等待的.onItem().delayIt().by(Duration.ofMillis(10000))时,程序将返回一个空列表,过了一段时间,它只会更新列表。我创建了这个方法来模拟一个请求,其中的响应有一些延迟。
假设你用2个不同的Uni点击了2个URL,然后你尝试将它们组合起来,并将其作为一个Uni返回。问题是如果其中一个2网址延迟由于某种原因,我们将返回列表空,但我不希望发生这种情况,我要么希望列表完成100%或返回一个错误,如果它需要一段时间。
最好的方法是什么?我知道如果你添加await()你会阻塞主线程,你会失去使用响应式库的所有价值,但我仍然找不到一种方法来实现它

编辑

我发现我尝试调用的外部URL需要大约5秒的时间来完成这项工作,所以我希望我的代码在创建Uni时停止,并在收到服务器的答复后继续。我在他们的文档(here)中看到,我也可以无限期地调用await.,但我会收到The current thread cannot be blocked: vert.x-eventloop-thread-14。如何等待服务器的响应?

编辑2

我知道对于字符串来说,我的问题是没有意义的,所以假设我有下面这个

@GET
    @Path("/testingAsync")
    public Uni<List<Car>> testingMutiny() {

        //ALL THIS IS IN A FOR EACH FOR EVERY CAR

            //HIT ENDPOINT GET DOORS
            Uni<List<JsonObjectCar>> carDoorsUni = getDoors(variable1,
                    variable2, variable3);

            //HIT ENDPOINT GET WHEELS
            Uni<List<JsonObjectCar>> carWheelsUni = getWheels(variable1,
                    variable2, variable3);

            //HIT ENDPOINT GET WINDOWS
            Uni<List<JsonObjectCar>> carWindowsUni = getWindows(variable1,
                    variable2, variable3);

            Uni.combine()
                    .all()
                    .unis(carDoorsUni, carWheelsUni, carWindowsUni)
                    .combinedWith((carDoors, carWheels, carWindows) -> {
                        //Check if cardoors is present and set the doors into the car object
                        Optional.of(carDoors)
                                .ifPresent(val -> car.setDoors(val.getDoors()));
                        Optional.of(carWheels)
                                .ifPresent(val -> car.setWheels(val.getWheels()));
                        Optional.of(carWindows)
                                .ifPresent(val -> car.setWindows(val.getWindows()));

                        return car;
                    }).subscribe().with(e-> System.out.println("Okay it worked"));
            
          //END OF FOR EACH 
            
            
            //Return car (Should have been returned with new doors / wheels/ windows but instead its empty)
        return Uni.createFrom().item(car);

    }

正如你在上面的代码中看到的,它应该已经命中了一些doors / wheels / windows的端点,并将它们设置到变量car中,但实际上发生的是,car是空的,因为其中一个端点已经被延迟,所以我返回了一个没有这些值的car。我想先更新car对象,然后实际返回它

sg2wtvxw

sg2wtvxw1#

你可以像这样重写这个方法:

@GET
    @Path("/testingAsync")
    public Uni<List<String>> testingMutiny() {
        List<Uni<String>> unis = new ArrayList<>();
        List.of("hello", "RestEasy").forEach( e -> {
            unis.add( Uni.createFrom().item( e )
                .onItem().delayIt().by( Duration.ofMillis( 10000 ) ) );
        } );

        return Uni.combine().all().unis( unis )
                .combinedWith( list -> (List<String>) list);
    }

请注意,在编写响应式代码时,您希望避免使用.await().indefinetly。在使用Quarkus时,无论如何都不应该需要它,因为它可以识别异步API并相应地解释结果。
出于同样的原因,在使用Quarkus时,您也不需要订阅Uni或Multi。
基于我之前的示例,您可以将端点的用例重写为:

@GET
    @Path("/testingAsync")
    public Uni<Car> testingMutiny() {
            Uni<List<JsonObjectCar>> carDoorsUni = getDoors(variable1, variable2, variable3);
            Uni<List<JsonObjectCar>> carWheelsUni = getWheels(variable1,variable2, variable3);
            Uni<List<JsonObjectCar>> carWindowsUni = getWindows(variable1,variable2, variable3);

            return Uni.combine()
                    .all()
                    .unis(carDoorsUni, carWheelsUni, carWindowsUni)
                    .combinedWith(list -> {
                        // Result of carDoorsUni
                        List<JsonObjectCar> carDoors = list.get(0);

                        // Result of carWheelsUni
                        List<JsonObjectCar> carWheels = list.get(1);

                        // Result of carWindowsUni
                        List<JsonObjectCar> carWindows = list.get(2);
                        
                        // Create a car instance with the previous results
                        Car car = createCar(...);

                        // You can also return a list of cars, but you need to change the return type of testingMutiny to Uni<List<Car>>
                        return car;
                    })
                    .invoke( () -> System.out.println("Okay it worked"));
    }
w8f9ii69

w8f9ii692#

返回一个list,但是Uni上的异步处理被延迟,所以list将为空。
你应该尝试从你创建的管道中返回一个Uni(也可以参考collect()toUni()放入列表中),而不是进行订阅,收集结果并重新 Package 成一个Uni

hmtdttj4

hmtdttj43#

使用MultiStream,我们可以得出如下结果:

@GET
    @Path("/testingAsync")
    public Multi<String> testingMutiny() {
        List<String> completeList = new ArrayList<>();
        completeList.add("hello");
        completeList.add("RestEasy");

        return Multi.createFrom().items(completeList.stream()).onItem()
                .transformToUni(value -> Uni.createFrom().item(value.toUpperCase()))
                .concatenate();
    }

了解Merging and Concatenating Streams之间的差异也很好

相关问题