一种并发可控的并行工作队列的简单.NET方法

pbpqsu0x  于 2023-06-25  发布在  .NET
关注(0)|答案(2)|浏览(116)

这听起来像是一个过于琐碎的问题,我想我把它弄得过于复杂了,因为我已经好几个月没有找到答案了。在Golang、Scala/Akka等中有很多简单的方法可以做到这一点,但我似乎在. NET中找不到任何东西。
我需要的是一个拥有一个任务列表的能力,这些任务都是相互独立的,并且能够在指定的(并且容易改变的)线程数量上同时执行它们。
基本上是这样的:

int numberOfParallelThreads = 3;                // changeable
Queue<Task> pendingTasks = GetPendingTasks();   // returns 80 items

await SomeBuiltInDotNetParallelExecutableManager.RunAllTasksWithSpecifiedConcurrency(pendingTasks, numberOfParallelThreads);

SomeBuiltInDotNetParallelExecutableManager一次执行三个任务,共80个任务;即,当一个完成时,它从队列中抽取下一个,直到队列耗尽。
Task.WhenAllTask.WaitAll,但您不能指定其中的最大并行线程数。
有没有一个简单的内置方法来做到这一点?

nx7onnlm

nx7onnlm1#

Parallel.ForEachAsync(或者根据实际工作负载,它是同步对应物-Parallel.ForEach,但它不会正确处理返回Task的函数):

IEnumerable<int> x = ...;

await Parallel.ForEachAsync(x, new ParallelOptions
{
    MaxDegreeOfParallelism = 3
}, async (i, token) => await Task.Delay(i * 1000, token));

此外,强烈建议C#中的方法返回所谓的“hot”,即。启动的任务,所以“习惯性地”Queue<Task>应该是已经启动的任务的集合,所以你不能控制并行执行的任务的数量,因为它将由ThreadPool/TaskScheduler控制。
如果你想沿着这条路走,可以从Akka到.NET -Akka.NET的端口。

xeufq47z

xeufq47z2#

微软的Reactive Framework也让这变得简单:

IEnumerable<int> values = ...;

IDisposable subscription =
    values
        .ToObservable()
        .Select(v => Observable.Defer(() => Observable.Start(() => { /* do work on each value */ })))
        .Merge(3)
        .Subscribe();

相关问题