postgresql 为什么Parallel.ForEach比for循环慢?

yqkkidmi  于 2023-06-22  发布在  PostgreSQL
关注(0)|答案(3)|浏览(130)

我有一个方法,我用Parallel.ForEach来加快处理,首先它是挑选小文件和处理速度快;对于大文件,它非常慢,比for循环慢。我该如何提高速度,或者我应该尝试不同的方法?
请建议或指导我如何提高速度。

public bool CheckQueueExist()
{
    try
    {
        if (QueueVariables.NewFolderPath == "")
        {
            LoadFilePath();
        }
        string path = QueueVariables.NewFolderPath;
        DirectoryInfo info = new DirectoryInfo(path);
        var files = info.GetFiles().OrderBy(p => p.CreationTime)
            .ThenBy(a => a.Name).Take(10000).ToArray();

        var batchOfFiles = files
            .Select((x, i) => new { Index = i, FullName = x.FullName, Name = x.Name })
            .GroupBy(x => x.Index / 10)
            .Select(x => x.Select(v => new { v.Name, v.FullName }).ToList())
            .ToList();

        bool commitstatus = false;

        Parallel.ForEach(Partitioner.Create(
            batchOfFiles, EnumerablePartitionerOptions.NoBuffering),
            new ParallelOptions() { MaxDegreeOfParallelism = 2 }, batch =>
        {
            StringBuilder strQueryBuild = new StringBuilder();
            List<string> filenamesToMove = new List<string>();

            foreach (var file in batch)
            {
                string fullfilepath = file.FullName;
                string Filename = file.Name;
                try
                {
                    string content = System.IO.File.ReadAllText(fullfilepath);
                    strQueryBuild.Append(content);
                    filenamesToMove.Add(fullfilepath);
                }
                catch (Exception ex)
                {
                    commitstatus = false;
                    AppHelper.ErrrorLog(ex, "File in USE: " + fullfilepath);
                }
            }

            int RowsAffected = 0;
            try
            {
                string connstr = System.Configuration.ConfigurationManager
                    .ConnectionStrings["XYZ"].ConnectionString;
                using (var conn = new NpgsqlConnection(connstr))
                {
                    conn.Open();
                    var tra = conn.BeginTransaction();
                    try
                    {
                        NpgsqlCommand cmd = new NpgsqlCommand();
                        cmd.Connection = conn;
                        cmd.CommandType = CommandType.Text;
                        cmd.CommandTimeout = 0;
                        cmd.CommandText = strQueryBuild.ToString();
                        RowsAffected = cmd.ExecuteNonQuery();
                        if (RowsAffected == 0)
                        {
                            RowsAffected = 1;
                        }
                        tra.Commit();
                        commitstatus = true;
                    }
                    catch (Exception ex)
                    {
                        AppHelper.ErrrorLog(ex, "UploadFileData-Error1");
                        RowsAffected = -1;
                        tra.Rollback();
                        commitstatus = false;
                    }
                }
            }
            catch (Exception ex)
            {
                commitstatus = false;
                AppHelper.ErrrorLog(ex, "ProcessQueue-Error2");
            }

            if (commitstatus)
            {
                Parallel.ForEach(filenamesToMove, filepath =>
                {
                    string Filename = Path.GetFileName(filepath);
                    MovetoSuccessFolder(filepath, Filename);
                });
            }
            else
            {
                Parallel.ForEach(filenamesToMove, filepath =>
                {
                    string Filename = Path.GetFileName(filepath);
                    MovetoFailureFolder(filepath, Filename);
                });
            }
        });

        return commitstatus;
    }
    catch (Exception ex)
    {
        AppHelper.ErrrorLog(ex, "CheckQueueExist");
        return false;
    }
    finally
    {

    }
}
pb3skfrl

pb3skfrl1#

让我们看看你到底在做什么
1.从磁盘读取某种类型的数据库查询
1.运行这些查询
1.移动文件
所有这些操作都涉及一定量的IO。特别是当查询涉及任何类型的写入时,因为这将涉及到某种类型的锁,并且所有查询都有相互阻塞的风险。
IO操作大多是串行操作。虽然SSD处理并发请求的能力远远优于HDD,但它们在阅读或写入顺序数据时仍然工作得最好,并且在执行任何类型的随机操作时都会遭受巨大的损失。
在不了解实际性能问题的情况下添加并发可能弊大于利。解决性能问题的正确方法是:
1.测量/概况-没有这个,你可能会花大量的时间来改进那些无关紧要的事情。而这需要在每次“改进”之后才能做到
1.改进算法--算法改进往往具有最大的影响力
1.减少开销-尝试找到重复或不必要的事情,或者有更快的方法来做同样的事情。对于数据库来说尤其如此,因为通常有多种方法来做同一件事,通常具有截然不同的性能配置文件,除非你是数据库Maven,否则很容易犯错误,导致糟糕的性能。
1.考虑并行化-但你需要确保你的程序实际上适合并行化,这意味着你是计算绑定的,你的算法本质上不是顺序的。即使这样,也要考虑是否值得确保线程安全。

agxfikkp

agxfikkp2#

一些意见和建议:

  • 你有嵌套的Parallel.ForEach循环:一个Parallel.ForEach在另一个里面。这不是一个好主意,因为它很难控制并行度(还有其他原因)。Parallel.ForEach循环不应该嵌套。
  • 您有Parallel.ForEach循环,其中包含未配置的MaxDegreeOfParallelism。这不是一个好主意,因为它会导致ThreadPool饱和。微软建议不配置MaxDegreeOfParallelism,这是一个糟糕的建议,IMHO,有很多例外和警告。当Parallel.ForEach用于并行化阻塞I/O时,就像您的情况一样,这是特别有害的。
  • 您正在以10个为一批处理文件。批处理是一种有用的技术,在工作负载是粒度(轻量级)的情况下,可以最小化同步开销。在您的情况下,工作负载阻塞了I/O,这很可能是大块的。与工作负载相比,同步开销很可能可以忽略不计,因此批处理仅用于使并行化不平衡。作为侧节点,您可以使用LINQ运算符Chunk,而不是效率较低的Select/GroupBy/Select/ToList方法。
xe55xuns

xe55xuns3#

Parallel.ForEach可能有几个因素导致变慢。
1.* * 并行化开销**,Parallel.ForEach引入了管理和协调并行执行的开销。当处理小型或轻量级任务时,这种开销可能变得很大,这导致并行版本比顺序版本慢。
1.* * 限制并行度**,代码中MaxDegreeOfParallelism设置为2,限制并发任务数为2。当然,这取决于你的系统的能力,这可能不会充分利用可用的资源,特别是对于大文件。你可以尝试增加并行度,看看它是否能提高性能。
1.* * 共享资源**,看起来您在并行循环中使用了一个共享的StringBuilderstrQueryBuild)和列表(filenamesToMove)。同时访问共享资源可能会引入同步开销并可能影响性能。
为了提高代码的速度,你可以:
1.优化I/O操作,使用System.IO.File.ReadAllText读取文件内容可能是一个瓶颈(特别是对于大文件)。异步I/O操作可以大大提高系统的性能。
1.批量大小优化,您可以尝试不同的批量大小,以找到平衡并行度和资源使用的最佳值。较大的批量大小可以减少并行化开销,而过大的批量可能会导致资源争用。测量不同批量的性能,以确定最有效的值。
1.分析和性能测量,使用分析工具识别代码中的潜在瓶颈。
请记住,并行化并不总是保证提高性能,并且可能会引入复杂性(可读性,控制流......)

相关问题