在Java中有限生成流-如何创建一个?

2izufjch  于 2022-12-28  发布在  Java
关注(0)|答案(4)|浏览(110)

在Java中,使用Stream.generate(supplier)可以很容易地生成一个无限流,但是,我需要生成一个最终会完成的流。
想象一下,例如,我想要一个目录中所有文件的流,文件的数量可能很大,因此我无法预先收集所有数据并从中创建流(通过collection.stream())。我需要一段一段地生成序列。但是流显然会在某个时候结束,终端操作员喜欢(collect()findAny())需要在上面工作,所以Stream.generate(supplier)在这里不合适。
在Java中,有没有什么合理的简单方法可以做到这一点,而不必自己实现整个Stream接口?
我可以想到一个简单的方法--用无限的Stream.generate(supplier)来做,当所有的实际值都被取走时,提供null或者抛出一个异常。但是这会破坏标准的流操作符,我只能用我自己的操作符来使用它,这些操作符知道这种行为。

    • 澄清**

评论中的人建议我使用takeWhile()操作符。这不是我的意思。如何更好地表达这个问题...我不是在问如何过滤(或限制)现有的流,我是在问如何动态地创建(生成)流,而不是预先加载所有元素,但流将具有有限的大小(事先未知)。

    • 解决方案**

我要找的密码是

Iterator it = myCustomIteratorThatGeneratesTheSequence();
    StreamSupport.stream(Spliterators.spliteratorUnknownSize(it, Spliterator.DISTINCT), false);

我刚刚研究了java.nio.file.Fileslist(path)方法是如何实现的。

kmpatx3s

kmpatx3s1#

虽然作者已经放弃了takeWhile选项,但我发现它对于某些用例来说已经足够了,值得解释一下。
方法takeWhile可用于任何流,并将在提供给该方法的 predicate 返回false时终止该流。导致false的对象不会追加到该流;只有产生true的对象才被向下传递。
因此,生成有限流的一种方法是使用Stream.generate方法,并返回一个值,该值通过提供给takeWhile的 predicate 计算为false来表示流的结束。
下面是一个例子,生成数组的所有排列:

public static Stream<int[]> permutations(int[] original) {
    int dim = original.length;

    var permutation = original.clone();
    int[] controller = new int[dim];
    var low = new AtomicInteger(0);
    var up = new AtomicInteger(1);

    var permutationsStream = Stream.generate(() -> {
        while (up.get() < dim) {
            if (controller[up.get()] < up.get()) {
                low.set(up.get() % 2 * controller[up.get()]);

                var tmp = permutation[low.get()];
                permutation[low.get()] = permutation[up.get()];
                permutation[up.get()] = tmp;

                controller[up.get()]++;
                up.set(1);

                return permutation.clone();
            } else {
                controller[up.get()] = 0;
                up.incrementAndGet();
            }
        }

        return null;
    }).takeWhile(Objects::nonNull);

    return Stream.concat(
            Stream.ofNullable(original.clone()),
            permutationsStream
    );
}

在这个例子中,我使用了null值来表示流的结束,方法的调用者不会收到空值!
OP可以使用类似的策略,并将其与访问者模式相结合。
如果它是一个平面目录,OP最好使用Stream.iterate,其中seed是要生成的文件的索引,Stream.limit是文件的数量(不需要浏览目录就可以知道)。

svmlkihl

svmlkihl2#

在Java中,有没有什么合理的简单方法可以做到这一点,而不必自己实现整个Stream接口?
一个简单的.limit()就可以保证它会终止,但这并不总是足够强大。
Stream工厂方法之后,创建定制流源而不重新实现流处理管道的最简单方法是将java.util.Spliterators.AbstractSpliterator<T>子类化并将其传递给java.util.stream.StreamSupport.stream(Supplier<? extends Spliterator<T>>, int, boolean)
如果你打算使用并行流,注意AbstractSpliterator只能产生次优的拆分,如果你对你的源代码有更多的控制,完全实现Spliterator接口会更好。
例如,下面的代码片段将创建一个Stream,提供一个无限序列1,2,3 ...
在特定示例中,您可以使用IntStream.range()
但是流显然会在某个点结束,像(collect()或findAny())这样的终端操作符需要对其进行处理。
findAny()这样的短路操作实际上可以在无限流上完成,只要存在任何匹配的元素。
Java 9引入了Stream. iterate来为一些简单的情况生成有限流。

8ehkhllq

8ehkhllq3#

用于从InputStream创建JsonNode流的Kotlin代码

private fun InputStream.toJsonNodeStream(): Stream<JsonNode> {
        return StreamSupport.stream(
                Spliterators.spliteratorUnknownSize(this.toJsonNodeIterator(), Spliterator.ORDERED),
                false
        )
    }

    private fun InputStream.toJsonNodeIterator(): Iterator<JsonNode> {
        val jsonParser = objectMapper.factory.createParser(this)

        return object: Iterator<JsonNode> {

            override fun hasNext(): Boolean {
                var token = jsonParser.nextToken()
                while (token != null) {
                    if (token == JsonToken.START_OBJECT) {
                        return true
                    }
                    token = jsonParser.nextToken()
                }
                return false
            }

            override fun next(): JsonNode {
                return jsonParser.readValueAsTree()
            }
        }
    }
bjg7j2ky

bjg7j2ky4#

下面是一个定制的、有限的流:

package org.tom.stream;
import java.util.*;
import java.util.function.*;
import java.util.stream.*;

public class GoldenStreams {
private static final String IDENTITY = "";

public static void main(String[] args) {
    Stream<String> stream = java.util.stream.StreamSupport.stream(new Spliterator<String>() {
        private static final int LIMIT = 25;
        private int integer = Integer.MAX_VALUE;
        {
            integer = 0;
        }
        @Override
        public int characteristics() {
            return Spliterator.DISTINCT;
        }
        @Override
        public long estimateSize() {
            return LIMIT-integer;
        }
        @Override
        public boolean tryAdvance(Consumer<? super String> arg0) {
            arg0.accept(IDENTITY+integer++);
            return integer < 25;
        }
        @Override
        public Spliterator<String> trySplit() {
            System.out.println("trySplit");
            return null;
        }}, false);
    List<String> peeks = new ArrayList<String>();
    List<String> reds = new ArrayList<String>();
    stream.peek(data->{
        peeks.add(data);
    }).filter(data-> {
        return Integer.parseInt(data)%2>0;
    }).peek(data ->{
        System.out.println("peekDeux:"+data);
    }).reduce(IDENTITY,(accumulation,input)->{
        reds.add(input);
        String concat = accumulation + ( accumulation.isEmpty() ? IDENTITY : ":") + input;
        System.out.println("reduce:"+concat);
        return concat;
    });
    System.out.println("Peeks:"+peeks.toString());
    System.out.println("Reduction:"+reds.toString());
}
}

相关问题