java 如何在可完成的未来流中展平列表?

bakd9h0s  于 2023-09-29  发布在  Java
关注(0)|答案(3)|浏览(80)

我有这个:

Stream<CompletableFuture<List<Item>>>

我怎么把它转换成

Stream<CompletableFuture<Item>>

其中:第二流由第一流中的每个列表内的每个和所有项目组成。
我研究了thenCompose,但它解决了一个完全不同的问题,也被称为“扁平化”。
如何以流的方式有效地完成这一点,而不会阻塞或过早地消耗比必要的更多的流项目?
以下是我迄今为止最好的尝试:

ExecutorService pool = Executors.newFixedThreadPool(PARALLELISM);
    Stream<CompletableFuture<List<IncomingItem>>> reload = ... ;

    @SuppressWarnings("unchecked")
    CompletableFuture<List<IncomingItem>> allFutures[] = reload.toArray(CompletableFuture[]::new);
    CompletionService<List<IncomingItem>> queue = new ExecutorCompletionService<>(pool);
    for(CompletableFuture<List<IncomingItem>> item: allFutures) {
        queue.submit(item::get);
    }
    List<IncomingItem> THE_END = new ArrayList<IncomingItem>();
    CompletableFuture<List<IncomingItem>> ender = CompletableFuture.allOf(allFutures).thenApply(whatever -> {
        queue.submit(() -> THE_END);
        return THE_END;
    });
    queue.submit(() -> ender.get());
    Iterable<List<IncomingItem>> iter = () -> new Iterator<List<IncomingItem>>() {
        boolean checkNext = true;
        List<IncomingItem> next = null;
        @Override
        public boolean hasNext() {
            if(checkNext) {
                try {
                    next = queue.take().get();
                } catch (InterruptedException | ExecutionException e) {
                    throw new RuntimeException(e);
                }
                checkNext = false;
            }
            if(next == THE_END || next == null) {
                return false;
            }
            else {
                return true;
            }
        }
        @Override
        public List<IncomingItem> next() {
            if(checkNext) {
                hasNext();
            }
            if(!hasNext()) {
                throw new IllegalStateException();
            }
            checkNext = true;
            return next;
        }
    };
    Stream<IncomingItem> flat = StreamSupport.stream(iter.spliterator(), false).flatMap(List::stream);

这在第一次工作,不幸的是,它有一个致命的bug:结果流似乎在检索所有项目之前过早地终止。

2023年更新

4年过去了,我仍然没有选择一个答案,因为目前两个答案都在说这是“不可能的”。听起来不太对劲我只是要求一种有效的方式来采取分批完成的项目,并能够单独监控其完成情况。也许这在Java 21中的虚拟线程中更容易实现?我不再维护这个代码库,但这个问题仍然是一个未解决的大问题。
更新#2:澄清假设
CompletableFuture * 表示根据https://www.geeksforgeeks.org/completablefuture-in-java/…有趣的是,JavaDocs不太清楚。

y1aodyip

y1aodyip1#

正如我在评论中所说,这是不可能的。
考虑一个任意的服务,它将返回一个CompletableFuture<Integer>

CompletableFuture<Integer> getDiceRoll();

我现在可以毫无问题地将这个CompletableFuture<Integer>转换为Stream<CompletableFuture<List<Object>>>

Stream<CompletableFuture<List<Object>>> futureList = Stream.of(getDiceRoll().thenApply(n -> List.of(new Object[n])));

让我们假设有一种通用的方法可以将Stream<CompletableFuture<List<T>>>转换为Stream<CompletableFuture<T>>

<T> Stream<CompletableFuture<T> magic(Stream<CompletableFuture<List<T>>> arg);

然后我可以执行以下操作:

int diceRoll = magic(Stream.of(getDiceRoll().thenApply(n -> List.of(new Object[n])))).count();

等等,什么?
我现在可以从CompletableFuture中得到任意整数。这意味着,通过一些工程努力,我可以从CompletableFuture中获得所有信息-毕竟,内存只是一些数字。
因此,我们必须得出结论,像magic这样的方法不可能存在,而不会违反时间结构。
答案是:没有这样的方法,因为它不可能存在。

t8e9dugd

t8e9dugd2#

同意Johannes Kuhn的观点。当Futures仍在执行时,您无法知道它的状态,因此无法从Stream<CompletableFuture >转换为Stream。
虽然流的输出可以使用以下代码合并-java Stream<CompletableFuture<List<Item>>>java List<Item>
java List<CompletableFuture<List<AuditRecord>>>java List<Item>

