如何使非阻塞的ReadLineAsync()在.NET中并发运行?

eit6fx6z  于 2023-03-31  发布在  .NET
关注(0)|答案(1)|浏览(140)

如何使用非阻塞IO并发读取文件行?
直接使用async await(BiggestLineSequential中的)会导致顺序执行:

static async Task<string> BiggestLineSequential(String filename) {
    string longestLine = "";
    using (StreamReader reader = new StreamReader(filename)) {
            while (!reader.EndOfStream)
            {
                var line = await reader.ReadLineAsync();
                if (line != null && line.Length > longestLine.Length)
                {
                    longestLine = line;
                }
            }
        }
        return longestLine;
}

在这种情况下,只有当ReadLineAsync()返回的Task完成时,对await reader.ReadLineAsync();的调用才会继续。因此,只有当前一行已经从文件中读取时,才会读取下一行。
为了实现并发行读取,我尝试了以下方法:

static string BiggestLineConcurrent(String filename) {
      List<Task<string>> taskList = new List<Task<string>>();
      using (StreamReader reader = new StreamReader(filename))
      {
            while (!reader.EndOfStream)
            {
                  taskList.Add(reader.ReadLineAsync());
            }
      }

      string longestLine = "";
      foreach(Task<string> tsk in taskList)
      {
            string line = tsk.Result;
            if (line != null && line.Length > longestLine.Length)
            {
                  longestLine = line;
            }
      }

      return longestLine;
}

在这种情况下,所有的非阻塞IO操作都应该并发运行,而不是像第一个例子中所示的顺序运行。所有的任务都被启动并存储在一个列表中,然后当结果可用时,处理继续进行。
但是,BiggestLineConcurrentwhile (!reader.EndOfStream)行中抛出一个InvalidOperationException,并显示消息“* 流当前正在被流上的前一个操作使用。*”
在.NET中,有没有什么方法可以使用非阻塞IO并发读取行,而不显式地使用新线程或辅助线程池机制?

xlpyo6sf

xlpyo6sf1#

你不能在同一时间从一个流中多次读取,我也不明白这样做有什么意义。流只有一组关于其当前位置等的属性,所以你实际上会有并发冲突。
相反,您可以使用多个流从多个位置开始阅读。

static string BiggestLineConcurrent(String filename)
{
    var maxDOP = Environment.ProcessorCount;
    var fileLength = new FileInfo(filename).Length
    var tasks = Partitioner.Create(0, fileLength - 1)
        .GetPartitions(fileLength / maxDOP + 1)
        .Select(t => ReadData(filename, t.Item1, t.Item2));
    var longests = await Task.WhenAll(tasks);
    return longests.MaxBy(l => l.Length);

    
    async Task<string> ReadData(string filename2, long from, long to)
    {
        var longest = "";
        using var stream = new FileStream(filename2, FileMode.Open, FileAccess.Read, FileShare.Read);
        stream.Position = from;
        using var reader = new StreamReader(stream);
        string line;
        try
        {
             while ((line = await reader.ReadLineAsync()) != null)
             {
                 if (longest.Length < line.Length)
                    longest = line;
                if (stream.Position > to)
                    stream.Close();
            }
         }
        catch (ObjectDisposedException)
        { \\
        }
        return line;
     }
}

当流到达分区的终点时,您需要关闭它,然后因为StreamReader可以抛出,因此您需要吞下该异常。
您的另一种选择是使用某种PartialStream

相关问题