linq 如何通过在后台预加载流项来优化流的迭代性能?

g6ll5ycj  于 2023-09-28  发布在  其他
关注(0)|答案(2)|浏览(104)

我有一个情况,我需要在UI线程上逐个处理大量文件,每个文件的加载和处理都需要大量的时间,并且它们一次使用太多的内存。这些都是在.NET 4.8应用程序中运行的,不幸的是,在代码库的一部分是同步的。
本质上,我的代码看起来像这样:

IEnumerable<ScanFile> stream = GetScanFileStream();

foreach (ScanFile scanFile in scanFiles)
{
    Process(scanFile); // Requires to be called on the UI thread
}

IEnumerable<ScanFile> GetScanFileStream() =>
    from filePath in Directory.EnumerateFiles("c:\\scans", "*.json")
    select this.LoadScanFile(filePath); // I like to run this in the background
}

执行LoadScanFile所需的时间大致相当于执行Process所需的时间,因此,我希望通过在后台线程上预加载下一个文件,同时操作仍在UI线程上运行,从而将处理这些文件所需的时间缩短一半。
我尝试创建一个特殊的IEnumerable<T>装饰器实现, Package 原始流,这允许这种行为,但我很快发现该实现很快变得过于复杂。我开始使用信号量在线程之间进行同步。从那时起,我停止了追求这个解决方案,认为应该有更简单的解决方案,具有相同的效果。
我期望通过使用内置于BCL和CLR的结构,使用LINQ to parallel可以实现这种行为,但是广泛的Google搜索没有产生任何好的结果。
您能建议什么解决方案使我的处理时间缩短一半?

yrdbyhpb

yrdbyhpb1#

您可以使用IAsyncEnumerable,并使LoadScanFile异步(使用异步函数或使用Task.Run)。然后在产生前一个任务之前调用下一个任务。

IAsyncEnumerable<ScanFile> GetScanFileStream()
{
    ScanFile scanFile = null;
    foreach (var filePath in Directory.EnumerateFiles("c:\\scans", "*.json"))
    {
        // start the next task
        var scanFileTask = this.LoadScanFileAync(filePath);
        // if we have one already yield it
        if (scanFile != null)
            yield return scanFile;

        scanFile = await scanFileTask;
    }
    if (scanFile != null)    // and yield the last one also
        yield return scanFile;
}

现在你可以做

await foreach (ScanFile scanFile in scanFiles)
{
    Process(scanFile); // Requires to be called on the UI thread
}
6mw9ycah

6mw9ycah2#

如你所愿选项1 Parallel.ForEach..

void Main()
{
    ConcurrentBag<ScanFile> resultCollection = new ConcurrentBag<ScanFile>();
    
    var options = new ParallelOptions();
    options.MaxDegreeOfParallelism = Math.Max(Environment.ProcessorCount / 2, 1)*10;
    
    ParallelLoopResult result =
        Parallel.ForEach(
            Directory.EnumerateFiles("c:\\1", "*.json").ToList(),
            options, filePath =>
            {
                resultCollection.Add(this.LoadScanFileAync(filePath));
            });
    
    result.Dump();
    resultCollection.Dump();
}

选项2:选中Task.WhenAll(),以便进程将等待,直到所有任务都完成。

ConcurrentBag<ScanFile> resultCollection {get;set;}

void Main()
{
    resultCollection = new ConcurrentBag<ScanFile>();
    DoAllAsync().Dump();
    resultCollection.Dump();
}

public Task DoAllAsync() =>
    Task.WhenAll(Directory.EnumerateFiles("c:\\1", "*.json")
        .Select(filePath => Task.Run(() => 
            resultCollection.Add(this.LoadScanFileAync(filePath)))));

作为选项3,您可以使用.AsParallel()

resultCollection = new ConcurrentBag<ScanFile>();

Directory.EnumerateFiles("c:\\1", "*.json").ToList()
    .AsParallel()
    .ForAll(filePath => resultCollection.Add(this.LoadScanFileAync(filePath)));

resultCollection.Dump();

P.S.你可以简单地通过添加一个

Thread.Sleep(TimeSpan.FromSeconds((5)));

到LoadScanFileAync()函数

相关问题