用并行流生成列表

iklwldmw  于 2021-06-26  发布在  Java
关注(0)|答案(1)|浏览(319)

我有一个json字符串列表,其中包含电影列表。我需要收集这些电影,处理它们并将它们存储在磁盘中。我正在考虑使用并行流方法来收集电影并测试其性能。我的方法是:
下面的方法生成电影列表。

protected abstract List<T> parseJsonString(JsonIterator iter);

此方法包含一个并行流,它收集所有列表的列表( List<List<Movies) )在溪流中产生:

public CompletableFuture<List<List<T>>> parseJsonPages(List<CompletableFuture<String>> jsonPageList)
{
    return jsonPageList.parallelStream()
            .map( jsonPageStr -> CompletableFuture.supplyAsync( () -> {
                try {
                    return parseJsonString(JsonIterator.parse( jsonPageStr.get() ) );
                }
                catch (InterruptedException | ExecutionException e) {
                    e.printStackTrace();
                    System.exit(-1);
                }
                return null;
            } ) )
            .collect( ParallelCollectors.toFuture( Collectors.toList() ) );
}

这种方法的问题是流将生成电影列表,然后将所有列表附加到一个列表中。你认为这是收集所有这些电影的有效方法吗?我是否应该将所有列表中的电影合并到一个列表中,而不是将整个列表附加到一个列表中(尽管这也需要一些时间)。如果是,我该如何执行这样的任务?
提前谢谢。

tgabmvqs

tgabmvqs1#

织布机项目

将来,当projectloom带着它的虚拟线程到达时,只需将每个任务分配给一个虚拟线程,执行起来就会简单得多,而且可能会快得多。
projectloom的初步版本现在可以使用了,它是基于早期的accessjava16构建的。虽然可能会发生更改,而且还没有准备好进行生产,但是如果这是一个非任务关键型的个人项目,您可以考虑现在就使用它。
顺便说一下,你的 Movie 类可能适合定义为记录,这是Java16的特性之一。

List< String > inputListsOfMoviesAsJson = … ;  // Input.
Set< Movie > movies = Set.of() ;  // Output. Default to unmodifiable empty `Set`. 
try 
(
    ExecutorService executorService = Executors.newVirtualThreadExecutor() ;
)
{
    movies = Collections.synchronizedSet( new HashSet< Movie > ) ;
    for( String inputJson : inputListsOfMoviesAsJson )
    {
        Runnable task = () -> movies.addAll( this.parseJsonIntoSetOfMovies( inputJson ) ) ;
        executorService.submit( task ) ;
    }
}
// At this point, flow-of-control blocks until all tasks are done.
// Then the executor service is automatically shutdown as part of being closed, as an `AutoCloseable` in a try-with-resources.
… use your `Set` of `Movie` objects.

如果要跟踪成功/失败,请捕获并收集 Future 每次调用返回的对象 executorService.submit( task ) . 为了演示的简单性,上面的代码忽略了这个返回值。
至于你关于积累一份清单的问题,结果如何 Movie 对象与以后合并相比,我不认为收集这些对象会成为瓶颈。我猜处理json将成为瓶颈。不管是哪种方式,使用projectloom时,使用profiler工具验证实际的瓶颈可能会更容易,因为编码更简单。
在上面的代码中,我使用 Set 通过调用 Collections.synchronized… . 您可以尝试 Set 或者 List . 列表可能更快,但如果数据输入中存在重复问题,则集合有消除重复的好处。

注意事项

记忆

这种方法假设您有足够的内存来处理所有json工作。对于虚拟线程,所有这些输入可能几乎同时得到处理。
在projectloom中,一个被阻塞的虚拟线程被“停驻”,移动到一边让另一个线程运行。因此,可以运行许多虚拟线程,甚至数百万个。
对于传统的平台/内核线程,一个阻塞的线程不会让另一个线程开始工作。所以一次运行的线程很少。
因此,如果内存是受限制的资源,则需要采取进一步的措施来防止太多虚拟线程启动json处理。

cpu限制的任务

虚拟线程(光纤)适用于涉及阻塞代码的工作。对于纯cpu限制的任务,如视频编码,传统的平台/内核线程是最好的。如果您除了处理已经加载到内存中的json文本之外什么也不做,那么如果虚拟线程被证明是cpu绑定的,那么它们可能不会显示出任何好处。但我想试试,因为试运行很容易。如果您正在进行任何i/o(日志记录、访问文件、访问数据库、进行网络调用),那么您肯定会看到虚拟线程的显著性能改进。

相关代码必须是线程安全的

确保json处理库是线程安全的。
确保你的 parseJsonIntoSetOfMovies 方法是线程安全的。

推荐阅读

阅读brian goetz等人的《java并发实践》一书。

相关问题