List<Item> output = input.map(CompletableFuture::join).collect(toList()).stream()
    .flatMap(Collection::stream).collect(toList());
v8wbuo2f

v8wbuo2f3#

我有这个:
Stream<CompletableFuture<List<Item>>>
我怎么把它转换成
Stream<CompletableFuture<Item>>
我研究了thenCompose,但它解决了一个完全不同的问题,也被称为“扁平化”。
如何以流的方式有效地完成这一点,而不会阻塞或过早地消耗比必要的更多的流项目?
这是完全可能的,但长话短说,它不会为您节省任何工作,它会以不给您带来任何好处的方式增加开销,并且实际上会使您的代码在长期运行中变慢。你这样做将一事无成。
原因很简单--CompletableFuturepush-down design上运行。这意味着,您尝试附加链接调用(thenApplythenCompose等)或完整调用(getjoincomplete)的 INSTANTCompletableFuture将立即开始计算。当您将一个后续函数链接到CompletableFuture时,可能更容易将其视为急切评估。不是每个后续函数都会触发计算,但所有有用的函数都会触发计算。
这意味着,当你试图触摸(比喻)一个CompletableFuture时,你已经触发了它开始计算。而且这个计算是“必要的”,所以它仍然符合你的要求。
我很乐意详细解释为什么你想在CompletableFuture<Item>上调用的函数要么会和从CompletableFuture<List<Item>>上获取一样,要么会更昂贵,更慢,没有任何好处。请告诉我,然后我会修改这个答案,并解释为什么你想要的功能不会以任何有用的方式帮助你,除了你喜欢的形式,无论出于什么原因。
无论如何,如果这真的是你想要的,那么这就是方法。

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Stream;

public class SOQ_20230926
{

   public static void main(String[] args) throws Exception
   {
   
      record Item(int index) {}
   
      final Stream<CompletableFuture<List<Item>>> pointA =
         create
         (
            List
               .of
               (
                  List.of(new Item(1), new Item(2), new Item(3)),
                  List.of(new Item(1), new Item(2), new Item(3)),
                  List.of(new Item(1), new Item(2), new Item(3)),
                  List.of(new Item(1), new Item(2), new Item(3)),
                  List.of(new Item(1), new Item(2), new Item(3)),
                  List.of(new Item(1), new Item(2), new Item(3)),
                  List.of(new Item(1), new Item(2), new Item(3)),
                  List.of(new Item(1), new Item(2), new Item(3)),
                  List.of(new Item(1), new Item(2), new Item(3)),
                  List.of(new Item(1), new Item(2), new Item(3)),
                  List.of(new Item(1), new Item(2), new Item(3)),
                  List.of(new Item(1), new Item(2), new Item(3)),
                  List.of(new Item(1), new Item(2), new Item(3)),
                  List.of(new Item(1), new Item(2), new Item(3)),
                  List.of(new Item(1), new Item(2), new Item(3)),
                  List.of(new Item(1), new Item(2), new Item(3)),
                  List.of(new Item(1), new Item(2), new Item(3)),
                  List.of(new Item(1), new Item(2), new Item(3)),
                  List.of(new Item(1), new Item(2), new Item(3)),
                  List.of(new Item(1), new Item(2), new Item(3)),
                  List.of(new Item(1), new Item(2), new Item(3)),
                  List.of(new Item(1), new Item(2), new Item(3)),
                  List.of(new Item(4), new Item(5), new Item(6)),
                  List.of(new Item(7), new Item(8), new Item(9)),
                  List.of(new Item(7), new Item(8), new Item(9)),
                  List.of(new Item(7), new Item(8), new Item(9)),
                  List.of(new Item(7), new Item(8), new Item(9)),
                  List.of(new Item(7), new Item(8), new Item(9)),
                  List.of(new Item(7), new Item(8), new Item(9)),
                  List.of(new Item(7), new Item(8), new Item(9)),
                  List.of(new Item(7), new Item(8), new Item(9)),
                  List.of(new Item(7), new Item(8), new Item(9)),
                  List.of(new Item(7), new Item(8), new Item(9)),
                  List.of(new Item(7), new Item(8), new Item(9)),
                  List.of(new Item(7), new Item(8), new Item(9)),
                  List.of(new Item(7), new Item(8), new Item(9)),
                  List.of(new Item(7), new Item(8), new Item(9)),
                  List.of(new Item(7), new Item(8), new Item(9)),
                  List.of(new Item(7), new Item(8), new Item(9)),
                  List.of(new Item(7), new Item(8), new Item(9)),
                  List.of(new Item(7), new Item(8), new Item(9)),
                  List.of(new Item(7), new Item(8), new Item(9)),
                  List.of(new Item(7), new Item(8), new Item(9)),
                  List.of(new Item(7), new Item(8), new Item(9)),
                  List.of(new Item(7), new Item(8), new Item(9)),
                  List.of(new Item(7), new Item(8), new Item(9)),
                  List.of(new Item(7), new Item(8), new Item(9)),
                  List.of(new Item(7), new Item(8), new Item(9)),
                  List.of(new Item(7), new Item(8), new Item(9)),
                  List.of(new Item(7), new Item(8), new Item(9)),
                  List.of(new Item(7), new Item(8), new Item(9)),
                  List.of(new Item(7), new Item(8), new Item(9)),
                  List.of(new Item(7), new Item(8), new Item(9)),
                  List.of(new Item(7), new Item(8), new Item(9))
               )
         )
         ;
   
      final Stream<CompletableFuture<Item>> pointZ = convert(pointA);
   
      final Item item =
         pointZ
            .map(CompletableFuture::join)
            .filter(eachItem -> eachItem.index() == 4)
            .findFirst()
            .orElseThrow()
            ;
   
      System.out.println(item);
   
   }

   private static <T> Stream<CompletableFuture<T>> convert(final Stream<CompletableFuture<List<T>>> pointA)
   {
   
      return
         pointA
            .parallel()
            .map
            (
               eachFuture ->
                  eachFuture
                     .thenApply
                     (
                        batch ->
                           batch
                              .stream()
                              .map(eachElement -> CompletableFuture.supplyAsync(() -> eachElement))
                              .toList()
                     )
            )
            .map(CompletableFuture::join)
            .flatMap(List::stream)
            ;
   
   }

   private static <T> Stream<CompletableFuture<List<T>>> create(final List<List<T>> lists)
   {
   
      return
         lists
            .stream()
            .map
            (
               eachList ->
                  CompletableFuture
                     .supplyAsync
                     (
                        () ->
                        {
                        
                           System.out.println("COMPUTATION WAS PERFORMED");
                        
                           return eachList;
                        
                        }
                     )
         
            )
            ;
   
   }

}

这是输出。

COMPUTATION WAS PERFORMED
COMPUTATION WAS PERFORMED
COMPUTATION WAS PERFORMED
COMPUTATION WAS PERFORMED
COMPUTATION WAS PERFORMED
COMPUTATION WAS PERFORMED
COMPUTATION WAS PERFORMED
COMPUTATION WAS PERFORMED
COMPUTATION WAS PERFORMED
COMPUTATION WAS PERFORMED
COMPUTATION WAS PERFORMED
COMPUTATION WAS PERFORMED
COMPUTATION WAS PERFORMED
COMPUTATION WAS PERFORMED
COMPUTATION WAS PERFORMED
COMPUTATION WAS PERFORMED
COMPUTATION WAS PERFORMED
COMPUTATION WAS PERFORMED
COMPUTATION WAS PERFORMED
COMPUTATION WAS PERFORMED
COMPUTATION WAS PERFORMED
COMPUTATION WAS PERFORMED
COMPUTATION WAS PERFORMED
COMPUTATION WAS PERFORMED
COMPUTATION WAS PERFORMED
COMPUTATION WAS PERFORMED
COMPUTATION WAS PERFORMED
COMPUTATION WAS PERFORMED
COMPUTATION WAS PERFORMED
COMPUTATION WAS PERFORMED
COMPUTATION WAS PERFORMED
Item[index=4]

正如您所看到的,Stream“提前停止”,只做了查找所需的Item所需的工作。
如果它做了比需要更多的工作,我们将看到COMPUTATION WAS PERFORMED的所有54行,但我们只看到31行,因为这是找到我们的值所需的全部内容。剩下的批次仍然没有打开,因为我们找到了我们需要的。
这将满足您的两个要求。这些是以下内容。
我有这个:
Stream<CompletableFuture<List<Item>>>
我怎么把它转换成
Stream<CompletableFuture<Item>>
如何以流的方式有效地完成这一点,而不会阻塞或过早地消耗比必要的更多的流项目?

相关问题